diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index 361ece3634..e8d7760f01 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -1,8 +1,8 @@ import logging import time import uuid -from collections.abc import Callable, Generator, Mapping, Sequence -from typing import Any, Optional, TypeAlias, cast +from collections.abc import Generator, Mapping, Sequence +from typing import Any, Optional, cast from configs import dify_config from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError @@ -10,7 +10,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from core.workflow.callbacks import WorkflowCallback from core.workflow.constants import ENVIRONMENT_VARIABLE_NODE_ID -from core.workflow.entities.node_entities import WorkflowNodeExecutionMetadataKey from core.workflow.entities.variable_pool import VariablePool from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent, InNodeEvent @@ -20,11 +19,10 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.graph_engine.graph_engine import GraphEngine from core.workflow.nodes import NodeType from core.workflow.nodes.base import BaseNode -from core.workflow.nodes.event import NodeEvent, RunCompletedEvent +from core.workflow.nodes.event import NodeEvent from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool from factories import file_factory -from libs import gen_utils from models.enums import UserFrom from models.workflow import ( Workflow, @@ -141,7 +139,6 @@ class WorkflowEntry: node_type = NodeType(node_config_data.get("type")) node_version = node_config_data.get("version", "1") node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] - metadata_attacher = _attach_execution_metadata_based_on_node_config(node_config_data) # init graph graph = Graph.init(graph_config=workflow.graph_dict) @@ -194,8 +191,6 @@ class WorkflowEntry: generator = node_instance.run() except Exception as e: raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e)) - if metadata_attacher: - generator = gen_utils.map_(generator, metadata_attacher) return node_instance, generator @classmethod @@ -381,49 +376,3 @@ class WorkflowEntry: # append variable and value to variable pool if variable_node_id != ENVIRONMENT_VARIABLE_NODE_ID: variable_pool.add([variable_node_id] + variable_key_list, input_value) - - -_NodeOrInNodeEvent: TypeAlias = NodeEvent | InNodeEvent - - -def _attach_execution_metadata( - extra_metadata: dict[WorkflowNodeExecutionMetadataKey, Any], -) -> Callable[[_NodeOrInNodeEvent], _NodeOrInNodeEvent]: - def _execution_metadata_mapper(e: NodeEvent | InNodeEvent) -> NodeEvent | InNodeEvent: - if not isinstance(e, RunCompletedEvent): - return e - run_result = e.run_result - if run_result.metadata is None: - run_result.metadata = {} - for k, v in extra_metadata.items(): - run_result.metadata[k] = v - return e - - return _execution_metadata_mapper - - -def _attach_execution_metadata_based_on_node_config( - node_config: dict, -) -> Callable[[_NodeOrInNodeEvent], _NodeOrInNodeEvent] | None: - in_loop = node_config.get("isInLoop", False) - in_iteration = node_config.get("isInIteration", False) - if in_loop: - loop_id = node_config.get("loop_id") - if loop_id is None: - raise Exception("invalid graph") - return _attach_execution_metadata( - { - WorkflowNodeExecutionMetadataKey.LOOP_ID: loop_id, - } - ) - elif in_iteration: - iteration_id = node_config.get("iteration_id") - if iteration_id is None: - raise Exception("invalid graph") - return _attach_execution_metadata( - { - WorkflowNodeExecutionMetadataKey.ITERATION_ID: iteration_id, - } - ) - else: - return None diff --git a/api/libs/gen_utils.py b/api/libs/gen_utils.py deleted file mode 100644 index 012c48b09f..0000000000 --- a/api/libs/gen_utils.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Utility functions for working with generators.""" - -from collections.abc import Callable, Generator -from typing import TypeVar - -_YieldT = TypeVar("_YieldT") -_YieldR = TypeVar("_YieldR") - - -def map_( - gen: Generator[_YieldT, None, None], - mapper: Callable[[_YieldT], _YieldR], -) -> Generator[_YieldR, None, None]: - for item in gen: - yield mapper(item) - - -def filter_( - gen: Generator[_YieldT, None, None], - mapper: Callable[[_YieldT], bool], -) -> Generator[_YieldT, None, None]: - for item in gen: - if mapper(item): - yield item - - -def wrap( - gen: Generator[_YieldT, None, None], - funcs: list[Callable[[Generator[_YieldT, None, None]], Generator[_YieldT, None, None]]], -) -> Generator[_YieldT, None, None]: - for f in funcs: - gen = f(gen) - return gen diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 3841a2ec56..349c9f4af0 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -15,7 +15,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable -from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey +from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey @@ -362,6 +362,13 @@ class WorkflowService: app_id=app_model.id, ) + node_config = draft_workflow.get_node_config_by_id(node_id) + eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config) + if eclosing_node_type_and_id: + _, enclosing_node_id = eclosing_node_type_and_id + else: + enclosing_node_id = None + run = WorkflowEntry.single_step_run( workflow=draft_workflow, node_id=node_id, @@ -394,11 +401,6 @@ class WorkflowService: # Convert node_execution to WorkflowNodeExecution after save workflow_node_execution = repository.to_db_model(node_execution) - exec_metadata = workflow_node_execution.execution_metadata_dict or {} - - loop_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID, None) - iteration_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID, None) - with Session(bind=db.engine) as session, session.begin(): draft_var_saver = DraftVariableSaver( session=session, @@ -406,7 +408,8 @@ class WorkflowService: node_id=workflow_node_execution.node_id, node_type=NodeType(workflow_node_execution.node_type), invoke_from=InvokeFrom.DEBUGGER, - enclosing_node_id=loop_id or iteration_id or None, + enclosing_node_id=enclosing_node_id, + node_execution_id=node_execution.id, ) draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs) session.commit()