From 2edc837585c5fbc87329675ec907abbacf6324f4 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 4 Jun 2025 20:16:28 +0800 Subject: [PATCH] fix(api): adjust `UpdatedVariable` handling It's impossible to use Pydantic to restore subclasses of `Segment` from serialized dictionary (using discriminated union does not help, neither.) So migrate to save `variable_type` and `new_value` separately, instead of store it altogether as a `Segment` class. --- .../nodes/variable_assigner/common/helpers.py | 25 ++++++++++++++++--- .../workflow_draft_variable_service.py | 4 ++- api/services/workflow_service.py | 1 - 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/api/core/workflow/nodes/variable_assigner/common/helpers.py b/api/core/workflow/nodes/variable_assigner/common/helpers.py index 83f00cef7d..0d2822233e 100644 --- a/api/core/workflow/nodes/variable_assigner/common/helpers.py +++ b/api/core/workflow/nodes/variable_assigner/common/helpers.py @@ -5,6 +5,7 @@ from pydantic import BaseModel from core.variables import Segment from core.variables.consts import MIN_SELECTORS_LENGTH +from core.variables.types import SegmentType # Use double underscore (`__`) prefix for internal variables # to minimize risk of collision with user-defined variable names. @@ -14,7 +15,8 @@ _UPDATED_VARIABLES_KEY = "__updated_variables" class UpdatedVariable(BaseModel): name: str selector: Sequence[str] - new_value: Segment + value_type: SegmentType + new_value: Any _T = TypeVar("_T", bound=MutableMapping[str, Any]) @@ -24,7 +26,12 @@ def variable_to_processed_data(selector: Sequence[str], seg: Segment) -> Updated if len(selector) < MIN_SELECTORS_LENGTH: raise Exception("selector too short") node_id, var_name = selector[:2] - return UpdatedVariable(name=var_name, selector=list(selector[:2]), new_value=seg) + return UpdatedVariable( + name=var_name, + selector=list(selector[:2]), + value_type=seg.value_type, + new_value=seg.value, + ) def set_updated_variables(m: _T, updates: Sequence[UpdatedVariable]) -> _T: @@ -33,4 +40,16 @@ def set_updated_variables(m: _T, updates: Sequence[UpdatedVariable]) -> _T: def get_updated_variables(m: Mapping[str, Any]) -> Sequence[UpdatedVariable] | None: - return m.get(_UPDATED_VARIABLES_KEY, None) + updated_values = m.get(_UPDATED_VARIABLES_KEY, None) + if updated_values is None: + return None + result = [] + for items in updated_values: + if isinstance(items, UpdatedVariable): + result.append(items) + elif isinstance(items, dict): + items = UpdatedVariable.model_validate(items) + result.append(items) + else: + raise TypeError(f"Invalid updated variable: {items}, type={type(items)}") + return result diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index a15450ce27..870523877e 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -467,6 +467,7 @@ class DraftVariableSaver: def _build_from_variable_assigner_mapping(self, process_data: Mapping[str, Any]) -> list[WorkflowDraftVariable]: draft_vars: list[WorkflowDraftVariable] = [] updated_variables = get_updated_variables(process_data) or [] + for item in updated_variables: selector = item.selector if len(selector) < MIN_SELECTORS_LENGTH: @@ -476,11 +477,12 @@ class DraftVariableSaver: # We only save conversation variable here. if selector[0] != CONVERSATION_VARIABLE_NODE_ID: continue + segment = build_segment(item.new_value) draft_vars.append( WorkflowDraftVariable.new_conversation_variable( app_id=self._app_id, name=item.name, - value=item.new_value, + value=segment, ) ) return draft_vars diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 6f2f943603..2300246336 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -30,7 +30,6 @@ from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_M from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from extensions.ext_database import db -from factories.variable_factory import segment_to_variable from models.account import Account from models.model import App, AppMode from models.tools import WorkflowToolProvider