@@ -15,6 +15,7 @@ from core.app.entities.queue_entities import (
     QueueNodeExceptionEvent,
     QueueNodeFailedEvent,
     QueueNodeInIterationFailedEvent,
+    QueueNodeRetryEvent,
     QueueNodeStartedEvent,
     QueueNodeSucceededEvent,
     QueueParallelBranchRunFailedEvent,
@@ -26,6 +27,7 @@ from core.app.entities.task_entities import (
     IterationNodeNextStreamResponse,
     IterationNodeStartStreamResponse,
     NodeFinishStreamResponse,
+    NodeRetryStreamResponse,
     NodeStartStreamResponse,
     ParallelBranchFinishedStreamResponse,
     ParallelBranchStartStreamResponse,
@@ -423,6 +425,52 @@ class WorkflowCycleManage:
 
         return workflow_node_execution
 
+    def _handle_workflow_node_execution_retried(
+        self, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
+    ) -> WorkflowNodeExecution:
"""
|
|
|
|
|
Workflow node execution failed
|
|
|
|
|
:param event: queue node failed event
|
|
|
|
|
:return:
|
|
|
|
|
"""
+        created_at = event.start_at
+        finished_at = datetime.now(UTC).replace(tzinfo=None)
+        elapsed_time = (finished_at - created_at).total_seconds()
+        inputs = WorkflowEntry.handle_special_values(event.inputs)
+        outputs = WorkflowEntry.handle_special_values(event.outputs)
+
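+        # Record this retry attempt as its own WorkflowNodeExecution row with RETRY status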
+        workflow_node_execution = WorkflowNodeExecution()
+        workflow_node_execution.tenant_id = workflow_run.tenant_id
+        workflow_node_execution.app_id = workflow_run.app_id
+        workflow_node_execution.workflow_id = workflow_run.workflow_id
+        workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
+        workflow_node_execution.workflow_run_id = workflow_run.id
+        workflow_node_execution.node_execution_id = event.node_execution_id
+        workflow_node_execution.node_id = event.node_id
+        workflow_node_execution.node_type = event.node_type.value
+        workflow_node_execution.title = event.node_data.title
+        workflow_node_execution.status = WorkflowNodeExecutionStatus.RETRY.value
+        workflow_node_execution.created_by_role = workflow_run.created_by_role
+        workflow_node_execution.created_by = workflow_run.created_by
+        workflow_node_execution.created_at = created_at
+        workflow_node_execution.finished_at = finished_at
+        workflow_node_execution.elapsed_time = elapsed_time
+        workflow_node_execution.error = event.error
+        workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
+        workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
+        workflow_node_execution.execution_metadata = json.dumps(
+            {
+                NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
+            }
+        )
+        workflow_node_execution.index = event.start_index
+
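+        # Persist the retry record and reload it so database-populated fields are available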
+        db.session.add(workflow_node_execution)
+        db.session.commit()
+        db.session.refresh(workflow_node_execution)
+
+        return workflow_node_execution
+
     #################################################
     # to stream responses #
     #################################################
@@ -587,6 +635,51 @@ class WorkflowCycleManage:
             ),
         )
 
+    def _workflow_node_retry_to_stream_response(
+        self,
+        event: QueueNodeRetryEvent,
+        task_id: str,
+        workflow_node_execution: WorkflowNodeExecution,
+    ) -> Optional[NodeRetryStreamResponse]:
+        """
+        Workflow node retry to stream response.
+        :param event: queue node retry event
+        :param task_id: task id
+        :param workflow_node_execution: workflow node execution
+        :return:
+        """
+        if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
+            return None
+
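+        # Build the retry stream payload from the saved execution record and the originating retry event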
+        return NodeRetryStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_node_execution.workflow_run_id,
+            data=NodeRetryStreamResponse.Data(
+                id=workflow_node_execution.id,
+                node_id=workflow_node_execution.node_id,
+                node_type=workflow_node_execution.node_type,
+                index=workflow_node_execution.index,
+                title=workflow_node_execution.title,
+                predecessor_node_id=workflow_node_execution.predecessor_node_id,
+                inputs=workflow_node_execution.inputs_dict,
+                process_data=workflow_node_execution.process_data_dict,
+                outputs=workflow_node_execution.outputs_dict,
+                status=workflow_node_execution.status,
+                error=workflow_node_execution.error,
+                elapsed_time=workflow_node_execution.elapsed_time,
+                execution_metadata=workflow_node_execution.execution_metadata_dict,
+                created_at=int(workflow_node_execution.created_at.timestamp()),
+                finished_at=int(workflow_node_execution.finished_at.timestamp()),
+                files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs_dict or {}),
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                retry_index=event.retry_index,
+            ),
+        )
+
     def _workflow_parallel_branch_start_to_stream_response(
         self, task_id: str, workflow_run: WorkflowRun, event: QueueParallelBranchRunStartedEvent
     ) -> ParallelBranchStartStreamResponse: