feat: add single step retry

pull/11871/head
Novice Lee 1 year ago
parent 9e0c28791d
commit 127291a90f

@ -449,6 +449,7 @@ class WorkflowBasedAppRunner(AppRunner):
else {}, else {},
in_iteration_id=event.in_iteration_id, in_iteration_id=event.in_iteration_id,
retry_index=event.retry_index, retry_index=event.retry_index,
start_index=event.start_index,
) )
) )

@ -342,6 +342,7 @@ class QueueNodeRetryEvent(AppQueueEvent):
error: str error: str
retry_index: int # retry index retry_index: int # retry index
start_index: int # start index
class QueueNodeInIterationFailedEvent(AppQueueEvent): class QueueNodeInIterationFailedEvent(AppQueueEvent):

@ -436,8 +436,10 @@ class WorkflowCycleManage:
created_at = event.start_at created_at = event.start_at
finished_at = datetime.now(UTC).replace(tzinfo=None) finished_at = datetime.now(UTC).replace(tzinfo=None)
elapsed_time = (finished_at - created_at).total_seconds() elapsed_time = (finished_at - created_at).total_seconds()
workflow_node_execution = WorkflowNodeExecution() inputs = WorkflowEntry.handle_special_values(event.inputs)
outputs = WorkflowEntry.handle_special_values(event.outputs)
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.tenant_id = workflow_run.tenant_id workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id workflow_node_execution.workflow_id = workflow_run.workflow_id
@ -454,31 +456,18 @@ class WorkflowCycleManage:
workflow_node_execution.finished_at = finished_at workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution.error = event.error workflow_node_execution.error = event.error
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.execution_metadata = json.dumps( workflow_node_execution.execution_metadata = json.dumps(
{ {
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
} }
) )
workflow_node_execution.index = event.start_index
with Session(db.engine, expire_on_commit=False) as session: db.session.add(workflow_node_execution)
failed_execution = ( db.session.commit()
session.query(WorkflowNodeExecution) db.session.refresh(workflow_node_execution)
.filter(
WorkflowNodeExecution.tenant_id == workflow_run.tenant_id,
WorkflowNodeExecution.app_id == workflow_run.app_id,
WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
WorkflowNodeExecution.workflow_run_id == workflow_run.id,
WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING.value,
)
.first()
)
node_run_index = failed_execution.index
workflow_node_execution.index = node_run_index
session.add(workflow_node_execution)
session.commit()
session.refresh(workflow_node_execution)
return workflow_node_execution return workflow_node_execution

@ -45,3 +45,6 @@ class NodeRunResult(BaseModel):
error: Optional[str] = None # error message if status is failed error: Optional[str] = None # error message if status is failed
error_type: Optional[str] = None # error type if status is failed error_type: Optional[str] = None # error type if status is failed
# single step node run retry
retry_index: int = 0

@ -101,6 +101,7 @@ class NodeRunRetryEvent(BaseNodeEvent):
error: str = Field(..., description="error") error: str = Field(..., description="error")
retry_index: int = Field(..., description="which retry attempt is about to be performed") retry_index: int = Field(..., description="which retry attempt is about to be performed")
start_at: datetime = Field(..., description="retry start time") start_at: datetime = Field(..., description="retry start time")
start_index: int = Field(..., description="retry start index")
########################################### ###########################################

@ -610,7 +610,7 @@ class GraphEngine:
db.session.close() db.session.close()
max_retries = node_instance.node_data.retry_config.max_retries max_retries = node_instance.node_data.retry_config.max_retries
retry_interval = node_instance.node_data.retry_config.retry_interval retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
retries = 0 retries = 0
shoudl_continue_retry = True shoudl_continue_retry = True
while shoudl_continue_retry and retries <= max_retries: while shoudl_continue_retry and retries <= max_retries:
@ -641,6 +641,8 @@ class GraphEngine:
run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
if node_instance.should_retry and retries < max_retries: if node_instance.should_retry and retries < max_retries:
retries += 1 retries += 1
self.graph_runtime_state.node_run_steps += 1
route_node_state.node_run_result = run_result
yield NodeRunRetryEvent( yield NodeRunRetryEvent(
id=node_instance.id, id=node_instance.id,
node_id=node_instance.node_id, node_id=node_instance.node_id,
@ -654,8 +656,9 @@ class GraphEngine:
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
start_at=retry_start_at, start_at=retry_start_at,
start_index=self.graph_runtime_state.node_run_steps,
) )
time.sleep(retry_interval / 1000) time.sleep(retry_interval)
continue continue
route_node_state.set_finished(run_result=run_result) route_node_state.set_finished(run_result=run_result)

@ -113,6 +113,10 @@ class RetryConfig(BaseModel):
retry_interval: int = 0 # retry interval in milliseconds retry_interval: int = 0 # retry interval in milliseconds
retry_enabled: bool = False # whether retry is enabled retry_enabled: bool = False # whether retry is enabled
@property
def retry_interval_seconds(self) -> float:
return self.retry_interval / 1000
class BaseNodeData(ABC, BaseModel): class BaseNodeData(ABC, BaseModel):
title: str title: str

