@ -10,10 +10,10 @@ from sqlalchemy.orm import Session
from core . app . apps . advanced_chat . app_config_manager import AdvancedChatAppConfigManager
from core . app . apps . workflow . app_config_manager import WorkflowAppConfigManager
from core . model_runtime . utils . encoders import jsonable_encoder
from core . repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core . variables import Variable
from core . workflow . entities . node_entities import NodeRunResult
from core . workflow . entities . node_execution_entities import NodeExecution , NodeExecutionStatus
from core . workflow . errors import WorkflowNodeRunFailedError
from core . workflow . graph_engine . entities . event import InNodeEvent
from core . workflow . nodes import NodeType
@ -26,7 +26,6 @@ from core.workflow.workflow_entry import WorkflowEntry
from events . app_event import app_draft_workflow_was_synced , app_published_workflow_was_updated
from extensions . ext_database import db
from models . account import Account
from models . enums import CreatorUserRole
from models . model import App , AppMode
from models . tools import WorkflowToolProvider
from models . workflow import (
@ -268,35 +267,37 @@ class WorkflowService:
# run draft workflow node
start_at = time . perf_counter ( )
workflow_ node_execution = self . _handle_node_run_result (
getter = lambda : WorkflowEntry . single_step_run (
node_execution = self . _handle_node_run_result (
invoke_node_fn = lambda : WorkflowEntry . single_step_run (
workflow = draft_workflow ,
node_id = node_id ,
user_inputs = user_inputs ,
user_id = account . id ,
) ,
start_at = start_at ,
tenant_id = app_model . tenant_id ,
node_id = node_id ,
)
workflow_node_execution . app_id = app_model . id
workflow_node_execution . created_by = account . id
workflow_node_execution . workflow_id = draft_workflow . id
# Set workflow_id on the NodeExecution
node_execution . workflow_id = draft_workflow . id
# Create repository and save the node execution
repository = SQLAlchemyWorkflowNodeExecutionRepository (
session_factory = db . engine ,
user = account ,
app_id = app_model . id ,
triggered_from = WorkflowNodeExecutionTriggeredFrom . SINGLE_STEP ,
)
repository . save ( workflow_node_execution )
repository . save ( node_execution )
# Convert node_execution to WorkflowNodeExecution after save
workflow_node_execution = repository . to_db_model ( node_execution )
return workflow_node_execution
def run_free_workflow_node (
self , node_data : dict , tenant_id : str , user_id : str , node_id : str , user_inputs : dict [ str , Any ]
) - > Workflow NodeExecution:
) - > NodeExecution:
"""
Run draft workflow node
"""
@ -304,7 +305,7 @@ class WorkflowService:
start_at = time . perf_counter ( )
workflow_node_execution = self . _handle_node_run_result (
getter = lambda : WorkflowEntry . run_free_node (
invoke_node_fn = lambda : WorkflowEntry . run_free_node (
node_id = node_id ,
node_data = node_data ,
tenant_id = tenant_id ,
@ -312,7 +313,6 @@ class WorkflowService:
user_inputs = user_inputs ,
) ,
start_at = start_at ,
tenant_id = tenant_id ,
node_id = node_id ,
)
@ -320,21 +320,12 @@ class WorkflowService:
def _handle_node_run_result (
self ,
getter : Callable [ [ ] , tuple [ BaseNode , Generator [ NodeEvent | InNodeEvent , None , None ] ] ] ,
invoke_node_fn : Callable [ [ ] , tuple [ BaseNode , Generator [ NodeEvent | InNodeEvent , None , None ] ] ] ,
start_at : float ,
tenant_id : str ,
node_id : str ,
) - > WorkflowNodeExecution :
"""
Handle node run result
: param getter : Callable [ [ ] , tuple [ BaseNode , Generator [ RunEvent | InNodeEvent , None , None ] ] ]
: param start_at : float
: param tenant_id : str
: param node_id : str
"""
) - > NodeExecution :
try :
node_instance , generator = getter ( )
node_instance , generator = invoke_node_fn ( )
node_run_result : NodeRunResult | None = None
for event in generator :
@ -383,20 +374,21 @@ class WorkflowService:
node_run_result = None
error = e . error
workflow_node_execution = WorkflowNodeExecution ( )
workflow_node_execution . id = str ( uuid4 ( ) )
workflow_node_execution . tenant_id = tenant_id
workflow_node_execution . triggered_from = WorkflowNodeExecutionTriggeredFrom . SINGLE_STEP . value
workflow_node_execution . index = 1
workflow_node_execution . node_id = node_id
workflow_node_execution . node_type = node_instance . node_type
workflow_node_execution . title = node_instance . node_data . title
workflow_node_execution . elapsed_time = time . perf_counter ( ) - start_at
workflow_node_execution . created_by_role = CreatorUserRole . ACCOUNT . value
workflow_node_execution . created_at = datetime . now ( UTC ) . replace ( tzinfo = None )
workflow_node_execution . finished_at = datetime . now ( UTC ) . replace ( tzinfo = None )
# Create a NodeExecution domain model
node_execution = NodeExecution (
id = str ( uuid4 ( ) ) ,
workflow_id = " " , # This is a single-step execution, so no workflow ID
index = 1 ,
node_id = node_id ,
node_type = node_instance . node_type ,
title = node_instance . node_data . title ,
elapsed_time = time . perf_counter ( ) - start_at ,
created_at = datetime . now ( UTC ) . replace ( tzinfo = None ) ,
finished_at = datetime . now ( UTC ) . replace ( tzinfo = None ) ,
)
if run_succeeded and node_run_result :
# create workflow node execution
# Set inputs, process_data, and outputs as dictionaries (not JSON strings)
inputs = WorkflowEntry . handle_special_values ( node_run_result . inputs ) if node_run_result . inputs else None
process_data = (
WorkflowEntry . handle_special_values ( node_run_result . process_data )
@ -405,23 +397,23 @@ class WorkflowService:
)
outputs = WorkflowEntry . handle_special_values ( node_run_result . outputs ) if node_run_result . outputs else None
workflow_ node_execution. inputs = json. dumps ( inputs)
workflow_ node_execution. process_data = json. dumps ( process_data)
workflow_ node_execution. outputs = json. dumps ( outputs)
workflow_node_execution. execution_metadata = (
json . dumps ( jsonable_encoder ( node_run_result . metadata ) ) if node_run_result . metadata else None
)
node_execution. inputs = inputs
node_execution. process_data = process_data
node_execution. outputs = outputs
node_execution. metadata = node_run_result . metadata
# Map status from WorkflowNodeExecutionStatus to NodeExecutionStatus
if node_run_result . status == WorkflowNodeExecutionStatus . SUCCEEDED :
workflow_ node_execution. status = Workflow NodeExecutionStatus. SUCCEEDED . value
node_execution. status = NodeExecutionStatus. SUCCEEDED
elif node_run_result . status == WorkflowNodeExecutionStatus . EXCEPTION :
workflow_ node_execution. status = Workflow NodeExecutionStatus. EXCEPTION . value
workflow_ node_execution. error = node_run_result . error
node_execution. status = NodeExecutionStatus. EXCEPTION
node_execution. error = node_run_result . error
else :
# create workflow node execution
workflow_ node_execution. status = Workflow NodeExecutionStatus. FAILED . value
workflow_ node_execution. error = error
# Set failed status and error
node_execution. status = NodeExecutionStatus. FAILED
node_execution. error = error
return workflow_ node_execution
return node_execution
def convert_to_workflow ( self , app_model : App , account : Account , args : dict ) - > App :
"""