diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 4b8f5ebe27..bd5ad9c51b 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -17,7 +17,8 @@ from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner from core.app.apps.advanced_chat.generate_response_converter import AdvancedChatAppGenerateResponseConverter from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom diff --git a/api/core/app/apps/agent_chat/app_generator.py b/api/core/app/apps/agent_chat/app_generator.py index edea6199d3..8665bc9d11 100644 --- a/api/core/app/apps/agent_chat/app_generator.py +++ b/api/core/app/apps/agent_chat/app_generator.py @@ -15,7 +15,8 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager from core.app.apps.agent_chat.app_runner import AgentChatAppRunner from core.app.apps.agent_chat.generate_response_converter import AgentChatAppGenerateResponseConverter -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 0ba33fbe0d..9da0bae56a 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -169,7 +169,3 @@ class AppQueueManager: raise TypeError( "Critical Error: Passing SQLAlchemy Model instances that cause thread safety issues is not allowed." ) - - -class GenerateTaskStoppedError(Exception): - pass diff --git a/api/core/app/apps/chat/app_generator.py b/api/core/app/apps/chat/app_generator.py index a28c106ce9..0c76cc39ae 100644 --- a/api/core/app/apps/chat/app_generator.py +++ b/api/core/app/apps/chat/app_generator.py @@ -11,10 +11,11 @@ from configs import dify_config from constants import UUID_NIL from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.features.file_upload.manager import FileUploadConfigManager -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.chat.app_config_manager import ChatAppConfigManager from core.app.apps.chat.app_runner import ChatAppRunner from core.app.apps.chat.generate_response_converter import ChatAppGenerateResponseConverter +from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import ChatAppGenerateEntity, InvokeFrom diff --git a/api/core/app/apps/completion/app_generator.py b/api/core/app/apps/completion/app_generator.py index 966a6f1d66..195e7e2e3d 100644 --- a/api/core/app/apps/completion/app_generator.py +++ b/api/core/app/apps/completion/app_generator.py @@ -10,10 +10,11 @@ from pydantic import ValidationError from configs import dify_config from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.features.file_upload.manager import FileUploadConfigManager -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.completion.app_config_manager import CompletionAppConfigManager from core.app.apps.completion.app_runner import CompletionAppRunner from core.app.apps.completion.generate_response_converter import CompletionAppGenerateResponseConverter +from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.entities.app_invoke_entities import CompletionAppGenerateEntity, InvokeFrom diff --git a/api/core/app/apps/exc.py b/api/core/app/apps/exc.py new file mode 100644 index 0000000000..4187118b9b --- /dev/null +++ b/api/core/app/apps/exc.py @@ -0,0 +1,2 @@ +class GenerateTaskStoppedError(Exception): + pass diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index e84d59209d..85fafe6980 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -6,7 +6,8 @@ from typing import Optional, Union, cast from core.app.app_config.entities import EasyUIBasedAppConfig, EasyUIBasedAppModelConfigFrom from core.app.apps.base_app_generator import BaseAppGenerator -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import ( AdvancedChatAppGenerateEntity, AgentChatAppGenerateEntity, diff --git a/api/core/app/apps/message_based_app_queue_manager.py b/api/core/app/apps/message_based_app_queue_manager.py index 363c3c82bb..8507f23f17 100644 --- a/api/core/app/apps/message_based_app_queue_manager.py +++ b/api/core/app/apps/message_based_app_queue_manager.py @@ -1,4 +1,5 @@ -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 2f9632e97d..6f560b3253 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -13,7 +13,8 @@ import contexts from configs import dify_config from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.base_app_generator import BaseAppGenerator -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager from core.app.apps.workflow.app_runner import WorkflowAppRunner diff --git a/api/core/app/apps/workflow/app_queue_manager.py b/api/core/app/apps/workflow/app_queue_manager.py index 349b8eb51b..40fc03afb7 100644 --- a/api/core/app/apps/workflow/app_queue_manager.py +++ b/api/core/app/apps/workflow/app_queue_manager.py @@ -1,4 +1,5 @@ -from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedError, PublishFrom +from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 701b90ae0d..c582778a58 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -12,7 +12,7 @@ from typing import Any, Optional, cast from flask import Flask, current_app from configs import dify_config -from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError +from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunResult from core.workflow.entities.variable_pool import VariablePool, VariableValue @@ -48,11 +48,9 @@ from core.workflow.nodes.agent.entities import AgentNodeData from core.workflow.nodes.answer.answer_stream_processor import AnswerStreamProcessor from core.workflow.nodes.answer.base_stream_processor import StreamProcessor from core.workflow.nodes.base import BaseNode -from core.workflow.nodes.base.entities import BaseNodeData from core.workflow.nodes.end.end_stream_processor import EndStreamProcessor from core.workflow.nodes.enums import ErrorStrategy, FailBranchSourceHandle from core.workflow.nodes.event import RunCompletedEvent, RunRetrieverResourceEvent, RunStreamChunkEvent -from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from core.workflow.utils import variable_utils from libs.flask_utils import preserve_flask_contexts from models.enums import UserFrom @@ -260,6 +258,10 @@ class GraphEngine: # convert to specific node node_type = NodeType(node_config.get("data", {}).get("type")) node_version = node_config.get("data", {}).get("version", "1") + + # Import here to avoid circular import + from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING + node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] previous_node_id = previous_route_node_state.node_id if previous_route_node_state else None @@ -414,7 +416,7 @@ class GraphEngine: next_node_id = final_node_id elif ( node_instance.node_data.error_strategy == ErrorStrategy.FAIL_BRANCH - and node_instance.should_continue_on_error + and node_instance.continue_on_error and previous_route_node_state.status == RouteNodeState.Status.EXCEPTION ): break @@ -597,7 +599,7 @@ class GraphEngine: def _run_node( self, - node_instance: BaseNode[BaseNodeData], + node_instance: BaseNode, route_node_state: RouteNodeState, parallel_id: Optional[str] = None, parallel_start_node_id: Optional[str] = None, @@ -660,10 +662,10 @@ class GraphEngine: retries == max_retries and node_instance.node_type == NodeType.HTTP_REQUEST and run_result.outputs - and not node_instance.should_continue_on_error + and not node_instance.continue_on_error ): run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED - if node_instance.should_retry and retries < max_retries: + if node_instance.retry and retries < max_retries: retries += 1 route_node_state.node_run_result = run_result yield NodeRunRetryEvent( @@ -687,7 +689,7 @@ class GraphEngine: route_node_state.set_finished(run_result=run_result) if run_result.status == WorkflowNodeExecutionStatus.FAILED: - if node_instance.should_continue_on_error: + if node_instance.continue_on_error: # if run failed, handle error run_result = self._handle_continue_on_error( node_instance, @@ -736,7 +738,7 @@ class GraphEngine: should_continue_retry = False elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED: if ( - node_instance.should_continue_on_error + node_instance.continue_on_error and self.graph.edge_mapping.get(node_instance.node_id) and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH ): @@ -886,7 +888,7 @@ class GraphEngine: def _handle_continue_on_error( self, - node_instance: BaseNode[BaseNodeData], + node_instance: BaseNode, error_result: NodeRunResult, variable_pool: VariablePool, handle_exceptions: list[str] = [], diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 0fdf77c163..467c63959e 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -1,15 +1,15 @@ import logging from abc import abstractmethod -from collections.abc import Callable, Generator, Mapping, Sequence +from collections.abc import Generator, Mapping, Sequence from typing import TYPE_CHECKING, Any, ClassVar, Optional, Union, cast from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.nodes.enums import CONTINUE_ON_ERROR_NODE_TYPE, RETRY_ON_ERROR_NODE_TYPE, NodeType +from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event import NodeEvent, RunCompletedEvent if TYPE_CHECKING: - from core.workflow.graph_engine import Graph, GraphEngine, GraphInitParams, GraphRuntimeState + from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState from core.workflow.graph_engine.entities.event import InNodeEvent logger = logging.getLogger(__name__) @@ -49,7 +49,6 @@ class BaseNode: self.node_id = node_id - node_data = self._node_data_cls.model_validate(config.get("data", {})) @abstractmethod def from_dict(self, data: Mapping[str, Any]) -> None: ... @@ -136,7 +135,7 @@ class BaseNode: *, graph_config: Mapping[str, Any], node_id: str, - node_data: GenericNodeData, + node_data: Any, ) -> Mapping[str, Sequence[str]]: """ Extract variable selector to variable mapping @@ -175,19 +174,14 @@ class BaseNode: raise NotImplementedError("subclasses of BaseNode must implement `version` method.") @property - def should_continue_on_error(self) -> bool: - """judge if should continue on error - - Returns: - bool: if should continue on error - """ - return self.node_data.error_strategy is not None and self.node_type in CONTINUE_ON_ERROR_NODE_TYPE + def continue_on_error(self) -> bool: + return False @property - def should_retry(self) -> bool: + def retry(self) -> bool: """judge if should retry Returns: bool: if should retry """ - return self.node_data.retry_config.retry_enabled and self.node_type in RETRY_ON_ERROR_NODE_TYPE + return False diff --git a/api/core/workflow/nodes/code/code_node.py b/api/core/workflow/nodes/code/code_node.py index cdf3913197..97a99b57b4 100644 --- a/api/core/workflow/nodes/code/code_node.py +++ b/api/core/workflow/nodes/code/code_node.py @@ -351,3 +351,11 @@ class CodeNode(BaseNode): node_id + "." + variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables } + + @property + def continue_on_error(self) -> bool: + return self.node_data.error_strategy is not None + + @property + def retry(self) -> bool: + return self.node_data.retry_config.retry_enabled diff --git a/api/core/workflow/nodes/enums.py b/api/core/workflow/nodes/enums.py index 73b43eeaf7..7cf9ab9107 100644 --- a/api/core/workflow/nodes/enums.py +++ b/api/core/workflow/nodes/enums.py @@ -35,7 +35,3 @@ class ErrorStrategy(StrEnum): class FailBranchSourceHandle(StrEnum): FAILED = "fail-branch" SUCCESS = "success-branch" - - -CONTINUE_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST] -RETRY_ON_ERROR_NODE_TYPE = CONTINUE_ON_ERROR_NODE_TYPE diff --git a/api/core/workflow/nodes/http_request/node.py b/api/core/workflow/nodes/http_request/node.py index f164794630..b7f248e968 100644 --- a/api/core/workflow/nodes/http_request/node.py +++ b/api/core/workflow/nodes/http_request/node.py @@ -82,7 +82,7 @@ class HttpRequestNode(BaseNode): response = http_executor.invoke() files = self.extract_files(url=http_executor.url, response=response) - if not response.response.is_success and (self.should_continue_on_error or self.should_retry): + if not response.response.is_success and (self.continue_on_error or self.retry): return NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, outputs={ @@ -221,3 +221,11 @@ class HttpRequestNode(BaseNode): files.append(file) return ArrayFileSegment(value=files) + + @property + def continue_on_error(self) -> bool: + return self.node_data.error_strategy is not None + + @property + def retry(self) -> bool: + return self.node_data.retry_config.retry_enabled diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index f57b401a7f..80587466fb 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -1062,6 +1062,14 @@ class LLMNode(BaseNode): logger.warning("unknown contents type encountered, type=%s", type(contents)) yield str(contents) + @property + def continue_on_error(self) -> bool: + return self.node_data.error_strategy is not None + + @property + def retry(self) -> bool: + return self.node_data.retry_config.retry_enabled + def _combine_message_content_with_role( *, contents: Optional[str | list[PromptMessageContentUnionTypes]] = None, role: PromptMessageRole diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index 8be61e97c6..aa1fa0c282 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -398,3 +398,11 @@ class ToolNode(BaseNode): result = {node_id + "." + key: value for key, value in result.items()} return result + + @property + def continue_on_error(self) -> bool: + return self.node_data.error_strategy is not None + + @property + def retry(self) -> bool: + return self.node_data.retry_config.retry_enabled diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index 1399efcdb1..89061c746b 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -5,7 +5,7 @@ from collections.abc import Generator, Mapping, Sequence from typing import Any, Optional, cast from configs import dify_config -from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError +from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from core.workflow.callbacks import WorkflowCallback diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 677bc74237..934453f87d 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -479,7 +479,7 @@ class WorkflowService: if not node_run_result: raise ValueError("Node run failed with no run result") # single step debug mode error handling return - if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: + if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.continue_on_error: node_error_args: dict[str, Any] = { "status": WorkflowNodeExecutionStatus.EXCEPTION, "error": node_run_result.error,