refactor(workflow_cycle_manager): Refactor `_workflow_parallel_branch_finished_to_stream_response`

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20067/head
-LAN- 1 year ago
parent 96c612c92d
commit 5da9bc636e
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -403,18 +403,13 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session: parallel_finish_resp = (
workflow_run = self._workflow_cycle_manager._get_workflow_run( self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session, workflow_run_id=self._workflow_run_id task_id=self._application_generate_entity.task_id,
) workflow_execution_id=self._workflow_run_id,
parallel_finish_resp = ( event=event,
self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
) )
)
yield parallel_finish_resp yield parallel_finish_resp
elif isinstance(event, QueueIterationStartEvent): elif isinstance(event, QueueIterationStartEvent):

@ -356,18 +356,13 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session: parallel_finish_resp = (
workflow_run = self._workflow_cycle_manager._get_workflow_run( self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session, workflow_run_id=self._workflow_run_id task_id=self._application_generate_entity.task_id,
) workflow_execution_id=self._workflow_run_id,
parallel_finish_resp = ( event=event,
self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
) )
)
yield parallel_finish_resp yield parallel_finish_resp

@ -654,15 +654,13 @@ class WorkflowCycleManager:
def _workflow_parallel_branch_finished_to_stream_response( def _workflow_parallel_branch_finished_to_stream_response(
self, self,
*, *,
session: Session,
task_id: str, task_id: str,
workflow_run: WorkflowRun, workflow_execution_id: str,
event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent, event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent,
) -> ParallelBranchFinishedStreamResponse: ) -> ParallelBranchFinishedStreamResponse:
_ = session
return ParallelBranchFinishedStreamResponse( return ParallelBranchFinishedStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_run.id, workflow_run_id=workflow_execution_id,
data=ParallelBranchFinishedStreamResponse.Data( data=ParallelBranchFinishedStreamResponse.Data(
parallel_id=event.parallel_id, parallel_id=event.parallel_id,
parallel_branch_id=event.parallel_start_node_id, parallel_branch_id=event.parallel_start_node_id,

Loading…
Cancel
Save