From cf698ddcccb5e778a416d8f743bc94dcca5b801b Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 30 May 2025 03:37:46 +0800 Subject: [PATCH] refactor(workflow): Rename entities files Signed-off-by: -LAN- --- .../console/app/workflow_app_log.py | 2 +- api/controllers/service_api/app/workflow.py | 2 +- .../advanced_chat/generate_task_pipeline.py | 21 +++--- .../common/workflow_response_converter.py | 14 ++-- .../apps/workflow/generate_task_pipeline.py | 32 ++++------ api/core/ops/ops_trace_manager.py | 4 +- ...qlalchemy_workflow_execution_repository.py | 12 ++-- ...hemy_workflow_node_execution_repository.py | 2 +- ...tion_entities.py => workflow_execution.py} | 12 ++-- ...entities.py => workflow_node_execution.py} | 0 .../workflow_execution_repository.py | 2 +- .../workflow_node_execution_repository.py | 2 +- api/core/workflow/workflow_cycle_manager.py | 41 +++++------- api/models/model.py | 2 +- api/services/workflow_app_service.py | 2 +- api/services/workflow_service.py | 2 +- .../workflow/test_workflow_cycle_manager.py | 64 ++++++++----------- .../test_sqlalchemy_repository.py | 2 +- 18 files changed, 97 insertions(+), 121 deletions(-) rename api/core/workflow/entities/{workflow_execution_entities.py => workflow_execution.py} (92%) rename api/core/workflow/entities/{node_execution_entities.py => workflow_node_execution.py} (100%) diff --git a/api/controllers/console/app/workflow_app_log.py b/api/controllers/console/app/workflow_app_log.py index 96c6c2623e..b9579e2120 100644 --- a/api/controllers/console/app/workflow_app_log.py +++ b/api/controllers/console/app/workflow_app_log.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import Session from controllers.console import api from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, setup_required -from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from extensions.ext_database import db from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from libs.login import login_required diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py index 1fdff68327..df52b49424 100644 --- a/api/controllers/service_api/app/workflow.py +++ b/api/controllers/service_api/app/workflow.py @@ -24,7 +24,7 @@ from core.errors.error import ( QuotaExceededError, ) from core.model_runtime.errors.invoke import InvokeError -from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from extensions.ext_database import db from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from libs import helper diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 1a789918b8..827ca452d5 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -62,13 +62,13 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.utils.encoders import jsonable_encoder from core.ops.ops_trace_manager import TraceQueueManager -from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from events.message_event import message_was_created from extensions.ext_database import db from models import Conversation, EndUser, Message, MessageFile @@ -126,11 +126,11 @@ class AdvancedChatAppGenerateTaskPipeline: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, }, - workflow_entity=TempWorkflowEntity( - id_=workflow.id, - type_=WorkflowType(workflow.type), + workflow_info=CycleManagerWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), version=workflow.version, - graph=workflow.graph_dict, + graph_data=workflow.graph_dict, ), workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, @@ -306,15 +306,12 @@ class AdvancedChatAppGenerateTaskPipeline: with Session(db.engine, expire_on_commit=False) as session: # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( - session=session, - workflow_id=self._workflow_id, - ) - self._workflow_run_id = workflow_execution.id + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ message = self._get_message(session=session) if not message: raise ValueError(f"Message not found: {self._message_id}") - message.workflow_run_id = workflow_execution.id + message.workflow_run_id = workflow_execution.id_ workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 82057a045f..9e12df46a5 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -44,8 +44,8 @@ from core.app.entities.task_entities import ( ) from core.file import FILE_MODEL_IDENTITY, File from core.tools.tool_manager import ToolManager -from core.workflow.entities.node_execution_entities import NodeExecution -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution +from core.workflow.entities.workflow_node_execution import NodeExecution from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData from models import ( @@ -73,9 +73,9 @@ class WorkflowResponseConverter: ) -> WorkflowStartStreamResponse: return WorkflowStartStreamResponse( task_id=task_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, data=WorkflowStartStreamResponse.Data( - id=workflow_execution.id, + id=workflow_execution.id_, workflow_id=workflow_execution.workflow_id, inputs=workflow_execution.inputs, created_at=int(workflow_execution.started_at.timestamp()), @@ -90,7 +90,7 @@ class WorkflowResponseConverter: workflow_execution: WorkflowExecution, ) -> WorkflowFinishStreamResponse: created_by = None - workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_)) assert workflow_run is not None if workflow_run.created_by_role == CreatorUserRole.ACCOUNT: stmt = select(Account).where(Account.id == workflow_run.created_by) @@ -121,9 +121,9 @@ class WorkflowResponseConverter: return WorkflowFinishStreamResponse( task_id=task_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, data=WorkflowFinishStreamResponse.Data( - id=workflow_execution.id, + id=workflow_execution.id_, workflow_id=workflow_execution.workflow_id, status=workflow_execution.status, outputs=workflow_execution.outputs, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 04b698f35d..ef9302425b 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -55,11 +55,11 @@ from core.app.entities.task_entities import ( from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager -from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from extensions.ext_database import db from models.account import Account from models.enums import CreatorUserRole @@ -115,11 +115,11 @@ class WorkflowAppGenerateTaskPipeline: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_execution_id, }, - workflow_entity=TempWorkflowEntity( - id_=workflow.id, - type_=WorkflowType(workflow.type), + workflow_info=CycleManagerWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), version=workflow.version, - graph=workflow.graph_dict, + graph_data=workflow.graph_dict, ), workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, @@ -271,17 +271,13 @@ class WorkflowAppGenerateTaskPipeline: # override graph runtime state graph_runtime_state = event.graph_runtime_state - with Session(db.engine, expire_on_commit=False) as session: - # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( - session=session, - workflow_id=self._workflow_id, - ) - self._workflow_run_id = workflow_execution.id - start_resp = self._workflow_response_converter.workflow_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) + # init workflow run + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ + start_resp = self._workflow_response_converter.workflow_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) yield start_resp elif isinstance( @@ -562,7 +558,7 @@ class WorkflowAppGenerateTaskPipeline: tts_publisher.publish(None) def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None: - workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_)) assert workflow_run is not None invoke_from = self._application_generate_entity.invoke_from if invoke_from == InvokeFrom.SERVICE_API: diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 32301e11e7..addf164e6f 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -30,7 +30,7 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.ops.utils import get_message_data -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution from extensions.ext_database import db from extensions.ext_storage import storage from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig @@ -386,7 +386,7 @@ class TraceTask: ): self.trace_type = trace_type self.message_id = message_id - self.workflow_run_id = workflow_execution.id if workflow_execution else None + self.workflow_run_id = workflow_execution.id_ if workflow_execution else None self.conversation_id = conversation_id self.user_id = user_id self.timer = timer diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index 4ea71696de..c596445769 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -10,7 +10,7 @@ from sqlalchemy import select from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker -from core.workflow.entities.workflow_execution_entities import ( +from core.workflow.entities.workflow_execution import ( WorkflowExecution, WorkflowExecutionStatus, WorkflowType, @@ -104,9 +104,9 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): status = WorkflowExecutionStatus(db_model.status) return WorkflowExecution( - id=db_model.id, + id_=db_model.id, workflow_id=db_model.workflow_id, - type=WorkflowType(db_model.type), + workflow_type=WorkflowType(db_model.type), workflow_version=db_model.version, graph=graph, inputs=inputs, @@ -139,7 +139,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): raise ValueError("created_by_role is required in repository constructor") db_model = WorkflowRun() - db_model.id = domain_model.id + db_model.id = domain_model.id_ db_model.tenant_id = self._tenant_id if self._app_id is not None: db_model.app_id = self._app_id @@ -148,7 +148,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): # Check if this is a new record with self._session_factory() as session: - existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id)) + existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id_)) if not existing: # For new records, get the next sequence number stmt = select(WorkflowRun.sequence_number).where( @@ -161,7 +161,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): # For updates, keep the existing sequence number db_model.sequence_number = existing.sequence_number - db_model.type = domain_model.type + db_model.type = domain_model.workflow_type db_model.version = domain_model.workflow_version db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None diff --git a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py index 8d916a19db..30f2437cf4 100644 --- a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py @@ -13,7 +13,7 @@ from sqlalchemy.orm import sessionmaker from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import ( +from core.workflow.entities.workflow_node_execution import ( NodeExecution, NodeExecutionStatus, ) diff --git a/api/core/workflow/entities/workflow_execution_entities.py b/api/core/workflow/entities/workflow_execution.py similarity index 92% rename from api/core/workflow/entities/workflow_execution_entities.py rename to api/core/workflow/entities/workflow_execution.py index 80c3f65205..781be4b3c6 100644 --- a/api/core/workflow/entities/workflow_execution_entities.py +++ b/api/core/workflow/entities/workflow_execution.py @@ -36,10 +36,10 @@ class WorkflowExecution(BaseModel): user, tenant, and app attributes. """ - id: str = Field(...) + id_: str = Field(...) workflow_id: str = Field(...) workflow_version: str = Field(...) - type: WorkflowType = Field(...) + workflow_type: WorkflowType = Field(...) graph: Mapping[str, Any] = Field(...) inputs: Mapping[str, Any] = Field(...) @@ -67,18 +67,18 @@ class WorkflowExecution(BaseModel): def new( cls, *, - id: str, + id_: str, workflow_id: str, - type: WorkflowType, + workflow_type: WorkflowType, workflow_version: str, graph: Mapping[str, Any], inputs: Mapping[str, Any], started_at: datetime, ) -> "WorkflowExecution": return WorkflowExecution( - id=id, + id_=id_, workflow_id=workflow_id, - type=type, + workflow_type=workflow_type, workflow_version=workflow_version, graph=graph, inputs=inputs, diff --git a/api/core/workflow/entities/node_execution_entities.py b/api/core/workflow/entities/workflow_node_execution.py similarity index 100% rename from api/core/workflow/entities/node_execution_entities.py rename to api/core/workflow/entities/workflow_node_execution.py diff --git a/api/core/workflow/repository/workflow_execution_repository.py b/api/core/workflow/repository/workflow_execution_repository.py index a39a98ee33..5917310c8b 100644 --- a/api/core/workflow/repository/workflow_execution_repository.py +++ b/api/core/workflow/repository/workflow_execution_repository.py @@ -1,6 +1,6 @@ from typing import Optional, Protocol -from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.entities.workflow_execution import WorkflowExecution class WorkflowExecutionRepository(Protocol): diff --git a/api/core/workflow/repository/workflow_node_execution_repository.py b/api/core/workflow/repository/workflow_node_execution_repository.py index 3ca9e2ecab..8c83b5ea6c 100644 --- a/api/core/workflow/repository/workflow_node_execution_repository.py +++ b/api/core/workflow/repository/workflow_node_execution_repository.py @@ -2,7 +2,7 @@ from collections.abc import Sequence from dataclasses import dataclass from typing import Literal, Optional, Protocol -from core.workflow.entities.node_execution_entities import NodeExecution +from core.workflow.entities.workflow_node_execution import NodeExecution @dataclass diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 5e9db11a30..18d91e585c 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -4,8 +4,6 @@ from datetime import UTC, datetime from typing import Any, Optional, Union from uuid import uuid4 -from sqlalchemy.orm import Session - from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.app.entities.queue_entities import ( QueueNodeExceptionEvent, @@ -20,11 +18,11 @@ from core.app.task_pipeline.exc import WorkflowRunNotFoundError from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import ( +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_node_execution import ( NodeExecution, NodeExecutionStatus, ) -from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository @@ -32,11 +30,11 @@ from core.workflow.workflow_entry import WorkflowEntry @dataclass -class TempWorkflowEntity: - id_: str - type_: WorkflowType +class CycleManagerWorkflowInfo: + workflow_id: str + workflow_type: WorkflowType version: str - graph: Mapping[str, Any] + graph_data: Mapping[str, Any] class WorkflowCycleManager: @@ -45,22 +43,17 @@ class WorkflowCycleManager: *, application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], workflow_system_variables: dict[SystemVariableKey, Any], - workflow_entity: TempWorkflowEntity, + workflow_info: CycleManagerWorkflowInfo, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._application_generate_entity = application_generate_entity self._workflow_system_variables = workflow_system_variables + self._workflow_info = workflow_info self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository - self._temp_workflow_entity = workflow_entity - def handle_workflow_run_start( - self, - *, - session: Session, - workflow_id: str, - ) -> WorkflowExecution: + def handle_workflow_run_start(self) -> WorkflowExecution: inputs = {**self._application_generate_entity.inputs} for key, value in (self._workflow_system_variables or {}).items(): if key.value == "conversation": @@ -74,11 +67,11 @@ class WorkflowCycleManager: # TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this execution_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID) or uuid4()) execution = WorkflowExecution.new( - id=execution_id, - workflow_id=self._temp_workflow_entity.id_, - type=self._temp_workflow_entity.type_, - workflow_version=self._temp_workflow_entity.version, - graph=self._temp_workflow_entity.graph, + id_=execution_id, + workflow_id=self._workflow_info.workflow_id, + workflow_type=self._workflow_info.workflow_type, + workflow_version=self._workflow_info.version, + graph=self._workflow_info.graph_data, inputs=inputs, started_at=datetime.now(UTC).replace(tzinfo=None), ) @@ -177,7 +170,7 @@ class WorkflowCycleManager: # Use the instance repository to find running executions for a workflow run running_node_executions = self._workflow_node_execution_repository.get_running_executions( - workflow_run_id=workflow_execution.id + workflow_run_id=workflow_execution.id_ ) # Update the domain models @@ -225,7 +218,7 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, index=event.node_run_index, node_execution_id=event.node_execution_id, @@ -354,7 +347,7 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id, + workflow_run_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, node_execution_id=event.node_execution_id, node_id=event.node_id, diff --git a/api/models/model.py b/api/models/model.py index aa1464ff10..229e77134e 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, cast from core.plugin.entities.plugin import GenericProviderID from core.tools.entities.tool_entities import ToolProviderType from core.tools.signature import sign_tool_file -from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from services.plugin.plugin_service import PluginService if TYPE_CHECKING: diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index fe9282d312..6b30a70372 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -4,7 +4,7 @@ from datetime import datetime from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session -from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus from models import App, EndUser, WorkflowAppLog, WorkflowRun from models.enums import CreatorUserRole diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 50bb8f40ae..e15f05a562 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -13,7 +13,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable from core.workflow.entities.node_entities import NodeRunResult -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus +from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.nodes import NodeType diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 0d12406c49..54f58c43eb 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -13,20 +13,16 @@ from core.app.entities.queue_entities import ( QueueNodeSucceededEvent, ) from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus -from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager +from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from models.enums import CreatorUserRole from models.model import AppMode -from models.workflow import ( - Workflow, - WorkflowRun, - WorkflowRunStatus, -) +from models.workflow import Workflow, WorkflowRun @pytest.fixture @@ -95,11 +91,11 @@ def mock_workflow_execution_repository(): @pytest.fixture def real_workflow_entity(): - return TempWorkflowEntity( - id_="test-workflow-id", # Matches ID used in other fixtures - type_=WorkflowType.CHAT, + return CycleManagerWorkflowInfo( + workflow_id="test-workflow-id", # Matches ID used in other fixtures + workflow_type=WorkflowType.CHAT, version="1.0.0", - graph={ + graph_data={ "nodes": [ { "id": "node1", @@ -124,7 +120,7 @@ def workflow_cycle_manager( return WorkflowCycleManager( application_generate_entity=real_app_generate_entity, workflow_system_variables=real_workflow_system_variables, - workflow_entity=real_workflow_entity, + workflow_info=real_workflow_entity, workflow_execution_repository=mock_workflow_execution_repository, workflow_node_execution_repository=mock_node_execution_repository, ) @@ -170,7 +166,7 @@ def real_workflow_run(): workflow_run.version = "1.0" workflow_run.graph = json.dumps({"nodes": [], "edges": []}) workflow_run.inputs = json.dumps({"query": "test query"}) - workflow_run.status = WorkflowRunStatus.RUNNING + workflow_run.status = WorkflowExecutionStatus.RUNNING workflow_run.outputs = json.dumps({"answer": "test answer"}) workflow_run.created_by_role = CreatorUserRole.ACCOUNT workflow_run.created_by = "test-user-id" @@ -193,19 +189,13 @@ def test_init( assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository -def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_workflow): +def test_handle_workflow_run_start(workflow_cycle_manager): """Test handle_workflow_run_start method""" - # Mock session.scalar to return the workflow and max sequence - mock_session.scalar.side_effect = [real_workflow, 5] - # Call the method - workflow_execution = workflow_cycle_manager.handle_workflow_run_start( - session=mock_session, - workflow_id="test-workflow-id", - ) + workflow_execution = workflow_cycle_manager.handle_workflow_run_start() # Verify the result - assert workflow_execution.workflow_id == real_workflow.id + assert workflow_execution.workflow_id == "test-workflow-id" # Verify the workflow_execution_repository.save was called workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution) @@ -216,10 +206,10 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -250,10 +240,10 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -270,13 +260,13 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut workflow_run_id="test-workflow-run-id", total_tokens=50, total_steps=3, - status=WorkflowRunStatus.FAILED, + status=WorkflowExecutionStatus.FAILED, error_message="Test error message", ) # Verify the result assert result == workflow_execution - assert result.status == WorkflowExecutionStatus(WorkflowRunStatus.FAILED.value) + assert result.status == WorkflowExecutionStatus.FAILED assert result.error_message == "Test error message" assert result.total_tokens == 50 assert result.total_steps == 3 @@ -288,10 +278,10 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-execution-id", + id_="test-workflow-execution-id", workflow_id="test-workflow-id", workflow_version="1.0", - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -319,13 +309,13 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu # Call the method result = workflow_cycle_manager.handle_node_execution_start( - workflow_execution_id=workflow_execution.id, + workflow_execution_id=workflow_execution.id_, event=event, ) # Verify the result assert result.workflow_id == workflow_execution.workflow_id - assert result.workflow_run_id == workflow_execution.id + assert result.workflow_run_id == workflow_execution.id_ assert result.node_execution_id == event.node_execution_id assert result.node_id == event.node_id assert result.node_type == event.node_type @@ -341,10 +331,10 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), @@ -413,10 +403,10 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl # Create a real WorkflowExecution workflow_execution = WorkflowExecution( - id="test-workflow-run-id", + id_="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - type=WorkflowType.CHAT, + workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, started_at=datetime.now(UTC).replace(tzinfo=None), diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py index 7c5020db02..4a7b901d66 100644 --- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py +++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py @@ -14,7 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker from core.model_runtime.utils.encoders import jsonable_encoder from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.entities.node_entities import NodeRunMetadataKey -from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus +from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.repository.workflow_node_execution_repository import OrderConfig from models.account import Account, Tenant