From 11bc5c0dec688ac542179d3e4785102af0bbcd68 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Mon, 7 Jul 2025 12:58:45 +0800 Subject: [PATCH] refactor(api): Change system variable from dict to pydantic model This allows us to serialize and deserialize `SystemVariable` without manully implmenting relevant logic. --- api/core/app/apps/advanced_chat/app_runner.py | 24 ++--- .../advanced_chat/generate_task_pipeline.py | 22 ++--- api/core/app/apps/workflow/app_runner.py | 17 ++-- .../apps/workflow/generate_task_pipeline.py | 16 ++-- api/core/app/apps/workflow_app_runner.py | 5 +- api/core/prompt/advanced_prompt_transform.py | 2 +- api/core/workflow/system_variable.py | 89 +++++++++++++++++++ api/core/workflow/workflow_cycle_manager.py | 18 ++-- api/services/workflow_service.py | 36 +++----- .../workflow/nodes/test_code.py | 4 +- .../workflow/nodes/test_http.py | 4 +- .../workflow/nodes/test_llm.py | 16 ++-- .../nodes/test_parameter_extractor.py | 11 +-- .../workflow/nodes/test_template_transform.py | 4 +- .../workflow/nodes/test_tool.py | 4 +- .../graph_engine/test_graph_engine.py | 47 +++++----- .../core/workflow/nodes/answer/test_answer.py | 4 +- .../answer/test_answer_stream_processor.py | 14 +-- .../test_http_request_executor.py | 23 +++-- .../http_request/test_http_request_node.py | 7 +- .../nodes/iteration/test_iteration.py | 50 +++++------ .../core/workflow/nodes/llm/test_node.py | 13 +-- .../core/workflow/nodes/test_answer.py | 4 +- .../workflow/nodes/test_continue_on_error.py | 13 +-- .../core/workflow/nodes/test_if_else.py | 8 +- .../workflow/nodes/tool/test_tool_node.py | 3 +- .../v1/test_variable_assigner_v1.py | 8 +- .../v2/test_variable_assigner_v2.py | 10 +-- .../workflow/test_workflow_cycle_manager.py | 18 ++-- .../workflow/utils/test_variable_utils.py | 12 +-- 30 files changed, 305 insertions(+), 201 deletions(-) create mode 100644 api/core/workflow/system_variable.py diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 840a3c9d3b..f53989ba36 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -18,7 +18,7 @@ from core.app.entities.queue_entities import ( from core.moderation.base import ModerationError from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey +from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db @@ -64,7 +64,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): if not workflow: raise ValueError("Workflow not initialized") - user_id = None + user_id: str | None = None if self.application_generate_entity.invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}: end_user = db.session.query(EndUser).filter(EndUser.id == self.application_generate_entity.user_id).first() if end_user: @@ -136,16 +136,16 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): session.commit() # Create a variable pool. - system_inputs = { - SystemVariableKey.QUERY: query, - SystemVariableKey.FILES: files, - SystemVariableKey.CONVERSATION_ID: self.conversation.id, - SystemVariableKey.USER_ID: user_id, - SystemVariableKey.DIALOGUE_COUNT: self._dialogue_count, - SystemVariableKey.APP_ID: app_config.app_id, - SystemVariableKey.WORKFLOW_ID: app_config.workflow_id, - SystemVariableKey.WORKFLOW_EXECUTION_ID: self.application_generate_entity.workflow_run_id, - } + system_inputs = SystemVariable( + query=query, + files=files, + conversation_id=self.conversation.id, + user_id=user_id, + dialogue_count=self._dialogue_count, + app_id=app_config.app_id, + workflow_id=app_config.workflow_id, + workflow_execution_id=self.application_generate_entity.workflow_run_id, + ) # init variable pool variable_pool = VariablePool( diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 4c52fc3e83..1dc9796d5b 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -61,12 +61,12 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, WorkflowType -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.system_variable import SystemVariable from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from events.message_event import message_was_created from extensions.ext_database import db @@ -116,16 +116,16 @@ class AdvancedChatAppGenerateTaskPipeline: self._workflow_cycle_manager = WorkflowCycleManager( application_generate_entity=application_generate_entity, - workflow_system_variables={ - SystemVariableKey.QUERY: message.query, - SystemVariableKey.FILES: application_generate_entity.files, - SystemVariableKey.CONVERSATION_ID: conversation.id, - SystemVariableKey.USER_ID: user_session_id, - SystemVariableKey.DIALOGUE_COUNT: dialogue_count, - SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id, - SystemVariableKey.WORKFLOW_ID: workflow.id, - SystemVariableKey.WORKFLOW_EXECUTION_ID: application_generate_entity.workflow_run_id, - }, + workflow_system_variables=SystemVariable( + query=message.query, + files=application_generate_entity.files, + conversation_id=conversation.id, + user_id=user_session_id, + dialogue_count=dialogue_count, + app_id=application_generate_entity.app_config.app_id, + workflow_id=workflow.id, + workflow_execution_id=application_generate_entity.workflow_run_id, + ), workflow_info=CycleManagerWorkflowInfo( workflow_id=workflow.id, workflow_type=WorkflowType(workflow.type), diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 07aeb57fa3..3a66ffa578 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -11,7 +11,7 @@ from core.app.entities.app_invoke_entities import ( ) from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey +from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db @@ -95,13 +95,14 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): files = self.application_generate_entity.files # Create a variable pool. - system_inputs = { - SystemVariableKey.FILES: files, - SystemVariableKey.USER_ID: user_id, - SystemVariableKey.APP_ID: app_config.app_id, - SystemVariableKey.WORKFLOW_ID: app_config.workflow_id, - SystemVariableKey.WORKFLOW_EXECUTION_ID: self.application_generate_entity.workflow_execution_id, - } + + system_inputs = SystemVariable( + files=files, + user_id=user_id, + app_id=app_config.app_id, + workflow_id=app_config.workflow_id, + workflow_execution_id=self.application_generate_entity.workflow_execution_id, + ) variable_pool = VariablePool( system_variables=system_inputs, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 2a85cd5e3d..cf11fe954f 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -55,10 +55,10 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType -from core.workflow.enums import SystemVariableKey from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.system_variable import SystemVariable from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from extensions.ext_database import db from models.account import Account @@ -109,13 +109,13 @@ class WorkflowAppGenerateTaskPipeline: self._workflow_cycle_manager = WorkflowCycleManager( application_generate_entity=application_generate_entity, - workflow_system_variables={ - SystemVariableKey.FILES: application_generate_entity.files, - SystemVariableKey.USER_ID: user_session_id, - SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id, - SystemVariableKey.WORKFLOW_ID: workflow.id, - SystemVariableKey.WORKFLOW_EXECUTION_ID: application_generate_entity.workflow_execution_id, - }, + workflow_system_variables=SystemVariable( + files=application_generate_entity.files, + user_id=user_session_id, + app_id=application_generate_entity.app_config.app_id, + workflow_id=workflow.id, + workflow_execution_id=application_generate_entity.workflow_execution_id, + ), workflow_info=CycleManagerWorkflowInfo( workflow_id=workflow.id, workflow_type=WorkflowType(workflow.type), diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 17b9ac5827..2f4d234ecd 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -62,6 +62,7 @@ from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.graph import Graph from core.workflow.nodes import NodeType from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING +from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db @@ -166,7 +167,7 @@ class WorkflowBasedAppRunner(AppRunner): # init variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, environment_variables=workflow.environment_variables, ) @@ -263,7 +264,7 @@ class WorkflowBasedAppRunner(AppRunner): # init variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, environment_variables=workflow.environment_variables, ) diff --git a/api/core/prompt/advanced_prompt_transform.py b/api/core/prompt/advanced_prompt_transform.py index 25964ae063..0f0fe65f27 100644 --- a/api/core/prompt/advanced_prompt_transform.py +++ b/api/core/prompt/advanced_prompt_transform.py @@ -158,7 +158,7 @@ class AdvancedPromptTransform(PromptTransform): if prompt_item.edition_type == "basic" or not prompt_item.edition_type: if self.with_variable_tmpl: - vp = VariablePool() + vp = VariablePool.empty() for k, v in inputs.items(): if k.startswith("#"): vp.add(k[1:-1].split("."), v) diff --git a/api/core/workflow/system_variable.py b/api/core/workflow/system_variable.py new file mode 100644 index 0000000000..df90c16596 --- /dev/null +++ b/api/core/workflow/system_variable.py @@ -0,0 +1,89 @@ +from collections.abc import Sequence +from typing import Any + +from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator + +from core.file.models import File +from core.workflow.enums import SystemVariableKey + + +class SystemVariable(BaseModel): + """A model for managing system variables. + + Fields with a value of `None` are treated as absent and will not be included + in the variable pool. + """ + + model_config = ConfigDict( + extra="forbid", + serialize_by_alias=True, + validate_by_alias=True, + ) + + user_id: str | None = None + + # Ideally, `app_id` and `workflow_id` should be required and not `None`. + # However, there are scenarios in the codebase where these fields are not set. + # To maintain compatibility, they are marked as optional here. + app_id: str | None = None + workflow_id: str | None = None + + files: Sequence[File] = Field(default_factory=list) + + # NOTE: The `workflow_execution_id` field was previously named `workflow_run_id`. + # To maintain compatibility with existing workflows, it must be serialized + # as `workflow_run_id` in dictionaries or JSON objects, and also referenced + # as `workflow_run_id` in the variable pool. + workflow_execution_id: str | None = Field( + validation_alias=AliasChoices("workflow_execution_id", "workflow_run_id"), + serialization_alias="workflow_run_id", + default=None, + ) + # Chatflow related fields. + query: str | None = None + conversation_id: str | None = None + dialogue_count: int | None = None + + @model_validator(mode="before") + @classmethod + def validate_json_fields(cls, data): + if isinstance(data, dict): + # For JSON validation, only allow workflow_run_id + if "workflow_execution_id" in data and "workflow_run_id" not in data: + # This is likely from direct instantiation, allow it + return data + elif "workflow_execution_id" in data and "workflow_run_id" in data: + # Both present, remove workflow_execution_id + data = data.copy() + data.pop("workflow_execution_id") + return data + return data + + @classmethod + def empty(cls) -> "SystemVariable": + return cls() + + def to_dict(self) -> dict[SystemVariableKey, Any]: + # NOTE: This method is provided for compatibility with legacy code. + # New code should use the `SystemVariable` object directly instead of converting + # it to a dictionary, as this conversion results in the loss of type information + # for each key, making static analysis more difficult. + + d: dict[SystemVariableKey, Any] = { + SystemVariableKey.FILES: self.files, + } + if self.user_id is not None: + d[SystemVariableKey.USER_ID] = self.user_id + if self.app_id is not None: + d[SystemVariableKey.APP_ID] = self.app_id + if self.workflow_id is not None: + d[SystemVariableKey.WORKFLOW_ID] = self.workflow_id + if self.workflow_execution_id is not None: + d[SystemVariableKey.WORKFLOW_EXECUTION_ID] = self.workflow_execution_id + if self.query is not None: + d[SystemVariableKey.QUERY] = self.query + if self.conversation_id is not None: + d[SystemVariableKey.CONVERSATION_ID] = self.conversation_id + if self.dialogue_count is not None: + d[SystemVariableKey.DIALOGUE_COUNT] = self.dialogue_count + return d diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 0aab2426af..3089ca1846 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -26,6 +26,7 @@ from core.workflow.entities.workflow_node_execution import ( from core.workflow.enums import SystemVariableKey from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry from libs.datetime_utils import naive_utc_now @@ -43,7 +44,7 @@ class WorkflowCycleManager: self, *, application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], - workflow_system_variables: dict[SystemVariableKey, Any], + workflow_system_variables: SystemVariable, workflow_info: CycleManagerWorkflowInfo, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, @@ -56,17 +57,22 @@ class WorkflowCycleManager: def handle_workflow_run_start(self) -> WorkflowExecution: inputs = {**self._application_generate_entity.inputs} - for key, value in (self._workflow_system_variables or {}).items(): - if key.value == "conversation": - continue - inputs[f"sys.{key.value}"] = value + + # Iterate over SystemVariable fields using Pydantic's model_fields + if self._workflow_system_variables: + for field_name, value in self._workflow_system_variables.to_dict(): + if field_name == SystemVariableKey.CONVERSATION_ID: + continue + inputs[f"sys.{field_name}"] = value # handle special values inputs = dict(WorkflowEntry.handle_special_values(inputs) or {}) # init workflow run # TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this - execution_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_EXECUTION_ID) or uuid4()) + execution_id = str( + self._workflow_system_variables.workflow_execution_id if self._workflow_system_variables else None + ) or str(uuid4()) execution = WorkflowExecution.new( id_=execution_id, workflow_id=self._workflow_info.workflow_id, diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 2be57fd51c..b013f0085b 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -18,7 +18,6 @@ from core.variables import Variable from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.nodes import NodeType @@ -28,6 +27,7 @@ from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.event.types import NodeEvent from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.start.entities import StartNodeData +from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from extensions.ext_database import db @@ -357,7 +357,7 @@ class WorkflowService: else: variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs=user_inputs, environment_variables=draft_workflow.environment_variables, conversation_variables=[], @@ -671,33 +671,25 @@ def _setup_variable_pool( ): # Only inject system variables for START node type. if node_type == NodeType.START: - # Create a variable pool. - system_inputs: dict[SystemVariableKey, Any] = { - # From inputs: - SystemVariableKey.FILES: files, - SystemVariableKey.USER_ID: user_id, - # From workflow model - SystemVariableKey.APP_ID: workflow.app_id, - SystemVariableKey.WORKFLOW_ID: workflow.id, - # Randomly generated. - SystemVariableKey.WORKFLOW_EXECUTION_ID: str(uuid.uuid4()), - } + system_variable = SystemVariable( + user_id=user_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + files=files or [], + workflow_execution_id=str(uuid.uuid4()), + ) # Only add chatflow-specific variables for non-workflow types if workflow.type != WorkflowType.WORKFLOW.value: - system_inputs.update( - { - SystemVariableKey.QUERY: query, - SystemVariableKey.CONVERSATION_ID: conversation_id, - SystemVariableKey.DIALOGUE_COUNT: 0, - } - ) + system_variable.query = query + system_variable.conversation_id = conversation_id + system_variable.dialogue_count = 0 else: - system_inputs = {} + system_variable = SystemVariable.empty() # init variable pool variable_pool = VariablePool( - system_variables=system_inputs, + system_variables=system_variable, user_inputs=user_inputs, environment_variables=workflow.environment_variables, conversation_variables=conversation_variables, diff --git a/api/tests/integration_tests/workflow/nodes/test_code.py b/api/tests/integration_tests/workflow/nodes/test_code.py index 13d78c2d83..90bb04f649 100644 --- a/api/tests/integration_tests/workflow/nodes/test_code.py +++ b/api/tests/integration_tests/workflow/nodes/test_code.py @@ -9,12 +9,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.entities import CodeNodeData +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock @@ -50,7 +50,7 @@ def init_code_node(code_config: dict): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/integration_tests/workflow/nodes/test_http.py b/api/tests/integration_tests/workflow/nodes/test_http.py index 1ab0cc2451..50e726febf 100644 --- a/api/tests/integration_tests/workflow/nodes/test_http.py +++ b/api/tests/integration_tests/workflow/nodes/test_http.py @@ -6,11 +6,11 @@ import pytest from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.http_request.node import HttpRequestNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.http import setup_http_mock @@ -44,7 +44,7 @@ def init_http_node(config: dict): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/integration_tests/workflow/nodes/test_llm.py b/api/tests/integration_tests/workflow/nodes/test_llm.py index 389d1071f3..314f9eaab1 100644 --- a/api/tests/integration_tests/workflow/nodes/test_llm.py +++ b/api/tests/integration_tests/workflow/nodes/test_llm.py @@ -14,12 +14,12 @@ from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage from core.model_runtime.entities.message_entities import AssistantPromptMessage from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.llm.node import LLMNode +from core.workflow.system_variable import SystemVariable from extensions.ext_database import db from models.enums import UserFrom from models.workflow import WorkflowType @@ -63,12 +63,14 @@ def init_llm_node(config: dict) -> LLMNode: # construct variable pool variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "what's the weather today?", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", + app_id=app_id, + workflow_id=workflow_id, + files=[], + query="what's the weather today?", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py b/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py index 0df8e8b146..dd8466afa6 100644 --- a/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py +++ b/api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py @@ -8,11 +8,11 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.model_runtime.entities import AssistantPromptMessage from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode +from core.workflow.system_variable import SystemVariable from extensions.ext_database import db from models.enums import UserFrom from tests.integration_tests.workflow.nodes.__mock.model import get_mocked_fetch_model_config @@ -64,12 +64,9 @@ def init_parameter_extractor_node(config: dict): # construct variable pool variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "what's the weather in SF", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", files=[], query="what's the weather in SF", conversation_id="abababa" + ), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/integration_tests/workflow/nodes/test_template_transform.py b/api/tests/integration_tests/workflow/nodes/test_template_transform.py index a5f2677a59..1f617fc92d 100644 --- a/api/tests/integration_tests/workflow/nodes/test_template_transform.py +++ b/api/tests/integration_tests/workflow/nodes/test_template_transform.py @@ -6,11 +6,11 @@ import pytest from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock @@ -61,7 +61,7 @@ def test_execute_code(setup_code_executor_mock): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/integration_tests/workflow/nodes/test_tool.py b/api/tests/integration_tests/workflow/nodes/test_tool.py index 039beedafe..6907e0163e 100644 --- a/api/tests/integration_tests/workflow/nodes/test_tool.py +++ b/api/tests/integration_tests/workflow/nodes/test_tool.py @@ -6,12 +6,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.tools.utils.configuration import ToolParameterConfigurationManager from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.event.event import RunCompletedEvent from core.workflow.nodes.tool.tool_node import ToolNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -44,7 +44,7 @@ def init_tool_node(config: dict): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index c288a5fa13..ed4e42425e 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -8,7 +8,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import ( BaseNodeEvent, GraphRunFailedEvent, @@ -27,6 +26,7 @@ from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.event import RunCompletedEvent, RunStreamChunkEvent from core.workflow.nodes.llm.node import LLMNode from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -171,7 +171,8 @@ def test_run_parallel_in_workflow(mock_close, mock_remove): graph = Graph.init(graph_config=graph_config) variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={"query": "hi"} + system_variables=SystemVariable(user_id="aaa", app_id="1", workflow_id="1", files=[]), + user_inputs={"query": "hi"}, ) graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) @@ -293,12 +294,12 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove): graph = Graph.init(graph_config=graph_config) variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "what's the weather in SF", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", + files=[], + query="what's the weather in SF", + conversation_id="abababa", + ), user_inputs={}, ) @@ -474,12 +475,12 @@ def test_run_branch(mock_close, mock_remove): graph = Graph.init(graph_config=graph_config) variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "hi", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", + files=[], + query="hi", + conversation_id="abababa", + ), user_inputs={"uid": "takato"}, ) @@ -804,18 +805,22 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): # construct variable pool pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "dify", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "1", - }, + system_variables=SystemVariable( + user_id="1", + files=[], + query="dify", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], ) pool.add(["pe", "list_output"], ["dify-1", "dify-2"]) variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={"query": "hi"} + system_variables=SystemVariable( + user_id="aaa", + files=[], + ), + user_inputs={"query": "hi"}, ) graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index b7f78d91fa..85ff4f9c05 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -5,11 +5,11 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.answer.answer_node import AnswerNode +from core.workflow.system_variable import SystemVariable from extensions.ext_database import db from models.enums import UserFrom from models.workflow import WorkflowType @@ -51,7 +51,7 @@ def test_execute_answer(): # construct variable pool pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], ) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py index c3a3818655..137e8b889d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py @@ -3,7 +3,6 @@ from collections.abc import Generator from datetime import UTC, datetime from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import ( GraphEngineEvent, NodeRunStartedEvent, @@ -15,6 +14,7 @@ from core.workflow.graph_engine.entities.runtime_route_state import RouteNodeSta from core.workflow.nodes.answer.answer_stream_processor import AnswerStreamProcessor from core.workflow.nodes.enums import NodeType from core.workflow.nodes.start.entities import StartNodeData +from core.workflow.system_variable import SystemVariable def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]: @@ -180,12 +180,12 @@ def test_process(): graph = Graph.init(graph_config=graph_config) variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "what's the weather in SF", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", + files=[], + query="what's the weather in SF", + conversation_id="abababa", + ), user_inputs={}, ) diff --git a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py index d066fc1e33..bb6d72f51e 100644 --- a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py +++ b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py @@ -7,12 +7,13 @@ from core.workflow.nodes.http_request import ( ) from core.workflow.nodes.http_request.entities import HttpRequestNodeTimeout from core.workflow.nodes.http_request.executor import Executor +from core.workflow.system_variable import SystemVariable def test_executor_with_json_body_and_number_variable(): # Prepare the variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add(["pre_node_id", "number"], 42) @@ -65,7 +66,7 @@ def test_executor_with_json_body_and_number_variable(): def test_executor_with_json_body_and_object_variable(): # Prepare the variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"}) @@ -120,7 +121,7 @@ def test_executor_with_json_body_and_object_variable(): def test_executor_with_json_body_and_nested_object_variable(): # Prepare the variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"}) @@ -174,7 +175,7 @@ def test_executor_with_json_body_and_nested_object_variable(): def test_extract_selectors_from_template_with_newline(): - variable_pool = VariablePool() + variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool.add(("node_id", "custom_query"), "line1\nline2") node_data = HttpRequestNodeData( title="Test JSON Body with Nested Object Variable", @@ -201,7 +202,7 @@ def test_extract_selectors_from_template_with_newline(): def test_executor_with_form_data(): # Prepare the variable pool variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add(["pre_node_id", "text_field"], "Hello, World!") @@ -280,7 +281,11 @@ def test_init_headers(): authorization=HttpRequestNodeAuthorization(type="no-auth"), ) timeout = HttpRequestNodeTimeout(connect=10, read=30, write=30) - return Executor(node_data=node_data, timeout=timeout, variable_pool=VariablePool()) + return Executor( + node_data=node_data, + timeout=timeout, + variable_pool=VariablePool(system_variables=SystemVariable.empty()), + ) executor = create_executor("aa\n cc:") executor._init_headers() @@ -310,7 +315,11 @@ def test_init_params(): authorization=HttpRequestNodeAuthorization(type="no-auth"), ) timeout = HttpRequestNodeTimeout(connect=10, read=30, write=30) - return Executor(node_data=node_data, timeout=timeout, variable_pool=VariablePool()) + return Executor( + node_data=node_data, + timeout=timeout, + variable_pool=VariablePool(system_variables=SystemVariable.empty()), + ) # Test basic key-value pairs executor = create_executor("key1:value1\nkey2:value2") diff --git a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py index 7fd32a4826..33f9251a72 100644 --- a/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py @@ -15,6 +15,7 @@ from core.workflow.nodes.http_request import ( HttpRequestNodeBody, HttpRequestNodeData, ) +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -40,7 +41,7 @@ def test_http_request_node_binary_file(monkeypatch): ), ) variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add( @@ -128,7 +129,7 @@ def test_http_request_node_form_with_file(monkeypatch): ), ) variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) variable_pool.add( @@ -223,7 +224,7 @@ def test_http_request_node_form_with_multiple_files(monkeypatch): ) variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 362072a3db..17c23b7735 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -7,7 +7,6 @@ from core.variables.segments import ArrayAnySegment, ArrayStringSegment from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState @@ -15,6 +14,7 @@ from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.iteration.entities import ErrorHandleMode from core.workflow.nodes.iteration.iteration_node import IterationNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -151,12 +151,12 @@ def test_run(): # construct variable pool pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "dify", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "1", - }, + system_variables=SystemVariable( + user_id="1", + files=[], + query="dify", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], ) @@ -368,12 +368,12 @@ def test_run_parallel(): # construct variable pool pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "dify", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "1", - }, + system_variables=SystemVariable( + user_id="1", + files=[], + query="dify", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], ) @@ -584,12 +584,12 @@ def test_iteration_run_in_parallel_mode(): # construct variable pool pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "dify", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "1", - }, + system_variables=SystemVariable( + user_id="1", + files=[], + query="dify", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], ) @@ -808,12 +808,12 @@ def test_iteration_run_error_handle(): # construct variable pool pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "dify", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "1", - }, + system_variables=SystemVariable( + user_id="1", + files=[], + query="dify", + conversation_id="abababa", + ), user_inputs={}, environment_variables=[], ) diff --git a/api/tests/unit_tests/core/workflow/nodes/llm/test_node.py b/api/tests/unit_tests/core/workflow/nodes/llm/test_node.py index 336c2befcc..fefad0ec95 100644 --- a/api/tests/unit_tests/core/workflow/nodes/llm/test_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/llm/test_node.py @@ -36,6 +36,7 @@ from core.workflow.nodes.llm.entities import ( ) from core.workflow.nodes.llm.file_saver import LLMFileSaver from core.workflow.nodes.llm.node import LLMNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.provider import ProviderType from models.workflow import WorkflowType @@ -104,7 +105,7 @@ def graph() -> Graph: @pytest.fixture def graph_runtime_state() -> GraphRuntimeState: variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) return GraphRuntimeState( @@ -181,7 +182,7 @@ def test_fetch_files_with_file_segment(): related_id="1", storage_key="", ) - variable_pool = VariablePool() + variable_pool = VariablePool.empty() variable_pool.add(["sys", "files"], file) result = llm_utils.fetch_files(variable_pool=variable_pool, selector=["sys", "files"]) @@ -209,7 +210,7 @@ def test_fetch_files_with_array_file_segment(): storage_key="", ), ] - variable_pool = VariablePool() + variable_pool = VariablePool.empty() variable_pool.add(["sys", "files"], ArrayFileSegment(value=files)) result = llm_utils.fetch_files(variable_pool=variable_pool, selector=["sys", "files"]) @@ -217,7 +218,7 @@ def test_fetch_files_with_array_file_segment(): def test_fetch_files_with_none_segment(): - variable_pool = VariablePool() + variable_pool = VariablePool.empty() variable_pool.add(["sys", "files"], NoneSegment()) result = llm_utils.fetch_files(variable_pool=variable_pool, selector=["sys", "files"]) @@ -225,7 +226,7 @@ def test_fetch_files_with_none_segment(): def test_fetch_files_with_array_any_segment(): - variable_pool = VariablePool() + variable_pool = VariablePool.empty() variable_pool.add(["sys", "files"], ArrayAnySegment(value=[])) result = llm_utils.fetch_files(variable_pool=variable_pool, selector=["sys", "files"]) @@ -233,7 +234,7 @@ def test_fetch_files_with_array_any_segment(): def test_fetch_files_with_non_existent_variable(): - variable_pool = VariablePool() + variable_pool = VariablePool.empty() result = llm_utils.fetch_files(variable_pool=variable_pool, selector=["sys", "files"]) assert result == [] diff --git a/api/tests/unit_tests/core/workflow/nodes/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/test_answer.py index abc822e98b..44c31b212e 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_answer.py @@ -5,11 +5,11 @@ from unittest.mock import MagicMock from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.answer.answer_node import AnswerNode +from core.workflow.system_variable import SystemVariable from extensions.ext_database import db from models.enums import UserFrom from models.workflow import WorkflowType @@ -53,7 +53,7 @@ def test_execute_answer(): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], conversation_variables=[], diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index a6c553faf0..27eec08cae 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -17,6 +17,7 @@ from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntime from core.workflow.graph_engine.graph_engine import GraphEngine from core.workflow.nodes.event.event import RunCompletedEvent, RunStreamChunkEvent from core.workflow.nodes.llm.node import LLMNode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -167,12 +168,12 @@ class ContinueOnErrorTestHelper: """Helper method to create a graph engine instance for testing""" graph = Graph.init(graph_config=graph_config) variable_pool = VariablePool( - system_variables={ - SystemVariableKey.QUERY: "clear", - SystemVariableKey.FILES: [], - SystemVariableKey.CONVERSATION_ID: "abababa", - SystemVariableKey.USER_ID: "aaa", - }, + system_variables=SystemVariable( + user_id="aaa", + files=[], + query="clear", + conversation_id="abababa", + ), user_inputs=user_inputs or {"uid": "takato"}, ) graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) diff --git a/api/tests/unit_tests/core/workflow/nodes/test_if_else.py b/api/tests/unit_tests/core/workflow/nodes/test_if_else.py index c4e411f9d6..167a92484d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_if_else.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_if_else.py @@ -7,12 +7,12 @@ from core.file import File, FileTransferMethod, FileType from core.variables import ArrayFileSegment from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.if_else.entities import IfElseNodeData from core.workflow.nodes.if_else.if_else_node import IfElseNode +from core.workflow.system_variable import SystemVariable from core.workflow.utils.condition.entities import Condition, SubCondition, SubVariableCondition from extensions.ext_database import db from models.enums import UserFrom @@ -37,9 +37,7 @@ def test_execute_if_else_result_true(): ) # construct variable pool - pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={} - ) + pool = VariablePool(system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}) pool.add(["start", "array_contains"], ["ab", "def"]) pool.add(["start", "array_not_contains"], ["ac", "def"]) pool.add(["start", "contains"], "cabcde") @@ -157,7 +155,7 @@ def test_execute_if_else_result_false(): # construct variable pool pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], ) diff --git a/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py b/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py index e121f6338c..2776e57777 100644 --- a/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py @@ -15,6 +15,7 @@ from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.tool import ToolNode from core.workflow.nodes.tool.entities import ToolNodeData +from core.workflow.system_variable import SystemVariable from models import UserFrom, WorkflowType @@ -34,7 +35,7 @@ def _create_tool_node(): version="1", ) variable_pool = VariablePool( - system_variables={}, + system_variables=SystemVariable.empty(), user_inputs={}, ) node = ToolNode( diff --git a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py index deb3e29b86..62e3e37104 100644 --- a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py +++ b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py @@ -7,12 +7,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import ArrayStringVariable, StringVariable from core.workflow.conversation_variable_updater import ConversationVariableUpdater from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode from core.workflow.nodes.variable_assigner.v1.node_data import WriteMode +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -68,7 +68,7 @@ def test_overwrite_string_variable(): # construct variable pool variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: conversation_id}, + system_variables=SystemVariable(conversation_id=conversation_id), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], @@ -165,7 +165,7 @@ def test_append_variable_to_array(): conversation_id = str(uuid.uuid4()) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: conversation_id}, + system_variables=SystemVariable(conversation_id=conversation_id), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], @@ -256,7 +256,7 @@ def test_clear_array(): conversation_id = str(uuid.uuid4()) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: conversation_id}, + system_variables=SystemVariable(conversation_id=conversation_id), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], diff --git a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py index 7c5597dd89..a3a90b0599 100644 --- a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py +++ b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py @@ -5,12 +5,12 @@ from uuid import uuid4 from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import ArrayStringVariable from core.workflow.entities.variable_pool import VariablePool -from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode from core.workflow.nodes.variable_assigner.v2.enums import InputType, Operation +from core.workflow.system_variable import SystemVariable from models.enums import UserFrom from models.workflow import WorkflowType @@ -109,7 +109,7 @@ def test_remove_first_from_array(): ) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: "conversation_id"}, + system_variables=SystemVariable(conversation_id="conversation_id"), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], @@ -196,7 +196,7 @@ def test_remove_last_from_array(): ) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: "conversation_id"}, + system_variables=SystemVariable(conversation_id="conversation_id"), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], @@ -275,7 +275,7 @@ def test_remove_first_from_empty_array(): ) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: "conversation_id"}, + system_variables=SystemVariable(conversation_id="conversation_id"), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], @@ -354,7 +354,7 @@ def test_remove_last_from_empty_array(): ) variable_pool = VariablePool( - system_variables={SystemVariableKey.CONVERSATION_ID: "conversation_id"}, + system_variables=SystemVariable(conversation_id="conversation_id"), user_inputs={}, environment_variables=[], conversation_variables=[conversation_variable], diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 646de8bf3a..642bc810ba 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -18,10 +18,10 @@ from core.workflow.entities.workflow_node_execution import ( WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus, ) -from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.system_variable import SystemVariable from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager from models.enums import CreatorUserRole from models.model import AppMode @@ -67,14 +67,14 @@ def real_app_generate_entity(): @pytest.fixture def real_workflow_system_variables(): - return { - SystemVariableKey.QUERY: "test query", - SystemVariableKey.CONVERSATION_ID: "test-conversation-id", - SystemVariableKey.USER_ID: "test-user-id", - SystemVariableKey.APP_ID: "test-app-id", - SystemVariableKey.WORKFLOW_ID: "test-workflow-id", - SystemVariableKey.WORKFLOW_EXECUTION_ID: "test-workflow-run-id", - } + return SystemVariable( + query="test query", + conversation_id="test-conversation-id", + user_id="test-user-id", + app_id="test-app-id", + workflow_id="test-workflow-id", + workflow_execution_id="test-workflow-run-id", + ) @pytest.fixture diff --git a/api/tests/unit_tests/core/workflow/utils/test_variable_utils.py b/api/tests/unit_tests/core/workflow/utils/test_variable_utils.py index f1cb937bb3..54bf6558bf 100644 --- a/api/tests/unit_tests/core/workflow/utils/test_variable_utils.py +++ b/api/tests/unit_tests/core/workflow/utils/test_variable_utils.py @@ -10,7 +10,7 @@ class TestAppendVariablesRecursively: def test_append_simple_dict_value(self): """Test appending a simple dictionary value""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["output"] variable_value = {"name": "John", "age": 30} @@ -33,7 +33,7 @@ class TestAppendVariablesRecursively: def test_append_object_segment_value(self): """Test appending an ObjectSegment value""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["result"] @@ -60,7 +60,7 @@ class TestAppendVariablesRecursively: def test_append_nested_dict_value(self): """Test appending a nested dictionary value""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["data"] @@ -97,7 +97,7 @@ class TestAppendVariablesRecursively: def test_append_non_dict_value(self): """Test appending a non-dictionary value (should not recurse)""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["simple"] variable_value = "simple_string" @@ -114,7 +114,7 @@ class TestAppendVariablesRecursively: def test_append_segment_non_object_value(self): """Test appending a Segment that is not ObjectSegment (should not recurse)""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["text"] variable_value = StringSegment(value="Hello World") @@ -132,7 +132,7 @@ class TestAppendVariablesRecursively: def test_append_empty_dict_value(self): """Test appending an empty dictionary value""" - pool = VariablePool() + pool = VariablePool.empty() node_id = "test_node" variable_key_list = ["empty"] variable_value: dict[str, Any] = {}