refactor(api): extract enclosing_node_id directly from graph definition

Instead of attaching it to `execution_metadata` and
extracting from the result.
pull/20699/head
QuantumGhost 12 months ago
parent 59c5254fb9
commit 68408805dc

@ -1,8 +1,8 @@
import logging import logging
import time import time
import uuid import uuid
from collections.abc import Callable, Generator, Mapping, Sequence from collections.abc import Generator, Mapping, Sequence
from typing import Any, Optional, TypeAlias, cast from typing import Any, Optional, cast
from configs import dify_config from configs import dify_config
from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError
@ -10,7 +10,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File from core.file.models import File
from core.workflow.callbacks import WorkflowCallback from core.workflow.callbacks import WorkflowCallback
from core.workflow.constants import ENVIRONMENT_VARIABLE_NODE_ID from core.workflow.constants import ENVIRONMENT_VARIABLE_NODE_ID
from core.workflow.entities.node_entities import WorkflowNodeExecutionMetadataKey
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent, InNodeEvent from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent, InNodeEvent
@ -20,11 +19,10 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime
from core.workflow.graph_engine.graph_engine import GraphEngine from core.workflow.graph_engine.graph_engine import GraphEngine
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.base import BaseNode from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.event import NodeEvent, RunCompletedEvent from core.workflow.nodes.event import NodeEvent
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
from factories import file_factory from factories import file_factory
from libs import gen_utils
from models.enums import UserFrom from models.enums import UserFrom
from models.workflow import ( from models.workflow import (
Workflow, Workflow,
@ -141,7 +139,6 @@ class WorkflowEntry:
node_type = NodeType(node_config_data.get("type")) node_type = NodeType(node_config_data.get("type"))
node_version = node_config_data.get("version", "1") node_version = node_config_data.get("version", "1")
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
metadata_attacher = _attach_execution_metadata_based_on_node_config(node_config_data)
# init graph # init graph
graph = Graph.init(graph_config=workflow.graph_dict) graph = Graph.init(graph_config=workflow.graph_dict)
@ -194,8 +191,6 @@ class WorkflowEntry:
generator = node_instance.run() generator = node_instance.run()
except Exception as e: except Exception as e:
raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e)) raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e))
if metadata_attacher:
generator = gen_utils.map_(generator, metadata_attacher)
return node_instance, generator return node_instance, generator
@classmethod @classmethod
@ -381,49 +376,3 @@ class WorkflowEntry:
# append variable and value to variable pool # append variable and value to variable pool
if variable_node_id != ENVIRONMENT_VARIABLE_NODE_ID: if variable_node_id != ENVIRONMENT_VARIABLE_NODE_ID:
variable_pool.add([variable_node_id] + variable_key_list, input_value) variable_pool.add([variable_node_id] + variable_key_list, input_value)
_NodeOrInNodeEvent: TypeAlias = NodeEvent | InNodeEvent
def _attach_execution_metadata(
extra_metadata: dict[WorkflowNodeExecutionMetadataKey, Any],
) -> Callable[[_NodeOrInNodeEvent], _NodeOrInNodeEvent]:
def _execution_metadata_mapper(e: NodeEvent | InNodeEvent) -> NodeEvent | InNodeEvent:
if not isinstance(e, RunCompletedEvent):
return e
run_result = e.run_result
if run_result.metadata is None:
run_result.metadata = {}
for k, v in extra_metadata.items():
run_result.metadata[k] = v
return e
return _execution_metadata_mapper
def _attach_execution_metadata_based_on_node_config(
node_config: dict,
) -> Callable[[_NodeOrInNodeEvent], _NodeOrInNodeEvent] | None:
in_loop = node_config.get("isInLoop", False)
in_iteration = node_config.get("isInIteration", False)
if in_loop:
loop_id = node_config.get("loop_id")
if loop_id is None:
raise Exception("invalid graph")
return _attach_execution_metadata(
{
WorkflowNodeExecutionMetadataKey.LOOP_ID: loop_id,
}
)
elif in_iteration:
iteration_id = node_config.get("iteration_id")
if iteration_id is None:
raise Exception("invalid graph")
return _attach_execution_metadata(
{
WorkflowNodeExecutionMetadataKey.ITERATION_ID: iteration_id,
}
)
else:
return None

@ -1,33 +0,0 @@
"""Utility functions for working with generators."""
from collections.abc import Callable, Generator
from typing import TypeVar
_YieldT = TypeVar("_YieldT")
_YieldR = TypeVar("_YieldR")
def map_(
gen: Generator[_YieldT, None, None],
mapper: Callable[[_YieldT], _YieldR],
) -> Generator[_YieldR, None, None]:
for item in gen:
yield mapper(item)
def filter_(
gen: Generator[_YieldT, None, None],
mapper: Callable[[_YieldT], bool],
) -> Generator[_YieldT, None, None]:
for item in gen:
if mapper(item):
yield item
def wrap(
gen: Generator[_YieldT, None, None],
funcs: list[Callable[[Generator[_YieldT, None, None]], Generator[_YieldT, None, None]]],
) -> Generator[_YieldT, None, None]:
for f in funcs:
gen = f(gen)
return gen

@ -15,7 +15,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File from core.file import File
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
@ -362,6 +362,13 @@ class WorkflowService:
app_id=app_model.id, app_id=app_model.id,
) )
node_config = draft_workflow.get_node_config_by_id(node_id)
eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config)
if eclosing_node_type_and_id:
_, enclosing_node_id = eclosing_node_type_and_id
else:
enclosing_node_id = None
run = WorkflowEntry.single_step_run( run = WorkflowEntry.single_step_run(
workflow=draft_workflow, workflow=draft_workflow,
node_id=node_id, node_id=node_id,
@ -394,11 +401,6 @@ class WorkflowService:
# Convert node_execution to WorkflowNodeExecution after save # Convert node_execution to WorkflowNodeExecution after save
workflow_node_execution = repository.to_db_model(node_execution) workflow_node_execution = repository.to_db_model(node_execution)
exec_metadata = workflow_node_execution.execution_metadata_dict or {}
loop_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID, None)
iteration_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID, None)
with Session(bind=db.engine) as session, session.begin(): with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver( draft_var_saver = DraftVariableSaver(
session=session, session=session,
@ -406,7 +408,8 @@ class WorkflowService:
node_id=workflow_node_execution.node_id, node_id=workflow_node_execution.node_id,
node_type=NodeType(workflow_node_execution.node_type), node_type=NodeType(workflow_node_execution.node_type),
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
enclosing_node_id=loop_id or iteration_id or None, enclosing_node_id=enclosing_node_id,
node_execution_id=node_execution.id,
) )
draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs) draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs)
session.commit() session.commit()

Loading…
Cancel
Save