@ -6,7 +6,7 @@ from typing import Any, Optional, Union, cast
from uuid import uuid4
from uuid import uuid4
from sqlalchemy import func , select
from sqlalchemy import func , select
from sqlalchemy . orm import Session
from sqlalchemy . orm import Session , sessionmaker
from core . app . entities . app_invoke_entities import AdvancedChatAppGenerateEntity , InvokeFrom , WorkflowAppGenerateEntity
from core . app . entities . app_invoke_entities import AdvancedChatAppGenerateEntity , InvokeFrom , WorkflowAppGenerateEntity
from core . app . entities . queue_entities import (
from core . app . entities . queue_entities import (
@ -49,12 +49,14 @@ from core.file import FILE_MODEL_IDENTITY, File
from core . model_runtime . utils . encoders import jsonable_encoder
from core . model_runtime . utils . encoders import jsonable_encoder
from core . ops . entities . trace_entity import TraceTaskName
from core . ops . entities . trace_entity import TraceTaskName
from core . ops . ops_trace_manager import TraceQueueManager , TraceTask
from core . ops . ops_trace_manager import TraceQueueManager , TraceTask
from core . repository import RepositoryFactory
from core . tools . tool_manager import ToolManager
from core . tools . tool_manager import ToolManager
from core . workflow . entities . node_entities import NodeRunMetadataKey
from core . workflow . entities . node_entities import NodeRunMetadataKey
from core . workflow . enums import SystemVariableKey
from core . workflow . enums import SystemVariableKey
from core . workflow . nodes import NodeType
from core . workflow . nodes import NodeType
from core . workflow . nodes . tool . entities import ToolNodeData
from core . workflow . nodes . tool . entities import ToolNodeData
from core . workflow . workflow_entry import WorkflowEntry
from core . workflow . workflow_entry import WorkflowEntry
from extensions . ext_database import db
from models . account import Account
from models . account import Account
from models . enums import CreatedByRole , WorkflowRunTriggeredFrom
from models . enums import CreatedByRole , WorkflowRunTriggeredFrom
from models . model import EndUser
from models . model import EndUser
@ -80,6 +82,21 @@ class WorkflowCycleManage:
self . _application_generate_entity = application_generate_entity
self . _application_generate_entity = application_generate_entity
self . _workflow_system_variables = workflow_system_variables
self . _workflow_system_variables = workflow_system_variables
# Initialize the session factory and repository
# We use the global db engine instead of the session passed to methods
# Disable expire_on_commit to avoid the need for merging objects
self . _session_factory = sessionmaker ( bind = db . engine , expire_on_commit = False )
self . _workflow_node_execution_repository = RepositoryFactory . create_workflow_node_execution_repository (
params = {
" tenant_id " : self . _application_generate_entity . app_config . tenant_id ,
" app_id " : self . _application_generate_entity . app_config . app_id ,
" session_factory " : self . _session_factory ,
}
)
# We'll still keep the cache for backward compatibility and performance
# but use the repository for database operations
def _handle_workflow_run_start (
def _handle_workflow_run_start (
self ,
self ,
* ,
* ,
@ -254,19 +271,15 @@ class WorkflowCycleManage:
workflow_run . finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
workflow_run . finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
workflow_run . exceptions_count = exceptions_count
workflow_run . exceptions_count = exceptions_count
stmt = select ( WorkflowNodeExecution . node_execution_id ) . where (
# Use the instance repository to find running executions for a workflow run
WorkflowNodeExecution . tenant_id == workflow_run . tenant_id ,
running_workflow_node_executions = self . _workflow_node_execution_repository . get_running_executions (
WorkflowNodeExecution . app_id == workflow_run . app_id ,
workflow_run_id = workflow_run . id
WorkflowNodeExecution . workflow_id == workflow_run . workflow_id ,
WorkflowNodeExecution . triggered_from == WorkflowNodeExecutionTriggeredFrom . WORKFLOW_RUN . value ,
WorkflowNodeExecution . workflow_run_id == workflow_run . id ,
WorkflowNodeExecution . status == WorkflowNodeExecutionStatus . RUNNING . value ,
)
)
ids = session . scalars ( stmt ) . all ( )
# U se self._get_workflow_node_execution here to make sure the cache is updated
# Update the cache with the retrieved executions
running_workflow_node_executions = [
for execution in running_workflow_node_executions :
self . _get_workflow_node_execution ( session = session , node_execution_id = id ) for id in ids if id
if execution . node_execution_id :
]
self . _workflow_node_executions [ execution . node_execution_id ] = execution
for workflow_node_execution in running_workflow_node_executions :
for workflow_node_execution in running_workflow_node_executions :
now = datetime . now ( UTC ) . replace ( tzinfo = None )
now = datetime . now ( UTC ) . replace ( tzinfo = None )
@ -288,7 +301,7 @@ class WorkflowCycleManage:
return workflow_run
return workflow_run
def _handle_node_execution_start (
def _handle_node_execution_start (
self , * , session: Session , workflow_run: WorkflowRun , event : QueueNodeStartedEvent
self , * , workflow_run: WorkflowRun , event : QueueNodeStartedEvent
) - > WorkflowNodeExecution :
) - > WorkflowNodeExecution :
workflow_node_execution = WorkflowNodeExecution ( )
workflow_node_execution = WorkflowNodeExecution ( )
workflow_node_execution . id = str ( uuid4 ( ) )
workflow_node_execution . id = str ( uuid4 ( ) )
@ -315,17 +328,14 @@ class WorkflowCycleManage:
)
)
workflow_node_execution . created_at = datetime . now ( UTC ) . replace ( tzinfo = None )
workflow_node_execution . created_at = datetime . now ( UTC ) . replace ( tzinfo = None )
session . add ( workflow_node_execution )
# Use the instance repository to save the workflow node execution
self . _workflow_node_execution_repository . save ( workflow_node_execution )
self . _workflow_node_executions [ event . node_execution_id ] = workflow_node_execution
self . _workflow_node_executions [ event . node_execution_id ] = workflow_node_execution
return workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_success (
def _handle_workflow_node_execution_success ( self , * , event : QueueNodeSucceededEvent ) - > WorkflowNodeExecution :
self , * , session : Session , event : QueueNodeSucceededEvent
workflow_node_execution = self . _get_workflow_node_execution ( node_execution_id = event . node_execution_id )
) - > WorkflowNodeExecution :
workflow_node_execution = self . _get_workflow_node_execution (
session = session , node_execution_id = event . node_execution_id
)
inputs = WorkflowEntry . handle_special_values ( event . inputs )
inputs = WorkflowEntry . handle_special_values ( event . inputs )
process_data = WorkflowEntry . handle_special_values ( event . process_data )
process_data = WorkflowEntry . handle_special_values ( event . process_data )
outputs = WorkflowEntry . handle_special_values ( event . outputs )
outputs = WorkflowEntry . handle_special_values ( event . outputs )
@ -344,13 +354,13 @@ class WorkflowCycleManage:
workflow_node_execution . finished_at = finished_at
workflow_node_execution . finished_at = finished_at
workflow_node_execution . elapsed_time = elapsed_time
workflow_node_execution . elapsed_time = elapsed_time
workflow_node_execution = session . merge ( workflow_node_execution )
# Use the instance repository to update the workflow node execution
self . _workflow_node_execution_repository . update ( workflow_node_execution )
return workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_failed (
def _handle_workflow_node_execution_failed (
self ,
self ,
* ,
* ,
session : Session ,
event : QueueNodeFailedEvent
event : QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
| QueueNodeInLoopFailedEvent
@ -361,9 +371,7 @@ class WorkflowCycleManage:
: param event : queue node failed event
: param event : queue node failed event
: return :
: return :
"""
"""
workflow_node_execution = self . _get_workflow_node_execution (
workflow_node_execution = self . _get_workflow_node_execution ( node_execution_id = event . node_execution_id )
session = session , node_execution_id = event . node_execution_id
)
inputs = WorkflowEntry . handle_special_values ( event . inputs )
inputs = WorkflowEntry . handle_special_values ( event . inputs )
process_data = WorkflowEntry . handle_special_values ( event . process_data )
process_data = WorkflowEntry . handle_special_values ( event . process_data )
@ -387,14 +395,14 @@ class WorkflowCycleManage:
workflow_node_execution . elapsed_time = elapsed_time
workflow_node_execution . elapsed_time = elapsed_time
workflow_node_execution . execution_metadata = execution_metadata
workflow_node_execution . execution_metadata = execution_metadata
workflow_node_execution = session . merge ( workflow_node_execution )
return workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_retried (
def _handle_workflow_node_execution_retried (
self , * , session: Session , workflow_run: WorkflowRun , event : QueueNodeRetryEvent
self , * , workflow_run: WorkflowRun , event : QueueNodeRetryEvent
) - > WorkflowNodeExecution :
) - > WorkflowNodeExecution :
"""
"""
Workflow node execution failed
Workflow node execution failed
: param workflow_run : workflow run
: param event : queue node failed event
: param event : queue node failed event
: return :
: return :
"""
"""
@ -439,15 +447,12 @@ class WorkflowCycleManage:
workflow_node_execution . execution_metadata = execution_metadata
workflow_node_execution . execution_metadata = execution_metadata
workflow_node_execution . index = event . node_run_index
workflow_node_execution . index = event . node_run_index
session . add ( workflow_node_execution )
# Use the instance repository to save the workflow node execution
self . _workflow_node_execution_repository . save ( workflow_node_execution )
self . _workflow_node_executions [ event . node_execution_id ] = workflow_node_execution
self . _workflow_node_executions [ event . node_execution_id ] = workflow_node_execution
return workflow_node_execution
return workflow_node_execution
#################################################
# to stream responses #
#################################################
def _workflow_start_to_stream_response (
def _workflow_start_to_stream_response (
self ,
self ,
* ,
* ,
@ -455,7 +460,6 @@ class WorkflowCycleManage:
task_id : str ,
task_id : str ,
workflow_run : WorkflowRun ,
workflow_run : WorkflowRun ,
) - > WorkflowStartStreamResponse :
) - > WorkflowStartStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return WorkflowStartStreamResponse (
return WorkflowStartStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -521,14 +525,10 @@ class WorkflowCycleManage:
def _workflow_node_start_to_stream_response (
def _workflow_node_start_to_stream_response (
self ,
self ,
* ,
* ,
session : Session ,
event : QueueNodeStartedEvent ,
event : QueueNodeStartedEvent ,
task_id : str ,
task_id : str ,
workflow_node_execution : WorkflowNodeExecution ,
workflow_node_execution : WorkflowNodeExecution ,
) - > Optional [ NodeStartStreamResponse ] :
) - > Optional [ NodeStartStreamResponse ] :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
return None
return None
if not workflow_node_execution . workflow_run_id :
if not workflow_node_execution . workflow_run_id :
@ -571,7 +571,6 @@ class WorkflowCycleManage:
def _workflow_node_finish_to_stream_response (
def _workflow_node_finish_to_stream_response (
self ,
self ,
* ,
* ,
session : Session ,
event : QueueNodeSucceededEvent
event : QueueNodeSucceededEvent
| QueueNodeFailedEvent
| QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInIterationFailedEvent
@ -580,8 +579,6 @@ class WorkflowCycleManage:
task_id : str ,
task_id : str ,
workflow_node_execution : WorkflowNodeExecution ,
workflow_node_execution : WorkflowNodeExecution ,
) - > Optional [ NodeFinishStreamResponse ] :
) - > Optional [ NodeFinishStreamResponse ] :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
return None
return None
if not workflow_node_execution . workflow_run_id :
if not workflow_node_execution . workflow_run_id :
@ -621,13 +618,10 @@ class WorkflowCycleManage:
def _workflow_node_retry_to_stream_response (
def _workflow_node_retry_to_stream_response (
self ,
self ,
* ,
* ,
session : Session ,
event : QueueNodeRetryEvent ,
event : QueueNodeRetryEvent ,
task_id : str ,
task_id : str ,
workflow_node_execution : WorkflowNodeExecution ,
workflow_node_execution : WorkflowNodeExecution ,
) - > Optional [ Union [ NodeRetryStreamResponse , NodeFinishStreamResponse ] ] :
) - > Optional [ Union [ NodeRetryStreamResponse , NodeFinishStreamResponse ] ] :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
if workflow_node_execution . node_type in { NodeType . ITERATION . value , NodeType . LOOP . value } :
return None
return None
if not workflow_node_execution . workflow_run_id :
if not workflow_node_execution . workflow_run_id :
@ -668,7 +662,6 @@ class WorkflowCycleManage:
def _workflow_parallel_branch_start_to_stream_response (
def _workflow_parallel_branch_start_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueParallelBranchRunStartedEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueParallelBranchRunStartedEvent
) - > ParallelBranchStartStreamResponse :
) - > ParallelBranchStartStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return ParallelBranchStartStreamResponse (
return ParallelBranchStartStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -692,7 +685,6 @@ class WorkflowCycleManage:
workflow_run : WorkflowRun ,
workflow_run : WorkflowRun ,
event : QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent ,
event : QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent ,
) - > ParallelBranchFinishedStreamResponse :
) - > ParallelBranchFinishedStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return ParallelBranchFinishedStreamResponse (
return ParallelBranchFinishedStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -713,7 +705,6 @@ class WorkflowCycleManage:
def _workflow_iteration_start_to_stream_response (
def _workflow_iteration_start_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationStartEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationStartEvent
) - > IterationNodeStartStreamResponse :
) - > IterationNodeStartStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return IterationNodeStartStreamResponse (
return IterationNodeStartStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -735,7 +726,6 @@ class WorkflowCycleManage:
def _workflow_iteration_next_to_stream_response (
def _workflow_iteration_next_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationNextEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationNextEvent
) - > IterationNodeNextStreamResponse :
) - > IterationNodeNextStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return IterationNodeNextStreamResponse (
return IterationNodeNextStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -759,7 +749,6 @@ class WorkflowCycleManage:
def _workflow_iteration_completed_to_stream_response (
def _workflow_iteration_completed_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationCompletedEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueIterationCompletedEvent
) - > IterationNodeCompletedStreamResponse :
) - > IterationNodeCompletedStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return IterationNodeCompletedStreamResponse (
return IterationNodeCompletedStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -790,7 +779,6 @@ class WorkflowCycleManage:
def _workflow_loop_start_to_stream_response (
def _workflow_loop_start_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopStartEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopStartEvent
) - > LoopNodeStartStreamResponse :
) - > LoopNodeStartStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return LoopNodeStartStreamResponse (
return LoopNodeStartStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -812,7 +800,6 @@ class WorkflowCycleManage:
def _workflow_loop_next_to_stream_response (
def _workflow_loop_next_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopNextEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopNextEvent
) - > LoopNodeNextStreamResponse :
) - > LoopNodeNextStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return LoopNodeNextStreamResponse (
return LoopNodeNextStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -836,7 +823,6 @@ class WorkflowCycleManage:
def _workflow_loop_completed_to_stream_response (
def _workflow_loop_completed_to_stream_response (
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopCompletedEvent
self , * , session : Session , task_id : str , workflow_run : WorkflowRun , event : QueueLoopCompletedEvent
) - > LoopNodeCompletedStreamResponse :
) - > LoopNodeCompletedStreamResponse :
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
_ = session
return LoopNodeCompletedStreamResponse (
return LoopNodeCompletedStreamResponse (
task_id = task_id ,
task_id = task_id ,
@ -934,11 +920,22 @@ class WorkflowCycleManage:
return workflow_run
return workflow_run
def _get_workflow_node_execution ( self , session : Session , node_execution_id : str ) - > WorkflowNodeExecution :
def _get_workflow_node_execution ( self , node_execution_id : str ) - > WorkflowNodeExecution :
if node_execution_id not in self . _workflow_node_executions :
# First check the cache for performance
if node_execution_id in self . _workflow_node_executions :
cached_execution = self . _workflow_node_executions [ node_execution_id ]
# No need to merge with session since expire_on_commit=False
return cached_execution
# If not in cache, use the instance repository to get by node_execution_id
execution = self . _workflow_node_execution_repository . get_by_node_execution_id ( node_execution_id )
if not execution :
raise ValueError ( f " Workflow node execution not found: { node_execution_id } " )
raise ValueError ( f " Workflow node execution not found: { node_execution_id } " )
cached_workflow_node_execution = self . _workflow_node_executions [ node_execution_id ]
return session . merge ( cached_workflow_node_execution )
# Update cache
self . _workflow_node_executions [ node_execution_id ] = execution
return execution
def _handle_agent_log ( self , task_id : str , event : QueueAgentLogEvent ) - > AgentLogStreamResponse :
def _handle_agent_log ( self , task_id : str , event : QueueAgentLogEvent ) - > AgentLogStreamResponse :
"""
"""