fix: remove the single step retry

pull/11903/head
Novice Lee 1 year ago
parent bb7d8ebadc
commit 3e48f54f4e

@ -15,7 +15,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from factories import variable_factory from factories import variable_factory
from fields.workflow_fields import workflow_fields from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import single_step_node_execution_fields from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs import helper from libs import helper
from libs.helper import TimestampField, uuid_value from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required from libs.login import current_user, login_required
@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required @login_required
@account_initialization_required @account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(single_step_node_execution_fields) @marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str): def post(self, app_model: App, node_id: str):
""" """
Run draft workflow node Run draft workflow node

@ -45,6 +45,7 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
) )
retries = 0 retries = 0
stream = kwargs.pop("stream", False)
while retries <= max_retries: while retries <= max_retries:
try: try:
if dify_config.SSRF_PROXY_ALL_URL: if dify_config.SSRF_PROXY_ALL_URL:
@ -63,15 +64,14 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list") logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")
except httpx.RequestError as e: except httpx.RequestError as e:
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
if max_retries == 0: if max_retries == 0:
raise raise
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
retries += 1 retries += 1
if retries <= max_retries: if retries <= max_retries:
time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1))) time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
if max_retries != 0: raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):

@ -116,11 +116,6 @@ workflow_run_node_execution_fields = {
"finished_at": TimestampField, "finished_at": TimestampField,
} }
single_step_node_execution_fields = {
**workflow_run_node_execution_fields,
"retry_events": fields.List(fields.Nested(retry_event_field)),
}
workflow_run_node_execution_list_fields = { workflow_run_node_execution_list_fields = {
"data": fields.List(fields.Nested(workflow_run_node_execution_fields)), "data": fields.List(fields.Nested(workflow_run_node_execution_fields)),
} }

@ -15,7 +15,6 @@ from core.workflow.nodes.base.entities import BaseNodeData
from core.workflow.nodes.base.node import BaseNode from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.event import SingleStepRetryEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.workflow_entry import WorkflowEntry from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
@ -221,95 +220,56 @@ class WorkflowService:
# run draft workflow node # run draft workflow node
start_at = time.perf_counter() start_at = time.perf_counter()
retries = 0
max_retries = 0
should_retry = True
retry_events = []
try: try:
while retries <= max_retries and should_retry: node_instance, generator = WorkflowEntry.single_step_run(
retry_start_at = time.perf_counter() workflow=draft_workflow,
node_instance, generator = WorkflowEntry.single_step_run( node_id=node_id,
workflow=draft_workflow, user_inputs=user_inputs,
node_id=node_id, user_id=account.id,
user_inputs=user_inputs, )
user_id=account.id, node_instance = cast(BaseNode[BaseNodeData], node_instance)
) node_run_result: NodeRunResult | None = None
node_instance = cast(BaseNode[BaseNodeData], node_instance) for event in generator:
max_retries = ( if isinstance(event, RunCompletedEvent):
node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0 node_run_result = event.run_result
)
retry_interval = node_instance.node_data.retry_config.retry_interval_seconds # sign output files
node_run_result: NodeRunResult | None = None node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
for event in generator: break
if isinstance(event, RunCompletedEvent):
node_run_result = event.run_result if not node_run_result:
raise ValueError("Node run failed with no run result")
# sign output files # single step debug mode error handling return
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
break node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
if not node_run_result: "error": node_run_result.error,
raise ValueError("Node run failed with no run result") "inputs": node_run_result.inputs,
# single step debug mode error handling return "metadata": {"error_strategy": node_instance.node_data.error_strategy},
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: }
if ( if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
retries == max_retries node_run_result = NodeRunResult(
and node_instance.node_type == NodeType.HTTP_REQUEST **node_error_args,
and node_run_result.outputs outputs={
and not node_instance.should_continue_on_error **node_instance.node_data.default_value_dict,
): "error_message": node_run_result.error,
node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED "error_type": node_run_result.error_type,
should_retry = False },
else: )
if node_instance.should_retry: else:
node_run_result.status = WorkflowNodeExecutionStatus.RETRY node_run_result = NodeRunResult(
retries += 1 **node_error_args,
node_run_result.retry_index = retries outputs={
retry_events.append( "error_message": node_run_result.error,
SingleStepRetryEvent( "error_type": node_run_result.error_type,
elapsed_time=time.perf_counter() - retry_start_at, },
inputs=WorkflowEntry.handle_special_values(node_run_result.inputs), )
process_data=WorkflowEntry.handle_special_values(node_run_result.process_data), run_succeeded = node_run_result.status in (
outputs=WorkflowEntry.handle_special_values(node_run_result.outputs), WorkflowNodeExecutionStatus.SUCCEEDED,
metadata=node_run_result.metadata, WorkflowNodeExecutionStatus.EXCEPTION,
llm_usage=node_run_result.llm_usage, )
error=node_run_result.error, error = node_run_result.error if not run_succeeded else None
retry_index=node_run_result.retry_index,
)
)
time.sleep(retry_interval)
else:
should_retry = False
if node_instance.should_continue_on_error:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION,
)
error = node_run_result.error if not run_succeeded else None
except WorkflowNodeRunFailedError as e: except WorkflowNodeRunFailedError as e:
node_instance = e.node_instance node_instance = e.node_instance
run_succeeded = False run_succeeded = False
@ -358,7 +318,6 @@ class WorkflowService:
db.session.add(workflow_node_execution) db.session.add(workflow_node_execution)
db.session.commit() db.session.commit()
workflow_node_execution.retry_events = retry_events
return workflow_node_execution return workflow_node_execution

Loading…
Cancel
Save