@ -1,11 +1,9 @@
from collections . abc import Mapping
from dataclasses import dataclass
from datetime import UTC , datetime
from typing import Any , Optional , Union
from uuid import uuid4
from sqlalchemy import func , select
from sqlalchemy . orm import Session
from core . app . entities . app_invoke_entities import AdvancedChatAppGenerateEntity , WorkflowAppGenerateEntity
from core . app . entities . queue_entities import (
QueueNodeExceptionEvent ,
@ -19,21 +17,24 @@ from core.app.entities.queue_entities import (
from core . app . task_pipeline . exc import WorkflowRunNotFoundError
from core . ops . entities . trace_entity import TraceTaskName
from core . ops . ops_trace_manager import TraceQueueManager , TraceTask
from core . workflow . entities . node_entities import NodeRunMetadataKey
from core . workflow . entities . node_execution_entities import (
NodeExecution ,
NodeExecutionStatus ,
from core . workflow . entities . workflow_execution import WorkflowExecution , WorkflowExecutionStatus , WorkflowType
from core . workflow . entities . workflow_node_execution import (
WorkflowNodeExecution ,
WorkflowNodeExecutionMetadataKey ,
WorkflowNodeExecutionStatus ,
)
from core . workflow . entities . workflow_execution_entities import WorkflowExecution , WorkflowExecutionStatus , WorkflowType
from core . workflow . enums import SystemVariableKey
from core . workflow . repositor y . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositor y . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core . workflow . repositor ies . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositor ies . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core . workflow . workflow_entry import WorkflowEntry
from models import (
Workflow ,
WorkflowRun ,
WorkflowRunStatus ,
)
@dataclass
class CycleManagerWorkflowInfo :
workflow_id : str
workflow_type : WorkflowType
version : str
graph_data : Mapping [ str , Any ]
class WorkflowCycleManager :
@ -42,32 +43,17 @@ class WorkflowCycleManager:
* ,
application_generate_entity : Union [ AdvancedChatAppGenerateEntity , WorkflowAppGenerateEntity ] ,
workflow_system_variables : dict [ SystemVariableKey , Any ] ,
workflow_info : CycleManagerWorkflowInfo ,
workflow_execution_repository : WorkflowExecutionRepository ,
workflow_node_execution_repository : WorkflowNodeExecutionRepository ,
) - > None :
self . _application_generate_entity = application_generate_entity
self . _workflow_system_variables = workflow_system_variables
self . _workflow_info = workflow_info
self . _workflow_execution_repository = workflow_execution_repository
self . _workflow_node_execution_repository = workflow_node_execution_repository
def handle_workflow_run_start (
self ,
* ,
session : Session ,
workflow_id : str ,
) - > WorkflowExecution :
workflow_stmt = select ( Workflow ) . where ( Workflow . id == workflow_id )
workflow = session . scalar ( workflow_stmt )
if not workflow :
raise ValueError ( f " Workflow not found: { workflow_id } " )
max_sequence_stmt = select ( func . max ( WorkflowRun . sequence_number ) ) . where (
WorkflowRun . tenant_id == workflow . tenant_id ,
WorkflowRun . app_id == workflow . app_id ,
)
max_sequence = session . scalar ( max_sequence_stmt ) or 0
new_sequence_number = max_sequence + 1
def handle_workflow_run_start ( self ) - > WorkflowExecution :
inputs = { * * self . _application_generate_entity . inputs }
for key , value in ( self . _workflow_system_variables or { } ) . items ( ) :
if key . value == " conversation " :
@ -79,14 +65,13 @@ class WorkflowCycleManager:
# init workflow run
# TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this
execution_id = str ( self . _workflow_system_variables . get ( SystemVariableKey . WORKFLOW_ RU N_ID) or uuid4 ( ) )
execution_id = str ( self . _workflow_system_variables . get ( SystemVariableKey . WORKFLOW_ EXECUTIO N_ID) or uuid4 ( ) )
execution = WorkflowExecution . new (
id = execution_id ,
workflow_id = workflow . id ,
sequence_number = new_sequence_number ,
type = WorkflowType ( workflow . type ) ,
workflow_version = workflow . version ,
graph = workflow . graph_dict ,
id_ = execution_id ,
workflow_id = self . _workflow_info . workflow_id ,
workflow_type = self . _workflow_info . workflow_type ,
workflow_version = self . _workflow_info . version ,
graph = self . _workflow_info . graph_data ,
inputs = inputs ,
started_at = datetime . now ( UTC ) . replace ( tzinfo = None ) ,
)
@ -168,7 +153,7 @@ class WorkflowCycleManager:
workflow_run_id : str ,
total_tokens : int ,
total_steps : int ,
status : Workflow Ru nStatus,
status : Workflow Executio nStatus,
error_message : str ,
conversation_id : Optional [ str ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
@ -185,7 +170,7 @@ class WorkflowCycleManager:
# Use the instance repository to find running executions for a workflow run
running_node_executions = self . _workflow_node_execution_repository . get_running_executions (
workflow_run_id = workflow_execution . id
workflow_run_id = workflow_execution . id _
)
# Update the domain models
@ -193,7 +178,7 @@ class WorkflowCycleManager:
for node_execution in running_node_executions :
if node_execution . node_execution_id :
# Update the domain model
node_execution . status = NodeExecutionStatus. FAILED
node_execution . status = Workflow NodeExecutionStatus. FAILED
node_execution . error = error_message
node_execution . finished_at = now
node_execution . elapsed_time = ( now - node_execution . created_at ) . total_seconds ( )
@ -219,28 +204,28 @@ class WorkflowCycleManager:
* ,
workflow_execution_id : str ,
event : QueueNodeStartedEvent ,
) - > NodeExecution:
) - > Workflow NodeExecution:
workflow_execution = self . _get_workflow_execution_or_raise_error ( workflow_execution_id )
# Create a domain model
created_at = datetime . now ( UTC ) . replace ( tzinfo = None )
metadata = {
NodeRu nMetadataKey. PARALLEL_MODE_RUN_ID : event . parallel_mode_run_id ,
NodeRu nMetadataKey. ITERATION_ID : event . in_iteration_id ,
NodeRu nMetadataKey. LOOP_ID : event . in_loop_id ,
WorkflowNodeExecutio nMetadataKey. PARALLEL_MODE_RUN_ID : event . parallel_mode_run_id ,
WorkflowNodeExecutio nMetadataKey. ITERATION_ID : event . in_iteration_id ,
WorkflowNodeExecutio nMetadataKey. LOOP_ID : event . in_loop_id ,
}
domain_execution = NodeExecution(
domain_execution = Workflow NodeExecution(
id = str ( uuid4 ( ) ) ,
workflow_id = workflow_execution . workflow_id ,
workflow_ ru n_id= workflow_execution . id ,
workflow_ executio n_id= workflow_execution . id _ ,
predecessor_node_id = event . predecessor_node_id ,
index = event . node_run_index ,
node_execution_id = event . node_execution_id ,
node_id = event . node_id ,
node_type = event . node_type ,
title = event . node_data . title ,
status = NodeExecutionStatus. RUNNING ,
status = Workflow NodeExecutionStatus. RUNNING ,
metadata = metadata ,
created_at = created_at ,
)
@ -250,7 +235,7 @@ class WorkflowCycleManager:
return domain_execution
def handle_workflow_node_execution_success ( self , * , event : QueueNodeSucceededEvent ) - > NodeExecution:
def handle_workflow_node_execution_success ( self , * , event : QueueNodeSucceededEvent ) - > Workflow NodeExecution:
# Get the domain model from repository
domain_execution = self . _workflow_node_execution_repository . get_by_node_execution_id ( event . node_execution_id )
if not domain_execution :
@ -271,7 +256,7 @@ class WorkflowCycleManager:
elapsed_time = ( finished_at - event . start_at ) . total_seconds ( )
# Update domain model
domain_execution . status = NodeExecutionStatus. SUCCEEDED
domain_execution . status = Workflow NodeExecutionStatus. SUCCEEDED
domain_execution . update_from_mapping (
inputs = inputs , process_data = process_data , outputs = outputs , metadata = execution_metadata_dict
)
@ -290,7 +275,7 @@ class WorkflowCycleManager:
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent ,
) - > NodeExecution:
) - > Workflow NodeExecution:
"""
Workflow node execution failed
: param event : queue node failed event
@ -317,9 +302,9 @@ class WorkflowCycleManager:
# Update domain model
domain_execution . status = (
NodeExecutionStatus. FAILED
Workflow NodeExecutionStatus. FAILED
if not isinstance ( event , QueueNodeExceptionEvent )
else NodeExecutionStatus. EXCEPTION
else Workflow NodeExecutionStatus. EXCEPTION
)
domain_execution . error = event . error
domain_execution . update_from_mapping (
@ -335,7 +320,7 @@ class WorkflowCycleManager:
def handle_workflow_node_execution_retried (
self , * , workflow_execution_id : str , event : QueueNodeRetryEvent
) - > NodeExecution:
) - > Workflow NodeExecution:
workflow_execution = self . _get_workflow_execution_or_raise_error ( workflow_execution_id )
created_at = event . start_at
finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
@ -345,13 +330,13 @@ class WorkflowCycleManager:
# Convert metadata keys to strings
origin_metadata = {
NodeRu nMetadataKey. ITERATION_ID : event . in_iteration_id ,
NodeRu nMetadataKey. PARALLEL_MODE_RUN_ID : event . parallel_mode_run_id ,
NodeRu nMetadataKey. LOOP_ID : event . in_loop_id ,
WorkflowNodeExecutio nMetadataKey. ITERATION_ID : event . in_iteration_id ,
WorkflowNodeExecutio nMetadataKey. PARALLEL_MODE_RUN_ID : event . parallel_mode_run_id ,
WorkflowNodeExecutio nMetadataKey. LOOP_ID : event . in_loop_id ,
}
# Convert execution metadata keys to strings
execution_metadata_dict : dict [ NodeRu nMetadataKey, str | None ] = { }
execution_metadata_dict : dict [ WorkflowNodeExecutio nMetadataKey, str | None ] = { }
if event . execution_metadata :
for key , value in event . execution_metadata . items ( ) :
execution_metadata_dict [ key ] = value
@ -359,16 +344,16 @@ class WorkflowCycleManager:
merged_metadata = { * * execution_metadata_dict , * * origin_metadata } if execution_metadata_dict else origin_metadata
# Create a domain model
domain_execution = NodeExecution(
domain_execution = Workflow NodeExecution(
id = str ( uuid4 ( ) ) ,
workflow_id = workflow_execution . workflow_id ,
workflow_ ru n_id= workflow_execution . id ,
workflow_ executio n_id= workflow_execution . id _ ,
predecessor_node_id = event . predecessor_node_id ,
node_execution_id = event . node_execution_id ,
node_id = event . node_id ,
node_type = event . node_type ,
title = event . node_data . title ,
status = NodeExecutionStatus. RETRY ,
status = Workflow NodeExecutionStatus. RETRY ,
created_at = created_at ,
finished_at = finished_at ,
elapsed_time = elapsed_time ,