@ -4,6 +4,7 @@ from pydantic import BaseModel, Field
from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.entities.llm_entities import LLMUsage
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from models.workflow import WorkflowNodeExecutionStatus
class RunCompletedEvent(BaseModel): class RunCompletedEvent(BaseModel):
@ -36,3 +37,17 @@ class RunRetryEvent(BaseModel):
error: str = Field(..., description="error") error: str = Field(..., description="error")
retry_index: int = Field(..., description="Retry attempt number") retry_index: int = Field(..., description="Retry attempt number")
start_at: datetime = Field(..., description="Retry start time") start_at: datetime = Field(..., description="Retry start time")
class SingleStepRetryEvent(BaseModel):
"""Single step retry event"""
status: str = WorkflowNodeExecutionStatus.RETRY.value
inputs: dict | None = Field(..., description="input")
error: str = Field(..., description="error")
outputs: dict = Field(..., description="output")
retry_index: int = Field(..., description="Retry attempt number")
error: str = Field(..., description="error")
elapsed_time: float = Field(..., description="elapsed time")
execution_metadata: dict | None = Field(..., description="execution metadata")

@ -81,6 +81,17 @@ workflow_run_detail_fields = {
"exceptions_count": fields.Integer, "exceptions_count": fields.Integer,
} }
retry_event_field = {
"error": fields.String,
"retry_index": fields.Integer,
"inputs": fields.Raw(attribute="inputs"),
"elapsed_time": fields.Float,
"execution_metadata": fields.Raw(attribute="execution_metadata_dict"),
"status": fields.String,
"outputs": fields.Raw(attribute="outputs"),
}
workflow_run_node_execution_fields = { workflow_run_node_execution_fields = {
"id": fields.String, "id": fields.String,
"index": fields.Integer, "index": fields.Integer,
@ -101,6 +112,7 @@ workflow_run_node_execution_fields = {
"created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True), "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
"created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True), "created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True),
"finished_at": TimestampField, "finished_at": TimestampField,
"retry_events": fields.List(fields.Nested(retry_event_field)),
} }
workflow_run_node_execution_list_fields = { workflow_run_node_execution_list_fields = {

@ -15,6 +15,7 @@ from core.workflow.nodes.base.entities import BaseNodeData
from core.workflow.nodes.base.node import BaseNode from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.event import SingleStepRetryEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.workflow_entry import WorkflowEntry from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
@ -220,56 +221,99 @@ class WorkflowService:
# run draft workflow node # run draft workflow node
start_at = time.perf_counter() start_at = time.perf_counter()
retries = 0
max_retries = 0
should_retry = True
retry_events = []
try: try:
node_instance, generator = WorkflowEntry.single_step_run( while retries <= max_retries and should_retry:
workflow=draft_workflow, retry_start_at = time.perf_counter()
node_id=node_id, node_instance, generator = WorkflowEntry.single_step_run(
user_inputs=user_inputs, workflow=draft_workflow,
user_id=account.id, node_id=node_id,
) user_inputs=user_inputs,
node_instance = cast(BaseNode[BaseNodeData], node_instance) user_id=account.id,
node_run_result: NodeRunResult | None = None )
for event in generator: node_instance = cast(BaseNode[BaseNodeData], node_instance)
if isinstance(event, RunCompletedEvent): max_retries = (
node_run_result = event.run_result node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0
)
# sign output files retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) node_run_result: NodeRunResult | None = None
break for event in generator:
if isinstance(event, RunCompletedEvent):
if not node_run_result: node_run_result = event.run_result
raise ValueError("Node run failed with no run result")
# single step debug mode error handling return # sign output files
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
node_error_args = { break
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error, if not node_run_result:
"inputs": node_run_result.inputs, raise ValueError("Node run failed with no run result")
"metadata": {"error_strategy": node_instance.node_data.error_strategy}, # single step debug mode error handling return
} if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: if (
node_run_result = NodeRunResult( retries == max_retries
**node_error_args, and node_instance.node_type == NodeType.HTTP_REQUEST
outputs={ and node_run_result.outputs
**node_instance.node_data.default_value_dict, and not node_instance.should_continue_on_error
"error_message": node_run_result.error, ):
"error_type": node_run_result.error_type, node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
}, should_retry = False
) else:
else: if node_instance.should_retry:
node_run_result = NodeRunResult( node_run_result.status = WorkflowNodeExecutionStatus.RETRY
**node_error_args, retries += 1
outputs={ node_run_result.retry_index = retries
"error_message": node_run_result.error, retry_events.append(
"error_type": node_run_result.error_type, SingleStepRetryEvent(
}, inputs=WorkflowEntry.handle_special_values(node_run_result.inputs)
) if node_run_result.inputs
run_succeeded = node_run_result.status in ( else None,
WorkflowNodeExecutionStatus.SUCCEEDED, error=node_run_result.error,
WorkflowNodeExecutionStatus.EXCEPTION, outputs=WorkflowEntry.handle_special_values(node_run_result.outputs)
) if node_run_result.outputs
error = node_run_result.error if not run_succeeded else None else None,
retry_index=node_run_result.retry_index,
elapsed_time=time.perf_counter() - retry_start_at,
execution_metadata=WorkflowEntry.handle_special_values(node_run_result.metadata)
if node_run_result.metadata
else None,
)
)
time.sleep(retry_interval)
else:
should_retry = False
if node_instance.should_continue_on_error:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION,
)
error = node_run_result.error if not run_succeeded else None
except WorkflowNodeRunFailedError as e: except WorkflowNodeRunFailedError as e:
node_instance = e.node_instance node_instance = e.node_instance
run_succeeded = False run_succeeded = False
@ -318,6 +362,7 @@ class WorkflowService:
db.session.add(workflow_node_execution) db.session.add(workflow_node_execution)
db.session.commit() db.session.commit()
workflow_node_execution.retry_events = retry_events
return workflow_node_execution return workflow_node_execution

Loading…
Cancel
Save