refactor(workflow_cycle_manager): Refactors `_handle_node_execution_start`

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20067/head
-LAN- 1 year ago
parent 5510c9cfa8
commit cfeb79b399
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -336,20 +336,15 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_run = self._workflow_cycle_manager._get_workflow_run( workflow_execution_id=self._workflow_run_id, event=event
session=session, workflow_run_id=self._workflow_run_id )
)
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_run=workflow_run, event=event
)
node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response( node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
session.commit()
if node_start_resp: if node_start_resp:
yield node_start_resp yield node_start_resp

@ -298,19 +298,14 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_run = self._workflow_cycle_manager._get_workflow_run( workflow_execution_id=self._workflow_run_id, event=event
session=session, workflow_run_id=self._workflow_run_id )
) node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( event=event,
workflow_run=workflow_run, event=event task_id=self._application_generate_entity.task_id,
) workflow_node_execution=workflow_node_execution,
node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response( )
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()
if node_start_response: if node_start_response:
yield node_start_response yield node_start_response

@ -247,7 +247,14 @@ class WorkflowCycleManager:
return execution return execution
def _handle_node_execution_start(self, *, workflow_run: WorkflowRun, event: QueueNodeStartedEvent) -> NodeExecution: def _handle_node_execution_start(
self,
*,
workflow_execution_id: str,
event: QueueNodeStartedEvent,
) -> NodeExecution:
workflow_execution = self._get_workflow_execution(workflow_execution_id)
# Create a domain model # Create a domain model
created_at = datetime.now(UTC).replace(tzinfo=None) created_at = datetime.now(UTC).replace(tzinfo=None)
metadata = { metadata = {
@ -258,8 +265,8 @@ class WorkflowCycleManager:
domain_execution = NodeExecution( domain_execution = NodeExecution(
id=str(uuid4()), id=str(uuid4()),
workflow_id=workflow_run.workflow_id, workflow_id=workflow_execution.workflow_id,
workflow_run_id=workflow_run.id, workflow_run_id=workflow_execution.id,
predecessor_node_id=event.predecessor_node_id, predecessor_node_id=event.predecessor_node_id,
index=event.node_run_index, index=event.node_run_index,
node_execution_id=event.node_execution_id, node_execution_id=event.node_execution_id,

Loading…
Cancel
Save