diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 5e5ab974b7..4fe6691e4b 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -560,8 +560,10 @@ class RepositoryConfig(BaseSettings): CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field( description="Repository implementation for WorkflowNodeExecution. Options: " - "'core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository' (default), " - "'core.repositories.celery_workflow_node_execution_repository.CeleryWorkflowNodeExecutionRepository'", + "'core.repositories.sqlalchemy_workflow_node_execution_repository." + "SQLAlchemyWorkflowNodeExecutionRepository' (default), " + "'core.repositories.celery_workflow_node_execution_repository." + "CeleryWorkflowNodeExecutionRepository'", default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository", ) diff --git a/api/core/repositories/celery_workflow_execution_repository.py b/api/core/repositories/celery_workflow_execution_repository.py index 39e0c99337..4d7d518536 100644 --- a/api/core/repositories/celery_workflow_execution_repository.py +++ b/api/core/repositories/celery_workflow_execution_repository.py @@ -8,7 +8,7 @@ providing improved performance by offloading database operations to background w import logging from typing import Optional, Union -from celery.result import AsyncResult +from celery.result import AsyncResult # type: ignore[import-untyped] from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker @@ -40,6 +40,15 @@ class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository): - Configurable timeouts for async operations """ + _session_factory: sessionmaker + _tenant_id: str + _app_id: Optional[str] + _triggered_from: Optional[WorkflowRunTriggeredFrom] + _creator_user_id: str + _creator_user_role: CreatorUserRole + _async_timeout: int + _pending_saves: dict[str, AsyncResult] + def __init__( self, session_factory: sessionmaker | Engine, @@ -72,7 +81,7 @@ class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository): tenant_id = extract_tenant_id(user) if not tenant_id: raise ValueError("User must have a tenant_id or current_tenant_id") - self._tenant_id = tenant_id + self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None # Store app context self._app_id = app_id diff --git a/api/core/repositories/celery_workflow_node_execution_repository.py b/api/core/repositories/celery_workflow_node_execution_repository.py index c18f0c93a1..61d9014794 100644 --- a/api/core/repositories/celery_workflow_node_execution_repository.py +++ b/api/core/repositories/celery_workflow_node_execution_repository.py @@ -9,7 +9,7 @@ import logging from collections.abc import Sequence from typing import Optional, Union -from celery.result import AsyncResult +from celery.result import AsyncResult # type: ignore[import-untyped] from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker @@ -46,6 +46,16 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository): - Batch operations for improved efficiency """ + _session_factory: sessionmaker + _tenant_id: str + _app_id: Optional[str] + _triggered_from: Optional[WorkflowNodeExecutionTriggeredFrom] + _creator_user_id: str + _creator_user_role: CreatorUserRole + _async_timeout: int + _pending_saves: dict[str, AsyncResult] + _workflow_execution_mapping: dict[str, str] + def __init__( self, session_factory: sessionmaker | Engine, @@ -78,7 +88,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository): tenant_id = extract_tenant_id(user) if not tenant_id: raise ValueError("User must have a tenant_id or current_tenant_id") - self._tenant_id = tenant_id + self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None # Store app context self._app_id = app_id @@ -132,7 +142,8 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository): self._pending_saves[execution.id] = task_result # Cache the workflow_execution_id mapping for efficient workflow-specific waiting - self._workflow_execution_mapping[execution.id] = execution.workflow_execution_id + if execution.workflow_execution_id: + self._workflow_execution_mapping[execution.id] = execution.workflow_execution_id logger.debug(f"Queued async save for workflow node execution: {execution.id}") diff --git a/api/tasks/workflow_execution_tasks.py b/api/tasks/workflow_execution_tasks.py index 93f3604ed3..76c7faebae 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -8,7 +8,7 @@ improving performance by offloading storage operations to background workers. import json import logging -from celery import shared_task +from celery import shared_task # type: ignore[import-untyped] from sqlalchemy import select from sqlalchemy.orm import sessionmaker diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index e2b5ff7ab2..58098c681f 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -9,7 +9,7 @@ import json import logging from typing import Optional -from celery import shared_task +from celery import shared_task # type: ignore[import-untyped] from sqlalchemy import select from sqlalchemy.orm import sessionmaker