refactor(workflow): Rename entities files

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20455/head
-LAN- 12 months ago
parent 94a61f9ecf
commit cf698ddccc
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
from controllers.console import api from controllers.console import api
from controllers.console.app.wraps import get_app_model from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required from controllers.console.wraps import account_initialization_required, setup_required
from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from extensions.ext_database import db from extensions.ext_database import db
from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from fields.workflow_app_log_fields import workflow_app_log_pagination_fields
from libs.login import login_required from libs.login import login_required

@ -24,7 +24,7 @@ from core.errors.error import (
QuotaExceededError, QuotaExceededError,
) )
from core.model_runtime.errors.invoke import InvokeError from core.model_runtime.errors.invoke import InvokeError
from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from extensions.ext_database import db from extensions.ext_database import db
from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from fields.workflow_app_log_fields import workflow_app_log_pagination_fields
from libs import helper from libs import helper

@ -62,13 +62,13 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, WorkflowType
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
from events.message_event import message_was_created from events.message_event import message_was_created
from extensions.ext_database import db from extensions.ext_database import db
from models import Conversation, EndUser, Message, MessageFile from models import Conversation, EndUser, Message, MessageFile
@ -126,11 +126,11 @@ class AdvancedChatAppGenerateTaskPipeline:
SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
}, },
workflow_entity=TempWorkflowEntity( workflow_info=CycleManagerWorkflowInfo(
id_=workflow.id, workflow_id=workflow.id,
type_=WorkflowType(workflow.type), workflow_type=WorkflowType(workflow.type),
version=workflow.version, version=workflow.version,
graph=workflow.graph_dict, graph_data=workflow.graph_dict,
), ),
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
@ -306,15 +306,12 @@ class AdvancedChatAppGenerateTaskPipeline:
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
# init workflow run # init workflow run
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start()
session=session, self._workflow_run_id = workflow_execution.id_
workflow_id=self._workflow_id,
)
self._workflow_run_id = workflow_execution.id
message = self._get_message(session=session) message = self._get_message(session=session)
if not message: if not message:
raise ValueError(f"Message not found: {self._message_id}") raise ValueError(f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_execution.id message.workflow_run_id = workflow_execution.id_
workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,

@ -44,8 +44,8 @@ from core.app.entities.task_entities import (
) )
from core.file import FILE_MODEL_IDENTITY, File from core.file import FILE_MODEL_IDENTITY, File
from core.tools.tool_manager import ToolManager from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_execution_entities import NodeExecution from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.entities.workflow_execution_entities import WorkflowExecution from core.workflow.entities.workflow_node_execution import NodeExecution
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData from core.workflow.nodes.tool.entities import ToolNodeData
from models import ( from models import (
@ -73,9 +73,9 @@ class WorkflowResponseConverter:
) -> WorkflowStartStreamResponse: ) -> WorkflowStartStreamResponse:
return WorkflowStartStreamResponse( return WorkflowStartStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_execution.id, workflow_run_id=workflow_execution.id_,
data=WorkflowStartStreamResponse.Data( data=WorkflowStartStreamResponse.Data(
id=workflow_execution.id, id=workflow_execution.id_,
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
inputs=workflow_execution.inputs, inputs=workflow_execution.inputs,
created_at=int(workflow_execution.started_at.timestamp()), created_at=int(workflow_execution.started_at.timestamp()),
@ -90,7 +90,7 @@ class WorkflowResponseConverter:
workflow_execution: WorkflowExecution, workflow_execution: WorkflowExecution,
) -> WorkflowFinishStreamResponse: ) -> WorkflowFinishStreamResponse:
created_by = None created_by = None
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
assert workflow_run is not None assert workflow_run is not None
if workflow_run.created_by_role == CreatorUserRole.ACCOUNT: if workflow_run.created_by_role == CreatorUserRole.ACCOUNT:
stmt = select(Account).where(Account.id == workflow_run.created_by) stmt = select(Account).where(Account.id == workflow_run.created_by)
@ -121,9 +121,9 @@ class WorkflowResponseConverter:
return WorkflowFinishStreamResponse( return WorkflowFinishStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_execution.id, workflow_run_id=workflow_execution.id_,
data=WorkflowFinishStreamResponse.Data( data=WorkflowFinishStreamResponse.Data(
id=workflow_execution.id, id=workflow_execution.id_,
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
status=workflow_execution.status, status=workflow_execution.status,
outputs=workflow_execution.outputs, outputs=workflow_execution.outputs,

@ -55,11 +55,11 @@ from core.app.entities.task_entities import (
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.enums import CreatorUserRole from models.enums import CreatorUserRole
@ -115,11 +115,11 @@ class WorkflowAppGenerateTaskPipeline:
SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_execution_id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_execution_id,
}, },
workflow_entity=TempWorkflowEntity( workflow_info=CycleManagerWorkflowInfo(
id_=workflow.id, workflow_id=workflow.id,
type_=WorkflowType(workflow.type), workflow_type=WorkflowType(workflow.type),
version=workflow.version, version=workflow.version,
graph=workflow.graph_dict, graph_data=workflow.graph_dict,
), ),
workflow_execution_repository=workflow_execution_repository, workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
@ -271,17 +271,13 @@ class WorkflowAppGenerateTaskPipeline:
# override graph runtime state # override graph runtime state
graph_runtime_state = event.graph_runtime_state graph_runtime_state = event.graph_runtime_state
with Session(db.engine, expire_on_commit=False) as session: # init workflow run
# init workflow run workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start()
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( self._workflow_run_id = workflow_execution.id_
session=session, start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
workflow_id=self._workflow_id, task_id=self._application_generate_entity.task_id,
) workflow_execution=workflow_execution,
self._workflow_run_id = workflow_execution.id )
start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution,
)
yield start_resp yield start_resp
elif isinstance( elif isinstance(
@ -562,7 +558,7 @@ class WorkflowAppGenerateTaskPipeline:
tts_publisher.publish(None) tts_publisher.publish(None)
def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None: def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None:
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
assert workflow_run is not None assert workflow_run is not None
invoke_from = self._application_generate_entity.invoke_from invoke_from = self._application_generate_entity.invoke_from
if invoke_from == InvokeFrom.SERVICE_API: if invoke_from == InvokeFrom.SERVICE_API:

@ -30,7 +30,7 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo, WorkflowTraceInfo,
) )
from core.ops.utils import get_message_data from core.ops.utils import get_message_data
from core.workflow.entities.workflow_execution_entities import WorkflowExecution from core.workflow.entities.workflow_execution import WorkflowExecution
from extensions.ext_database import db from extensions.ext_database import db
from extensions.ext_storage import storage from extensions.ext_storage import storage
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
@ -386,7 +386,7 @@ class TraceTask:
): ):
self.trace_type = trace_type self.trace_type = trace_type
self.message_id = message_id self.message_id = message_id
self.workflow_run_id = workflow_execution.id if workflow_execution else None self.workflow_run_id = workflow_execution.id_ if workflow_execution else None
self.conversation_id = conversation_id self.conversation_id = conversation_id
self.user_id = user_id self.user_id = user_id
self.timer = timer self.timer = timer

