|
|
|
@ -5,6 +5,7 @@ import uuid
|
|
|
|
from collections.abc import Generator, Mapping
|
|
|
|
from collections.abc import Generator, Mapping
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, wait
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, wait
|
|
|
|
from copy import copy, deepcopy
|
|
|
|
from copy import copy, deepcopy
|
|
|
|
|
|
|
|
from datetime import UTC, datetime
|
|
|
|
from typing import Any, Optional, cast
|
|
|
|
from typing import Any, Optional, cast
|
|
|
|
|
|
|
|
|
|
|
|
from flask import Flask, current_app
|
|
|
|
from flask import Flask, current_app
|
|
|
|
@ -25,6 +26,7 @@ from core.workflow.graph_engine.entities.event import (
|
|
|
|
NodeRunExceptionEvent,
|
|
|
|
NodeRunExceptionEvent,
|
|
|
|
NodeRunFailedEvent,
|
|
|
|
NodeRunFailedEvent,
|
|
|
|
NodeRunRetrieverResourceEvent,
|
|
|
|
NodeRunRetrieverResourceEvent,
|
|
|
|
|
|
|
|
NodeRunRetryEvent,
|
|
|
|
NodeRunStartedEvent,
|
|
|
|
NodeRunStartedEvent,
|
|
|
|
NodeRunStreamChunkEvent,
|
|
|
|
NodeRunStreamChunkEvent,
|
|
|
|
NodeRunSucceededEvent,
|
|
|
|
NodeRunSucceededEvent,
|
|
|
|
@ -581,7 +583,7 @@ class GraphEngine:
|
|
|
|
|
|
|
|
|
|
|
|
def _run_node(
|
|
|
|
def _run_node(
|
|
|
|
self,
|
|
|
|
self,
|
|
|
|
node_instance: BaseNode,
|
|
|
|
node_instance: BaseNode[BaseNodeData],
|
|
|
|
route_node_state: RouteNodeState,
|
|
|
|
route_node_state: RouteNodeState,
|
|
|
|
parallel_id: Optional[str] = None,
|
|
|
|
parallel_id: Optional[str] = None,
|
|
|
|
parallel_start_node_id: Optional[str] = None,
|
|
|
|
parallel_start_node_id: Optional[str] = None,
|
|
|
|
@ -607,9 +609,14 @@ class GraphEngine:
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
db.session.close()
|
|
|
|
db.session.close()
|
|
|
|
|
|
|
|
max_retries = node_instance.node_data.retry_config.max_retries
|
|
|
|
|
|
|
|
retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
|
|
|
|
|
|
|
|
retries = 0
|
|
|
|
|
|
|
|
shoudl_continue_retry = True
|
|
|
|
|
|
|
|
while shoudl_continue_retry and retries <= max_retries:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
# run node
|
|
|
|
# run node
|
|
|
|
|
|
|
|
retry_start_at = datetime.now(UTC).replace(tzinfo=None)
|
|
|
|
generator = node_instance.run()
|
|
|
|
generator = node_instance.run()
|
|
|
|
for item in generator:
|
|
|
|
for item in generator:
|
|
|
|
if isinstance(item, GraphEngineEvent):
|
|
|
|
if isinstance(item, GraphEngineEvent):
|
|
|
|
@ -624,6 +631,35 @@ class GraphEngine:
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
if isinstance(item, RunCompletedEvent):
|
|
|
|
if isinstance(item, RunCompletedEvent):
|
|
|
|
run_result = item.run_result
|
|
|
|
run_result = item.run_result
|
|
|
|
|
|
|
|
if run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
retries == max_retries
|
|
|
|
|
|
|
|
and node_instance.node_type == NodeType.HTTP_REQUEST
|
|
|
|
|
|
|
|
and run_result.outputs
|
|
|
|
|
|
|
|
and not node_instance.should_continue_on_error
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
|
|
|
|
|
|
|
|
if node_instance.should_retry and retries < max_retries:
|
|
|
|
|
|
|
|
retries += 1
|
|
|
|
|
|
|
|
self.graph_runtime_state.node_run_steps += 1
|
|
|
|
|
|
|
|
route_node_state.node_run_result = run_result
|
|
|
|
|
|
|
|
yield NodeRunRetryEvent(
|
|
|
|
|
|
|
|
id=node_instance.id,
|
|
|
|
|
|
|
|
node_id=node_instance.node_id,
|
|
|
|
|
|
|
|
node_type=node_instance.node_type,
|
|
|
|
|
|
|
|
node_data=node_instance.node_data,
|
|
|
|
|
|
|
|
route_node_state=route_node_state,
|
|
|
|
|
|
|
|
error=run_result.error,
|
|
|
|
|
|
|
|
retry_index=retries,
|
|
|
|
|
|
|
|
parallel_id=parallel_id,
|
|
|
|
|
|
|
|
parallel_start_node_id=parallel_start_node_id,
|
|
|
|
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
|
|
|
|
start_at=retry_start_at,
|
|
|
|
|
|
|
|
start_index=self.graph_runtime_state.node_run_steps,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
time.sleep(retry_interval)
|
|
|
|
|
|
|
|
continue
|
|
|
|
route_node_state.set_finished(run_result=run_result)
|
|
|
|
route_node_state.set_finished(run_result=run_result)
|
|
|
|
|
|
|
|
|
|
|
|
if run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
|
|
|
if run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
|
|
|
@ -657,6 +693,7 @@ class GraphEngine:
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
shoudl_continue_retry = False
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
yield NodeRunFailedEvent(
|
|
|
|
yield NodeRunFailedEvent(
|
|
|
|
error=route_node_state.failed_reason or "Unknown error.",
|
|
|
|
error=route_node_state.failed_reason or "Unknown error.",
|
|
|
|
@ -670,7 +707,7 @@ class GraphEngine:
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
shoudl_continue_retry = False
|
|
|
|
elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
|
|
|
elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
|
|
|
if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
|
|
|
|
if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
|
|
|
|
node_instance.node_id
|
|
|
|
node_instance.node_id
|
|
|
|
@ -702,7 +739,9 @@ class GraphEngine:
|
|
|
|
run_result.metadata = {}
|
|
|
|
run_result.metadata = {}
|
|
|
|
|
|
|
|
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARALLEL_ID] = parallel_id
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARALLEL_ID] = parallel_id
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARALLEL_START_NODE_ID] = parallel_start_node_id
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARALLEL_START_NODE_ID] = (
|
|
|
|
|
|
|
|
parallel_start_node_id
|
|
|
|
|
|
|
|
)
|
|
|
|
if parent_parallel_id and parent_parallel_start_node_id:
|
|
|
|
if parent_parallel_id and parent_parallel_start_node_id:
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARENT_PARALLEL_ID] = parent_parallel_id
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARENT_PARALLEL_ID] = parent_parallel_id
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARENT_PARALLEL_START_NODE_ID] = (
|
|
|
|
run_result.metadata[NodeRunMetadataKey.PARENT_PARALLEL_START_NODE_ID] = (
|
|
|
|
@ -720,6 +759,7 @@ class GraphEngine:
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_id=parent_parallel_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
parent_parallel_start_node_id=parent_parallel_start_node_id,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
shoudl_continue_retry = False
|
|
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
break
|
|
|
|
elif isinstance(item, RunStreamChunkEvent):
|
|
|
|
elif isinstance(item, RunStreamChunkEvent):
|
|
|
|
|