pull/20050/head
liangxin 10 months ago
parent d882f73dfd
commit 380341d35c

@ -560,8 +560,10 @@ class RepositoryConfig(BaseSettings):
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field( CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowNodeExecution. Options: " description="Repository implementation for WorkflowNodeExecution. Options: "
"'core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository' (default), " "'core.repositories.sqlalchemy_workflow_node_execution_repository."
"'core.repositories.celery_workflow_node_execution_repository.CeleryWorkflowNodeExecutionRepository'", "SQLAlchemyWorkflowNodeExecutionRepository' (default), "
"'core.repositories.celery_workflow_node_execution_repository."
"CeleryWorkflowNodeExecutionRepository'",
default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository", default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository",
) )

@ -8,7 +8,7 @@ providing improved performance by offloading database operations to background w
import logging import logging
from typing import Optional, Union from typing import Optional, Union
from celery.result import AsyncResult from celery.result import AsyncResult # type: ignore[import-untyped]
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -40,6 +40,15 @@ class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository):
- Configurable timeouts for async operations - Configurable timeouts for async operations
""" """
_session_factory: sessionmaker
_tenant_id: str
_app_id: Optional[str]
_triggered_from: Optional[WorkflowRunTriggeredFrom]
_creator_user_id: str
_creator_user_role: CreatorUserRole
_async_timeout: int
_pending_saves: dict[str, AsyncResult]
def __init__( def __init__(
self, self,
session_factory: sessionmaker | Engine, session_factory: sessionmaker | Engine,
@ -72,7 +81,7 @@ class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository):
tenant_id = extract_tenant_id(user) tenant_id = extract_tenant_id(user)
if not tenant_id: if not tenant_id:
raise ValueError("User must have a tenant_id or current_tenant_id") raise ValueError("User must have a tenant_id or current_tenant_id")
self._tenant_id = tenant_id self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None
# Store app context # Store app context
self._app_id = app_id self._app_id = app_id

@ -9,7 +9,7 @@ import logging
from collections.abc import Sequence from collections.abc import Sequence
from typing import Optional, Union from typing import Optional, Union
from celery.result import AsyncResult from celery.result import AsyncResult # type: ignore[import-untyped]
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -46,6 +46,16 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
- Batch operations for improved efficiency - Batch operations for improved efficiency
""" """
_session_factory: sessionmaker
_tenant_id: str
_app_id: Optional[str]
_triggered_from: Optional[WorkflowNodeExecutionTriggeredFrom]
_creator_user_id: str
_creator_user_role: CreatorUserRole
_async_timeout: int
_pending_saves: dict[str, AsyncResult]
_workflow_execution_mapping: dict[str, str]
def __init__( def __init__(
self, self,
session_factory: sessionmaker | Engine, session_factory: sessionmaker | Engine,
@ -78,7 +88,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
tenant_id = extract_tenant_id(user) tenant_id = extract_tenant_id(user)
if not tenant_id: if not tenant_id:
raise ValueError("User must have a tenant_id or current_tenant_id") raise ValueError("User must have a tenant_id or current_tenant_id")
self._tenant_id = tenant_id self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None
# Store app context # Store app context
self._app_id = app_id self._app_id = app_id
@ -132,7 +142,8 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
self._pending_saves[execution.id] = task_result self._pending_saves[execution.id] = task_result
# Cache the workflow_execution_id mapping for efficient workflow-specific waiting # Cache the workflow_execution_id mapping for efficient workflow-specific waiting
self._workflow_execution_mapping[execution.id] = execution.workflow_execution_id if execution.workflow_execution_id:
self._workflow_execution_mapping[execution.id] = execution.workflow_execution_id
logger.debug(f"Queued async save for workflow node execution: {execution.id}") logger.debug(f"Queued async save for workflow node execution: {execution.id}")

@ -8,7 +8,7 @@ improving performance by offloading storage operations to background workers.
import json import json
import logging import logging
from celery import shared_task from celery import shared_task # type: ignore[import-untyped]
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker

@ -9,7 +9,7 @@ import json
import logging import logging
from typing import Optional from typing import Optional
from celery import shared_task from celery import shared_task # type: ignore[import-untyped]
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker

Loading…
Cancel
Save