@ -10,7 +10,7 @@ from sqlalchemy import select
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from core.workflow.entities.workflow_execution_entities import ( from core.workflow.entities.workflow_execution import (
WorkflowExecution, WorkflowExecution,
WorkflowExecutionStatus, WorkflowExecutionStatus,
WorkflowType, WorkflowType,
@ -104,9 +104,9 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
status = WorkflowExecutionStatus(db_model.status) status = WorkflowExecutionStatus(db_model.status)
return WorkflowExecution( return WorkflowExecution(
id=db_model.id, id_=db_model.id,
workflow_id=db_model.workflow_id, workflow_id=db_model.workflow_id,
type=WorkflowType(db_model.type), workflow_type=WorkflowType(db_model.type),
workflow_version=db_model.version, workflow_version=db_model.version,
graph=graph, graph=graph,
inputs=inputs, inputs=inputs,
@ -139,7 +139,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
raise ValueError("created_by_role is required in repository constructor") raise ValueError("created_by_role is required in repository constructor")
db_model = WorkflowRun() db_model = WorkflowRun()
db_model.id = domain_model.id db_model.id = domain_model.id_
db_model.tenant_id = self._tenant_id db_model.tenant_id = self._tenant_id
if self._app_id is not None: if self._app_id is not None:
db_model.app_id = self._app_id db_model.app_id = self._app_id
@ -148,7 +148,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
# Check if this is a new record # Check if this is a new record
with self._session_factory() as session: with self._session_factory() as session:
existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id)) existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id_))
if not existing: if not existing:
# For new records, get the next sequence number # For new records, get the next sequence number
stmt = select(WorkflowRun.sequence_number).where( stmt = select(WorkflowRun.sequence_number).where(
@ -161,7 +161,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
# For updates, keep the existing sequence number # For updates, keep the existing sequence number
db_model.sequence_number = existing.sequence_number db_model.sequence_number = existing.sequence_number
db_model.type = domain_model.type db_model.type = domain_model.workflow_type
db_model.version = domain_model.workflow_version db_model.version = domain_model.workflow_version
db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None
db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None

@ -13,7 +13,7 @@ from sqlalchemy.orm import sessionmaker
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import ( from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeExecutionStatus, NodeExecutionStatus,
) )

@ -36,10 +36,10 @@ class WorkflowExecution(BaseModel):
user, tenant, and app attributes. user, tenant, and app attributes.
""" """
id: str = Field(...) id_: str = Field(...)
workflow_id: str = Field(...) workflow_id: str = Field(...)
workflow_version: str = Field(...) workflow_version: str = Field(...)
type: WorkflowType = Field(...) workflow_type: WorkflowType = Field(...)
graph: Mapping[str, Any] = Field(...) graph: Mapping[str, Any] = Field(...)
inputs: Mapping[str, Any] = Field(...) inputs: Mapping[str, Any] = Field(...)
@ -67,18 +67,18 @@ class WorkflowExecution(BaseModel):
def new( def new(
cls, cls,
*, *,
id: str, id_: str,
workflow_id: str, workflow_id: str,
type: WorkflowType, workflow_type: WorkflowType,
workflow_version: str, workflow_version: str,
graph: Mapping[str, Any], graph: Mapping[str, Any],
inputs: Mapping[str, Any], inputs: Mapping[str, Any],
started_at: datetime, started_at: datetime,
) -> "WorkflowExecution": ) -> "WorkflowExecution":
return WorkflowExecution( return WorkflowExecution(
id=id, id_=id_,
workflow_id=workflow_id, workflow_id=workflow_id,
type=type, workflow_type=workflow_type,
workflow_version=workflow_version, workflow_version=workflow_version,
graph=graph, graph=graph,
inputs=inputs, inputs=inputs,

@ -1,6 +1,6 @@
from typing import Optional, Protocol from typing import Optional, Protocol
from core.workflow.entities.workflow_execution_entities import WorkflowExecution from core.workflow.entities.workflow_execution import WorkflowExecution
class WorkflowExecutionRepository(Protocol): class WorkflowExecutionRepository(Protocol):

@ -2,7 +2,7 @@ from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal, Optional, Protocol from typing import Literal, Optional, Protocol
from core.workflow.entities.node_execution_entities import NodeExecution from core.workflow.entities.workflow_node_execution import NodeExecution
@dataclass @dataclass

@ -4,8 +4,6 @@ from datetime import UTC, datetime
from typing import Any, Optional, Union from typing import Any, Optional, Union
from uuid import uuid4 from uuid import uuid4
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
QueueNodeExceptionEvent, QueueNodeExceptionEvent,
@ -20,11 +18,11 @@ from core.app.task_pipeline.exc import WorkflowRunNotFoundError
from core.ops.entities.trace_entity import TraceTaskName from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import ( from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeExecutionStatus, NodeExecutionStatus,
) )
from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
@ -32,11 +30,11 @@ from core.workflow.workflow_entry import WorkflowEntry
@dataclass @dataclass
class TempWorkflowEntity: class CycleManagerWorkflowInfo:
id_: str workflow_id: str
type_: WorkflowType workflow_type: WorkflowType
version: str version: str
graph: Mapping[str, Any] graph_data: Mapping[str, Any]
class WorkflowCycleManager: class WorkflowCycleManager:
@ -45,22 +43,17 @@ class WorkflowCycleManager:
*, *,
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
workflow_system_variables: dict[SystemVariableKey, Any], workflow_system_variables: dict[SystemVariableKey, Any],
workflow_entity: TempWorkflowEntity, workflow_info: CycleManagerWorkflowInfo,
workflow_execution_repository: WorkflowExecutionRepository, workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository,
) -> None: ) -> None:
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow_system_variables = workflow_system_variables self._workflow_system_variables = workflow_system_variables
self._workflow_info = workflow_info
self._workflow_execution_repository = workflow_execution_repository self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository
self._temp_workflow_entity = workflow_entity
def handle_workflow_run_start( def handle_workflow_run_start(self) -> WorkflowExecution:
self,
*,
session: Session,
workflow_id: str,
) -> WorkflowExecution:
inputs = {**self._application_generate_entity.inputs} inputs = {**self._application_generate_entity.inputs}
for key, value in (self._workflow_system_variables or {}).items(): for key, value in (self._workflow_system_variables or {}).items():
if key.value == "conversation": if key.value == "conversation":
@ -74,11 +67,11 @@ class WorkflowCycleManager:
# TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this # TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this
execution_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID) or uuid4()) execution_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID) or uuid4())
execution = WorkflowExecution.new( execution = WorkflowExecution.new(
id=execution_id, id_=execution_id,
workflow_id=self._temp_workflow_entity.id_, workflow_id=self._workflow_info.workflow_id,
type=self._temp_workflow_entity.type_, workflow_type=self._workflow_info.workflow_type,
workflow_version=self._temp_workflow_entity.version, workflow_version=self._workflow_info.version,
graph=self._temp_workflow_entity.graph, graph=self._workflow_info.graph_data,
inputs=inputs, inputs=inputs,
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),
) )
@ -177,7 +170,7 @@ class WorkflowCycleManager:
# Use the instance repository to find running executions for a workflow run # Use the instance repository to find running executions for a workflow run
running_node_executions = self._workflow_node_execution_repository.get_running_executions( running_node_executions = self._workflow_node_execution_repository.get_running_executions(
workflow_run_id=workflow_execution.id workflow_run_id=workflow_execution.id_
) )
# Update the domain models # Update the domain models
@ -225,7 +218,7 @@ class WorkflowCycleManager:
domain_execution = NodeExecution( domain_execution = NodeExecution(
id=str(uuid4()), id=str(uuid4()),
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
workflow_run_id=workflow_execution.id, workflow_run_id=workflow_execution.id_,
predecessor_node_id=event.predecessor_node_id, predecessor_node_id=event.predecessor_node_id,
index=event.node_run_index, index=event.node_run_index,
node_execution_id=event.node_execution_id, node_execution_id=event.node_execution_id,
@ -354,7 +347,7 @@ class WorkflowCycleManager:
domain_execution = NodeExecution( domain_execution = NodeExecution(
id=str(uuid4()), id=str(uuid4()),
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
workflow_run_id=workflow_execution.id, workflow_run_id=workflow_execution.id_,
predecessor_node_id=event.predecessor_node_id, predecessor_node_id=event.predecessor_node_id,
node_execution_id=event.node_execution_id, node_execution_id=event.node_execution_id,
node_id=event.node_id, node_id=event.node_id,

@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, cast
from core.plugin.entities.plugin import GenericProviderID from core.plugin.entities.plugin import GenericProviderID
from core.tools.entities.tool_entities import ToolProviderType from core.tools.entities.tool_entities import ToolProviderType
from core.tools.signature import sign_tool_file from core.tools.signature import sign_tool_file
from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from services.plugin.plugin_service import PluginService from services.plugin.plugin_service import PluginService
if TYPE_CHECKING: if TYPE_CHECKING:

@ -4,7 +4,7 @@ from datetime import datetime
from sqlalchemy import and_, func, or_, select from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.workflow.entities.workflow_execution_entities import WorkflowExecutionStatus from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from models import App, EndUser, WorkflowAppLog, WorkflowRun from models import App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import CreatorUserRole from models.enums import CreatorUserRole

@ -13,7 +13,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus
from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.graph_engine.entities.event import InNodeEvent
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType

@ -13,20 +13,16 @@ from core.app.entities.queue_entities import (
QueueNodeSucceededEvent, QueueNodeSucceededEvent,
) )
from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.entities.workflow_execution_entities import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import TempWorkflowEntity, WorkflowCycleManager from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
from models.enums import CreatorUserRole from models.enums import CreatorUserRole
from models.model import AppMode from models.model import AppMode
from models.workflow import ( from models.workflow import Workflow, WorkflowRun
Workflow,
WorkflowRun,
WorkflowRunStatus,
)
@pytest.fixture @pytest.fixture
@ -95,11 +91,11 @@ def mock_workflow_execution_repository():
@pytest.fixture @pytest.fixture
def real_workflow_entity(): def real_workflow_entity():
return TempWorkflowEntity( return CycleManagerWorkflowInfo(
id_="test-workflow-id", # Matches ID used in other fixtures workflow_id="test-workflow-id", # Matches ID used in other fixtures
type_=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
version="1.0.0", version="1.0.0",
graph={ graph_data={
"nodes": [ "nodes": [
{ {
"id": "node1", "id": "node1",
@ -124,7 +120,7 @@ def workflow_cycle_manager(
return WorkflowCycleManager( return WorkflowCycleManager(
application_generate_entity=real_app_generate_entity, application_generate_entity=real_app_generate_entity,
workflow_system_variables=real_workflow_system_variables, workflow_system_variables=real_workflow_system_variables,
workflow_entity=real_workflow_entity, workflow_info=real_workflow_entity,
workflow_execution_repository=mock_workflow_execution_repository, workflow_execution_repository=mock_workflow_execution_repository,
workflow_node_execution_repository=mock_node_execution_repository, workflow_node_execution_repository=mock_node_execution_repository,
) )
@ -170,7 +166,7 @@ def real_workflow_run():
workflow_run.version = "1.0" workflow_run.version = "1.0"
workflow_run.graph = json.dumps({"nodes": [], "edges": []}) workflow_run.graph = json.dumps({"nodes": [], "edges": []})
workflow_run.inputs = json.dumps({"query": "test query"}) workflow_run.inputs = json.dumps({"query": "test query"})
workflow_run.status = WorkflowRunStatus.RUNNING workflow_run.status = WorkflowExecutionStatus.RUNNING
workflow_run.outputs = json.dumps({"answer": "test answer"}) workflow_run.outputs = json.dumps({"answer": "test answer"})
workflow_run.created_by_role = CreatorUserRole.ACCOUNT workflow_run.created_by_role = CreatorUserRole.ACCOUNT
workflow_run.created_by = "test-user-id" workflow_run.created_by = "test-user-id"
@ -193,19 +189,13 @@ def test_init(
assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository
def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_workflow): def test_handle_workflow_run_start(workflow_cycle_manager):
"""Test handle_workflow_run_start method""" """Test handle_workflow_run_start method"""
# Mock session.scalar to return the workflow and max sequence
mock_session.scalar.side_effect = [real_workflow, 5]
# Call the method # Call the method
workflow_execution = workflow_cycle_manager.handle_workflow_run_start( workflow_execution = workflow_cycle_manager.handle_workflow_run_start()
session=mock_session,
workflow_id="test-workflow-id",
)
# Verify the result # Verify the result
assert workflow_execution.workflow_id == real_workflow.id assert workflow_execution.workflow_id == "test-workflow-id"
# Verify the workflow_execution_repository.save was called # Verify the workflow_execution_repository.save was called
workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution) workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution)
@ -216,10 +206,10 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu
# Create a real WorkflowExecution # Create a real WorkflowExecution
workflow_execution = WorkflowExecution( workflow_execution = WorkflowExecution(
id="test-workflow-run-id", id_="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),
@ -250,10 +240,10 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut
# Create a real WorkflowExecution # Create a real WorkflowExecution
workflow_execution = WorkflowExecution( workflow_execution = WorkflowExecution(
id="test-workflow-run-id", id_="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),
@ -270,13 +260,13 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut
workflow_run_id="test-workflow-run-id", workflow_run_id="test-workflow-run-id",
total_tokens=50, total_tokens=50,
total_steps=3, total_steps=3,
status=WorkflowRunStatus.FAILED, status=WorkflowExecutionStatus.FAILED,
error_message="Test error message", error_message="Test error message",
) )
# Verify the result # Verify the result
assert result == workflow_execution assert result == workflow_execution
assert result.status == WorkflowExecutionStatus(WorkflowRunStatus.FAILED.value) assert result.status == WorkflowExecutionStatus.FAILED
assert result.error_message == "Test error message" assert result.error_message == "Test error message"
assert result.total_tokens == 50 assert result.total_tokens == 50
assert result.total_steps == 3 assert result.total_steps == 3
@ -288,10 +278,10 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu
# Create a real WorkflowExecution # Create a real WorkflowExecution
workflow_execution = WorkflowExecution( workflow_execution = WorkflowExecution(
id="test-workflow-execution-id", id_="test-workflow-execution-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),
@ -319,13 +309,13 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu
# Call the method # Call the method
result = workflow_cycle_manager.handle_node_execution_start( result = workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=workflow_execution.id, workflow_execution_id=workflow_execution.id_,
event=event, event=event,
) )
# Verify the result # Verify the result
assert result.workflow_id == workflow_execution.workflow_id assert result.workflow_id == workflow_execution.workflow_id
assert result.workflow_run_id == workflow_execution.id assert result.workflow_run_id == workflow_execution.id_
assert result.node_execution_id == event.node_execution_id assert result.node_execution_id == event.node_execution_id
assert result.node_id == event.node_id assert result.node_id == event.node_id
assert result.node_type == event.node_type assert result.node_type == event.node_type
@ -341,10 +331,10 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work
# Create a real WorkflowExecution # Create a real WorkflowExecution
workflow_execution = WorkflowExecution( workflow_execution = WorkflowExecution(
id="test-workflow-run-id", id_="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),
@ -413,10 +403,10 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl
# Create a real WorkflowExecution # Create a real WorkflowExecution
workflow_execution = WorkflowExecution( workflow_execution = WorkflowExecution(
id="test-workflow-run-id", id_="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
started_at=datetime.now(UTC).replace(tzinfo=None), started_at=datetime.now(UTC).replace(tzinfo=None),

@ -14,7 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus from core.workflow.entities.workflow_node_execution import NodeExecution, NodeExecutionStatus
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from core.workflow.repository.workflow_node_execution_repository import OrderConfig from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from models.account import Account, Tenant from models.account import Account, Tenant

Loading…
Cancel
Save