From 8c945b7a088368b1129e3cfc9ca84f4d1389306e Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 27 Jun 2025 12:54:17 +0800 Subject: [PATCH] feat: Create a APIWorkflowRunRepository Signed-off-by: -LAN- --- api/configs/feature/__init__.py | 5 +++ .../advanced_chat/generate_task_pipeline.py | 1 + .../common/workflow_response_converter.py | 3 ++ .../apps/workflow/generate_task_pipeline.py | 1 + api/core/memory/token_buffer_memory.py | 2 +- .../api_workflow_run_repository.py | 3 +- api/repositories/factory.py | 37 ++++++++++++++++ .../sqlalchemy_api_workflow_run_repository.py | 43 +++++++++++++------ .../clear_free_plan_tenant_expired_logs.py | 42 ------------------ api/services/workflow_run_service.py | 1 + api/tasks/remove_app_and_related_data_task.py | 4 +- 11 files changed, 81 insertions(+), 61 deletions(-) diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 38ba3d8994..f1d529355d 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -567,6 +567,11 @@ class RepositoryConfig(BaseSettings): default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository", ) + API_WORKFLOW_RUN_REPOSITORY: str = Field( + description="Service-layer repository implementation for WorkflowRun operations. Specify as a module path", + default="repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository", + ) + class AuthConfig(BaseSettings): """ diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 1dc9796d5b..a8d3027d14 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -138,6 +138,7 @@ class AdvancedChatAppGenerateTaskPipeline: self._workflow_response_converter = WorkflowResponseConverter( application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, ) self._task_state = WorkflowTaskState() diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 34a1da2227..e027b4e470 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -49,6 +49,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecution from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData +from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from models import ( Account, @@ -63,8 +64,10 @@ class WorkflowResponseConverter: self, *, application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], + workflow_execution_repository: WorkflowExecutionRepository, ) -> None: self._application_generate_entity = application_generate_entity + self._workflow_execution_repository = workflow_execution_repository def workflow_start_to_stream_response( self, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 7adc03e9c3..a1d2a3f3e3 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -126,6 +126,7 @@ class WorkflowAppGenerateTaskPipeline: self._workflow_response_converter = WorkflowResponseConverter( application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, ) self._application_generate_entity = application_generate_entity diff --git a/api/core/memory/token_buffer_memory.py b/api/core/memory/token_buffer_memory.py index a9f0a92e5d..04f6779764 100644 --- a/api/core/memory/token_buffer_memory.py +++ b/api/core/memory/token_buffer_memory.py @@ -83,7 +83,7 @@ class TokenBufferMemory: raise ValueError(f"Workflow not found: {workflow_run.workflow_id}") file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False) else: - raise AssertionError(f"Invalid app mode: {self.conversation.mode}") + raise ValueError(f"Invalid app mode: {self.conversation.mode}") detail = ImagePromptMessageContent.DETAIL.LOW if file_extra_config and app_record: diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 59e7baeb79..c91e6e0904 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -38,12 +38,11 @@ from collections.abc import Sequence from datetime import datetime from typing import Optional, Protocol -from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from libs.infinite_scroll_pagination import InfiniteScrollPagination from models.workflow import WorkflowRun -class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): +class APIWorkflowRunRepository(Protocol): """ Protocol for service-layer WorkflowRun repository operations. diff --git a/api/repositories/factory.py b/api/repositories/factory.py index fe0bf17441..0a0adbf2c2 100644 --- a/api/repositories/factory.py +++ b/api/repositories/factory.py @@ -12,6 +12,7 @@ from sqlalchemy.orm import sessionmaker from configs import dify_config from core.repositories import DifyCoreRepositoryFactory, RepositoryImportError from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository +from repositories.api_workflow_run_repository import APIWorkflowRunRepository logger = logging.getLogger(__name__) @@ -64,3 +65,39 @@ class DifyAPIRepositoryFactory(DifyCoreRepositoryFactory): raise RepositoryImportError( f"Failed to create DifyAPIWorkflowNodeExecutionRepository from '{class_path}': {e}" ) from e + + @classmethod + def create_api_workflow_run_repository(cls, session_maker: sessionmaker) -> APIWorkflowRunRepository: + """ + Create an APIWorkflowRunRepository instance based on configuration. + + This repository is designed for service-layer WorkflowRun operations and uses dependency + injection with a sessionmaker for better testability and separation of concerns. It provides + database access patterns specifically needed by service classes for workflow run management, + including pagination, filtering, and bulk operations. + + Args: + session_maker: SQLAlchemy sessionmaker to inject for database session management. + + Returns: + Configured APIWorkflowRunRepository instance + + Raises: + RepositoryImportError: If the configured repository cannot be imported or instantiated + """ + class_path = dify_config.API_WORKFLOW_RUN_REPOSITORY + logger.debug(f"Creating APIWorkflowRunRepository from: {class_path}") + + try: + repository_class = cls._import_class(class_path) + cls._validate_repository_interface(repository_class, APIWorkflowRunRepository) + # Service repository requires session_maker parameter + cls._validate_constructor_signature(repository_class, ["session_maker"]) + + return repository_class(session_maker=session_maker) # type: ignore[no-any-return] + except RepositoryImportError: + # Re-raise our custom errors as-is + raise + except Exception as e: + logger.exception("Failed to create APIWorkflowRunRepository") + raise RepositoryImportError(f"Failed to create APIWorkflowRunRepository from '{class_path}': {e}") from e diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index ebd1d74b20..e61ae57594 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -25,16 +25,15 @@ from datetime import datetime from typing import Optional, cast from sqlalchemy import delete, select -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker from libs.infinite_scroll_pagination import InfiniteScrollPagination from models.workflow import WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository logger = logging.getLogger(__name__) -class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): +class DifyAPISQLAlchemyWorkflowRunRepository: """ SQLAlchemy implementation of APIWorkflowRunRepository. @@ -46,7 +45,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): session_maker: SQLAlchemy sessionmaker instance for database connections """ - def __init__(self, session_maker: sessionmaker[Session]) -> None: + def __init__(self, session_maker: sessionmaker) -> None: """ Initialize the repository with a sessionmaker. @@ -87,18 +86,34 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): raise ValueError("Last workflow run not exists") # Get records created before the last run's timestamp - base_stmt = base_stmt.where( - WorkflowRun.created_at < last_workflow_run.created_at, - WorkflowRun.id != last_workflow_run.id, - ) - - # First page - get most recent records - workflow_runs = session.scalars(base_stmt.order_by(WorkflowRun.created_at.desc()).limit(limit + 1)).all() + workflow_runs = session.scalars( + base_stmt.where( + WorkflowRun.created_at < last_workflow_run.created_at, + WorkflowRun.id != last_workflow_run.id, + ) + .order_by(WorkflowRun.created_at.desc()) + .limit(limit) + ).all() + else: + # First page - get most recent records + workflow_runs = session.scalars(base_stmt.order_by(WorkflowRun.created_at.desc()).limit(limit)).all() # Check if there are more records for pagination - has_more = len(workflow_runs) > limit - if has_more: - workflow_runs = workflow_runs[:-1] + has_more = False + if len(workflow_runs) == limit: + current_page_last_run = workflow_runs[-1] + remaining_count = session.scalar( + select(WorkflowRun.id) + .where( + WorkflowRun.tenant_id == tenant_id, + WorkflowRun.app_id == app_id, + WorkflowRun.triggered_from == triggered_from, + WorkflowRun.created_at < current_page_last_run.created_at, + WorkflowRun.id != current_page_last_run.id, + ) + .limit(1) + ) + has_more = remaining_count is not None return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more) diff --git a/api/services/clear_free_plan_tenant_expired_logs.py b/api/services/clear_free_plan_tenant_expired_logs.py index cb088304c6..ddd16b2e0c 100644 --- a/api/services/clear_free_plan_tenant_expired_logs.py +++ b/api/services/clear_free_plan_tenant_expired_logs.py @@ -14,7 +14,6 @@ from extensions.ext_database import db from extensions.ext_storage import storage from models.account import Tenant from models.model import App, Conversation, Message -from models.workflow import WorkflowRun from repositories.factory import DifyAPIRepositoryFactory from services.billing_service import BillingService @@ -112,47 +111,6 @@ class ClearFreePlanTenantExpiredLogs: before_date = datetime.datetime.now() - datetime.timedelta(days=days) total_deleted = 0 - while True: - # Get a batch of expired executions for backup - workflow_node_executions = node_execution_repo.get_expired_executions_batch( - tenant_id=tenant_id, - before_date=before_date, - batch_size=batch, - ) - - if len(workflow_node_executions) == 0: - break - - # Save workflow node executions to storage - storage.save( - f"free_plan_tenant_expired_logs/" - f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}" - f"-{time.time()}.json", - json.dumps( - jsonable_encoder(workflow_node_executions), - ).encode("utf-8"), - ) - - # Extract IDs for deletion - workflow_node_execution_ids = [ - workflow_node_execution.id for workflow_node_execution in workflow_node_executions - ] - - # Delete the backed up executions - deleted_count = node_execution_repo.delete_executions_by_ids(workflow_node_execution_ids) - total_deleted += deleted_count - - click.echo( - click.style( - f"[{datetime.datetime.now()}] Processed {len(workflow_node_execution_ids)}" - f" workflow node executions for tenant {tenant_id}" - ) - ) - - # If we got fewer than the batch size, we're done - if len(workflow_node_executions) < batch: - break - while True: # Get a batch of expired executions for backup workflow_node_executions = node_execution_repo.get_expired_executions_batch( diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 3ec3dc193a..e43999a8c9 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -25,6 +25,7 @@ class WorkflowRunService: self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( session_maker ) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: """ diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index 7f8c6c26a0..9545afbde6 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -32,7 +32,7 @@ from models import ( ) from models.tools import WorkflowToolProvider from models.web import PinnedConversation, SavedMessage -from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowRun +from models.workflow import ConversationVariable, Workflow, WorkflowAppLog from repositories.factory import DifyAPIRepositoryFactory @@ -192,7 +192,7 @@ def _delete_app_workflows(tenant_id: str, app_id: str): def _delete_app_workflow_runs(tenant_id: str, app_id: str): """Delete all workflow runs for an app using the service repository.""" - session_maker = sessionmaker(bind=db.engine) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) deleted_count = workflow_run_repo.delete_runs_by_app(