Compare commits

..

2 Commits

Author SHA1 Message Date
53771e53f5 + Some thread debugging 2024-08-02 16:21:26 +04:00
7571373443 Trying to fix thread leak: maybe some async will help? 2024-08-02 16:20:18 +04:00
15 changed files with 147 additions and 86 deletions

View File

@ -1,13 +1,32 @@
from chain_service.logging import setup_logging
import logging
import threading
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from chain_service.logging import setup_logging
from chain_service.controllers.__main__ import setup_controllers
setup_logging()
application = FastAPI()
logging.basicConfig(level=logging.DEBUG)
# Subclass threading.Thread for logging
class DebugThread(threading.Thread):
def __init__(self, *args, **kwargs):
logging.debug(f"Creating thread {args} {kwargs}")
super().__init__(*args, **kwargs)
def _delete(self):
logging.debug(f"Deleting thread {self.name}")
super()._delete()
threading.Thread = DebugThread
application.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"],

View File

@ -1,16 +1,15 @@
from typing import Annotated
from loguru import logger
from fastapi import APIRouter, HTTPException, Depends
from fastapi import APIRouter, HTTPException
from chain_service.database.models.chain import Chain
from chain_service.dependencies.chain_repository import get_chain_repository
from chain_service.dependencies.namespace_repository import get_namespace_repository
from chain_service.dependencies.file_uploader_service import get_file_uploader_service
from chain_service.repositories.chain import ChainRepository
from chain_service.repositories.namespace import NamespaceRepository
from chain_service.services.file_uploader import FileUploaderService
from chain_service.dependencies.chain_repository import ChainRepositoryDependency
from chain_service.dependencies.namespace_repository import (
NamespaceRepositoryDependency,
)
from chain_service.dependencies.file_uploader_service import (
FileUploaderServiceDependency,
)
router = APIRouter(prefix="/chain")
@ -18,9 +17,9 @@ router = APIRouter(prefix="/chain")
@router.post("/")
async def chain_upsert_controller(
chain: Chain,
chain_repository: Annotated[ChainRepository, Depends(get_chain_repository)],
namespace_repository: Annotated[NamespaceRepository, Depends(get_namespace_repository)],
file_uploader_service: Annotated[FileUploaderService, Depends(get_file_uploader_service)],
chain_repository: ChainRepositoryDependency,
namespace_repository: NamespaceRepositoryDependency,
file_uploader_service: FileUploaderServiceDependency,
):
try:
assert await namespace_repository.get_by_name(name=chain.namespace_id)
@ -40,8 +39,8 @@ async def chain_upsert_controller(
@router.get("/list")
async def chain_list_controller(
namespace_id: str,
chain_repository: Annotated[ChainRepository, Depends(get_chain_repository)],
namespace_repository: Annotated[NamespaceRepository, Depends(get_namespace_repository)],
chain_repository: ChainRepositoryDependency,
namespace_repository: NamespaceRepositoryDependency,
):
try:
@ -60,7 +59,7 @@ async def chain_list_controller(
@router.get("/{chain_id}")
async def chain_get_controller(
chain_id: str, chain_repository: Annotated[ChainRepository, Depends(get_chain_repository)]
chain_id: str, chain_repository: ChainRepositoryDependency
):
try:
assert (chain := await chain_repository.get_by_id(chain_id))
@ -77,7 +76,7 @@ async def chain_get_controller(
@router.delete("/delete/{chain_id}")
async def chain_delete_controller(
chain_id: str, chain_repository: Annotated[ChainRepository, Depends(get_chain_repository)]
chain_id: str, chain_repository: ChainRepositoryDependency
):
try:
await chain_repository.delete_by_id(chain_id)

View File

@ -1,19 +1,18 @@
from typing import Annotated
from loguru import logger
from fastapi import APIRouter, HTTPException, Depends
from fastapi import APIRouter, HTTPException
from chain_service.database.models.namespace import Namespace
from chain_service.dependencies.namespace_repository import get_namespace_repository
from chain_service.repositories.namespace import NamespaceRepository
from chain_service.dependencies.namespace_repository import (
NamespaceRepositoryDependency,
)
router = APIRouter(prefix="/namespace")
@router.post("/")
async def namespace_controller(
namespace: Namespace, namespace_repository: Annotated[NamespaceRepository, Depends(get_namespace_repository)]
namespace: Namespace, namespace_repository: NamespaceRepositoryDependency
):
try:
upserted_namespace = await namespace_repository.upsert(namespace)
@ -26,7 +25,7 @@ async def namespace_controller(
# @router.get("/{namespace_name}")
async def namespace_get_by_name_controller(
namespace_name: str, namespace_repository: Annotated[NamespaceRepository, Depends(get_namespace_repository)]
namespace_name: str, namespace_repository: NamespaceRepositoryDependency
):
try:
assert (namespace := await namespace_repository.get_by_name(namespace_name))

View File

@ -1,22 +1,24 @@
from typing import Annotated
from chain_service.repositories.chain import ChainRepository
from chain_service.repositories.progress_chain import ProgressChainRepository
from chain_service.repositories.running_chain import RunningChainRepository
from chain_service.schema.run_chain import RunChainInput, AbortChainInput
from chain_service.dependencies.chain_repository import get_chain_repository
from chain_service.dependencies.progress_chain_repository import get_progress_chain_repository
from chain_service.dependencies.progress_chain_runner_service import get_progress_chain_runner_service
from chain_service.dependencies.running_chain_repository import get_running_chain_repository
from chain_service.dependencies.chain_repository import ChainRepositoryDependency
from chain_service.dependencies.progress_chain_repository import (
ProgressChainRepositoryDependency,
)
from chain_service.dependencies.progress_chain_runner_service import (
ProgressChainRunnerServiceDependency,
)
from chain_service.dependencies.running_chain_repository import (
RunningChainRepositoryDependency,
)
from chain_service.database.models.progress_chain import ProgressChain
import asyncio
from loguru import logger
from fastapi import APIRouter, HTTPException, Depends
from chain_service.services.progress_chain_runner import ProgressChainRunnerService
from fastapi import APIRouter, HTTPException
router = APIRouter()
@ -24,10 +26,10 @@ router = APIRouter()
@router.post("/run_chain")
async def run_chain_controller(
run_chain_input: RunChainInput,
chain_repository: Annotated[ChainRepository, Depends(get_chain_repository)],
progress_chain_repository: Annotated[ProgressChainRepository, Depends(get_progress_chain_repository)],
progress_chain_runner_service: Annotated[ProgressChainRunnerService, Depends(get_progress_chain_runner_service)],
running_chain_repository: Annotated[RunningChainRepository, Depends(get_running_chain_repository)],
chain_repository: ChainRepositoryDependency,
progress_chain_repository: ProgressChainRepositoryDependency,
progress_chain_runner_service: ProgressChainRunnerServiceDependency,
running_chain_repository: RunningChainRepositoryDependency,
):
try:
assert (chain := await chain_repository.get_by_id(run_chain_input.chain_id))
@ -68,7 +70,7 @@ async def run_chain_controller(
@router.post("/abort_chain")
async def abort_chain_controller(
abort_chain_input: AbortChainInput,
running_chain_repository: Annotated[RunningChainRepository, Depends(get_running_chain_repository)],
running_chain_repository: RunningChainRepositoryDependency,
):
try:
assert await running_chain_repository.exists(str(abort_chain_input.task_id))
@ -89,7 +91,7 @@ async def abort_chain_controller(
@router.post("/abort_all_chains")
async def abort_all_chains_controller(
running_chain_repository: Annotated[RunningChainRepository, Depends(get_running_chain_repository)],
running_chain_repository: RunningChainRepositoryDependency,
):
try:
await running_chain_repository.delete_all()

View File

@ -1,5 +1,13 @@
from chain_service.services.audio_converter import AudioConverterService
from fastapi import Depends
from typing import Annotated
async def get_audio_converter_service() -> AudioConverterService:
return AudioConverterService()
AudioConverterServiceDependency = Annotated[
AudioConverterService, Depends(get_audio_converter_service)
]

View File

@ -1,9 +1,12 @@
from .database import get_database, Database
from .database import DatabaseDependency
from chain_service.repositories.chain import ChainRepository
from fastapi import Depends
from typing import Annotated
async def get_chain_repository(database: Annotated[Database, Depends(get_database)]) -> ChainRepository:
async def get_chain_repository(database: DatabaseDependency) -> ChainRepository:
return ChainRepository(database=database)
ChainRepositoryDependency = Annotated[ChainRepository, Depends(get_chain_repository)]

View File

@ -1,6 +1,9 @@
from chain_service.settings import Settings
from chain_service.database.database import Database
from fastapi import Depends
from typing import Annotated
async def get_database() -> Database:
settings = Settings()
@ -8,3 +11,6 @@ async def get_database() -> Database:
return Database(
database_url=settings.database_url, database_name=settings.database_name
)
DatabaseDependency = Annotated[Database, Depends(get_database)]

View File

@ -1,25 +1,25 @@
from planfix_client import PlanfixClient
from chain_service.services.file_uploader import FileUploaderService
from .planfix_client import get_planfix_client
from .uploaded_file_repository import get_uploaded_file_repository
from .audio_converter_service import get_audio_converter_service
from .planfix_client import PlanfixClientDependency
from .uploaded_file_repository import UploadedFileRepositoryDependency
from .audio_converter_service import AudioConverterServiceDependency
from fastapi import Depends
from typing import Annotated
from chain_service.repositories.uploaded_file import UploadedFileRepository
from chain_service.services.audio_converter import AudioConverterService
async def get_file_uploader_service(
planfix_client: Annotated[PlanfixClient, Depends(get_planfix_client)],
uploaded_file_repository: Annotated[UploadedFileRepository, Depends(get_uploaded_file_repository)],
audio_converter_service: Annotated[AudioConverterService, Depends(get_audio_converter_service)],
planfix_client: PlanfixClientDependency,
uploaded_file_repository: UploadedFileRepositoryDependency,
audio_converter_service: AudioConverterServiceDependency,
) -> FileUploaderService:
return FileUploaderService(
planfix_client=planfix_client,
uploaded_file_repository=uploaded_file_repository,
audio_converter_service=audio_converter_service,
)
FileUploaderServiceDependency = Annotated[
FileUploaderService, Depends(get_file_uploader_service)
]

View File

@ -1,11 +1,15 @@
from .database import DatabaseDependency
from chain_service.repositories.namespace import NamespaceRepository
from chain_service.database.database import Database
from .database import get_database
from fastapi import Depends
from typing import Annotated
async def get_namespace_repository(database: Annotated[Database, Depends(get_database)]) -> NamespaceRepository:
async def get_namespace_repository(database: DatabaseDependency) -> NamespaceRepository:
return NamespaceRepository(database=database)
NamespaceRepositoryDependency = Annotated[
NamespaceRepository, Depends(get_namespace_repository)
]

View File

@ -1,6 +1,9 @@
from planfix_client import PlanfixClient
from chain_service.settings import Settings
from fastapi import Depends
from typing import Annotated
async def get_planfix_client() -> PlanfixClient:
settings = Settings()
@ -8,3 +11,6 @@ async def get_planfix_client() -> PlanfixClient:
return PlanfixClient(
planfix_hostname=settings.planfix_hostname, planfix_token=settings.planfix_token
)
PlanfixClientDependency = Annotated[PlanfixClient, Depends(get_planfix_client)]

View File

@ -1,21 +1,23 @@
from planfix_client import PlanfixClient
from .planfix_client import get_planfix_client
from .planfix_client import PlanfixClientDependency
from chain_service.services.progress_action.factory import ProgressActionServiceFactory
from chain_service.dependencies.uploaded_file_repository import get_uploaded_file_repository
from chain_service.dependencies.uploaded_file_repository import (
UploadedFileRepositoryDependency,
)
from fastapi import Depends
from typing import Annotated
from chain_service.repositories.uploaded_file import UploadedFileRepository
async def get_progress_action_service_factory(
planfix_client: Annotated[PlanfixClient, Depends(get_planfix_client)],
uploaded_file_repository: Annotated[UploadedFileRepository, Depends(get_uploaded_file_repository)]
planfix_client: PlanfixClientDependency,
uploaded_file_repository: UploadedFileRepositoryDependency,
) -> ProgressActionServiceFactory:
return ProgressActionServiceFactory(
planfix_client=planfix_client, uploaded_file_repository=uploaded_file_repository
)
ProgressActionServiceFactoryDependency = Annotated[
ProgressActionServiceFactory, Depends(get_progress_action_service_factory)
]

View File

@ -1,12 +1,16 @@
from chain_service.database.database import Database
from .database import DatabaseDependency
from chain_service.repositories.progress_chain import ProgressChainRepository
from .database import get_database
from fastapi import Depends
from typing import Annotated
async def get_progress_chain_repository(
database: Annotated[Database, Depends(get_database)],
database: DatabaseDependency,
) -> ProgressChainRepository:
return ProgressChainRepository(database=database)
ProgressChainRepositoryDependency = Annotated[
ProgressChainRepository, Depends(get_progress_chain_repository)
]

View File

@ -1,24 +1,26 @@
from chain_service.services.progress_chain_runner import ProgressChainRunnerService
from .progress_chain_repository import get_progress_chain_repository
from .progress_action_service_factory import get_progress_action_service_factory
from .running_chain_repository import get_running_chain_repository
from .progress_chain_repository import ProgressChainRepositoryDependency
from .progress_action_service_factory import ProgressActionServiceFactoryDependency
from .running_chain_repository import RunningChainRepositoryDependency
from fastapi import Depends
from typing import Annotated
from chain_service.repositories.progress_chain import ProgressChainRepository
from chain_service.repositories.running_chain import RunningChainRepository
from chain_service.services.progress_action.factory import ProgressActionServiceFactory
async def get_progress_chain_runner_service(
progress_chain_repository: Annotated[ProgressChainRepository, Depends(get_progress_chain_repository)],
progress_action_service_factory: Annotated[ProgressActionServiceFactory, Depends(get_progress_action_service_factory)],
running_chain_repository: Annotated[RunningChainRepository, Depends(get_running_chain_repository)],
progress_chain_repository: ProgressChainRepositoryDependency,
progress_action_service_factory: ProgressActionServiceFactoryDependency,
running_chain_repository: RunningChainRepositoryDependency,
) -> ProgressChainRunnerService:
return ProgressChainRunnerService(
progress_chain_repository=progress_chain_repository,
progress_action_service_factory=progress_action_service_factory,
running_chain_repository=running_chain_repository,
)
ProgressChainRunnerServiceDependency = Annotated[
ProgressChainRunnerService, Depends(get_progress_chain_runner_service)
]

View File

@ -1,5 +1,4 @@
from .database import get_database
from chain_service.database.database import Database
from .database import DatabaseDependency
from chain_service.repositories.running_chain import RunningChainRepository
from fastapi import Depends
@ -7,6 +6,11 @@ from typing import Annotated
async def get_running_chain_repository(
database: Annotated[Database, Depends(get_database)],
database: DatabaseDependency,
) -> RunningChainRepository:
return RunningChainRepository(database=database)
RunningChainRepositoryDependency = Annotated[
RunningChainRepository, Depends(get_running_chain_repository)
]

View File

@ -1,13 +1,16 @@
from chain_service.database.database import Database
from .database import DatabaseDependency
from chain_service.repositories.uploaded_file import UploadedFileRepository
from .database import get_database
from fastapi import Depends
from typing import Annotated
async def get_uploaded_file_repository(
database: Annotated[Database, Depends(get_database)],
database: DatabaseDependency,
) -> UploadedFileRepository:
return UploadedFileRepository(database=database)
UploadedFileRepositoryDependency = Annotated[
UploadedFileRepository, Depends(get_uploaded_file_repository)
]