@ -125,6 +125,7 @@ class WorkflowCycleManager:
)
)
self . _workflow_execution_repository . save ( workflow_execution )
return workflow_execution
def handle_workflow_run_partial_success (
@ -158,6 +159,7 @@ class WorkflowCycleManager:
)
)
self . _workflow_execution_repository . save ( execution )
return execution
def handle_workflow_run_failed (
@ -172,44 +174,45 @@ class WorkflowCycleManager:
trace_manager : Optional [ TraceQueueManager ] = None ,
exceptions_count : int = 0 ,
) - > WorkflowExecution :
execution = self . _get_workflow_execution_or_raise_error ( workflow_run_id )
workflow_ execution = self . _get_workflow_execution_or_raise_error ( workflow_run_id )
execution. status = WorkflowExecutionStatus ( status . value )
execution. error_message = error_message
execution. total_tokens = total_tokens
execution. total_steps = total_steps
execution. finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
execution. exceptions_count = exceptions_count
workflow_ execution. status = WorkflowExecutionStatus ( status . value )
workflow_ execution. error_message = error_message
workflow_ execution. total_tokens = total_tokens
workflow_ execution. total_steps = total_steps
workflow_ execution. finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
workflow_ execution. exceptions_count = exceptions_count
# Use the instance repository to find running executions for a workflow run
running_ domai n_executions = self . _workflow_node_execution_repository . get_running_executions (
workflow_run_id = execution. id
running_ node _executions = self . _workflow_node_execution_repository . get_running_executions (
workflow_run_id = workflow_ execution. id
)
# Update the domain models
now = datetime . now ( UTC ) . replace ( tzinfo = None )
for domai n_execution in running_ domai n_executions:
if domai n_execution. node_execution_id :
for node _execution in running_ node _executions:
if node _execution. node_execution_id :
# Update the domain model
domai n_execution. status = NodeExecutionStatus . FAILED
domai n_execution. error = error_message
domai n_execution. finished_at = now
domai n_execution. elapsed_time = ( now - domai n_execution. created_at ) . total_seconds ( )
node _execution. status = NodeExecutionStatus . FAILED
node _execution. error = error_message
node _execution. finished_at = now
node _execution. elapsed_time = ( now - node _execution. created_at ) . total_seconds ( )
# Update the repository with the domain model
self . _workflow_node_execution_repository . save ( domai n_execution)
self . _workflow_node_execution_repository . save ( node _execution)
if trace_manager :
trace_manager . add_trace_task (
TraceTask (
TraceTaskName . WORKFLOW_TRACE ,
workflow_execution = execution,
workflow_execution = workflow_ execution,
conversation_id = conversation_id ,
user_id = trace_manager . user_id ,
)
)
return execution
self . _workflow_execution_repository . save ( workflow_execution )
return workflow_execution
def handle_node_execution_start (
self ,