test(api): add tests for variable resetting logics

pull/20699/head
QuantumGhost 11 months ago
parent a6efc8204a
commit 719986f763

@ -1,14 +1,18 @@
import json
import unittest import unittest
import uuid import uuid
import pytest import pytest
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.variables.variables import StringVariable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from core.workflow.nodes import NodeType
from factories.variable_factory import build_segment from factories.variable_factory import build_segment
from libs import datetime_utils
from models import db from models import db
from models.workflow import WorkflowDraftVariable from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService from services.workflow_draft_variable_service import DraftVarLoader, VariableResetError, WorkflowDraftVariableService
@pytest.mark.usefixtures("flask_req_ctx") @pytest.mark.usefixtures("flask_req_ctx")
@ -234,3 +238,262 @@ class TestDraftVariableLoader(unittest.TestCase):
assert sys_var.id == self._sys_var_id assert sys_var.id == self._sys_var_id
node1_var = next(v for v in variables if v.selector[0] == self._node1_id) node1_var = next(v for v in variables if v.selector[0] == self._node1_id)
assert node1_var.id == self._node_var_id assert node1_var.id == self._node_var_id
@pytest.mark.usefixtures("flask_req_ctx")
class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase):
"""Integration tests for reset_variable functionality using real database"""
_test_app_id: str
_test_tenant_id: str
_test_workflow_id: str
_session: Session
_node_id = "test_reset_node"
_node_exec_id: str
_workflow_node_exec_id: str
def setUp(self):
self._test_app_id = str(uuid.uuid4())
self._test_tenant_id = str(uuid.uuid4())
self._test_workflow_id = str(uuid.uuid4())
self._node_exec_id = str(uuid.uuid4())
self._workflow_node_exec_id = str(uuid.uuid4())
self._session: Session = db.session()
# Create a workflow node execution record with outputs
# Note: The WorkflowNodeExecutionModel.id should match the node_execution_id in WorkflowDraftVariable
self._workflow_node_execution = WorkflowNodeExecutionModel(
id=self._node_exec_id, # This should match the node_execution_id in the variable
tenant_id=self._test_tenant_id,
app_id=self._test_app_id,
workflow_id=self._test_workflow_id,
triggered_from="workflow-run",
workflow_run_id=str(uuid.uuid4()),
index=1,
node_execution_id=self._node_exec_id,
node_id=self._node_id,
node_type=NodeType.LLM.value,
title="Test Node",
inputs='{"input": "test input"}',
process_data='{"test_var": "process_value", "other_var": "other_process"}',
outputs='{"test_var": "output_value", "other_var": "other_output"}',
status="succeeded",
elapsed_time=1.5,
created_by_role="account",
created_by=str(uuid.uuid4()),
)
# Create conversation variables for the workflow
self._conv_variables = [
StringVariable(
id=str(uuid.uuid4()),
name="conv_var_1",
description="Test conversation variable 1",
value="default_value_1",
),
StringVariable(
id=str(uuid.uuid4()),
name="conv_var_2",
description="Test conversation variable 2",
value="default_value_2",
),
]
# Create test variables
self._node_var_with_exec = WorkflowDraftVariable.new_node_variable(
app_id=self._test_app_id,
node_id=self._node_id,
name="test_var",
value=build_segment("old_value"),
node_execution_id=self._node_exec_id,
)
self._node_var_with_exec.last_edited_at = datetime_utils.naive_utc_now()
self._node_var_without_exec = WorkflowDraftVariable.new_node_variable(
app_id=self._test_app_id,
node_id=self._node_id,
name="no_exec_var",
value=build_segment("some_value"),
node_execution_id="temp_exec_id",
)
# Manually set node_execution_id to None after creation
self._node_var_without_exec.node_execution_id = None
self._node_var_missing_exec = WorkflowDraftVariable.new_node_variable(
app_id=self._test_app_id,
node_id=self._node_id,
name="missing_exec_var",
value=build_segment("some_value"),
node_execution_id=str(uuid.uuid4()), # Use a valid UUID that doesn't exist in database
)
self._conv_var = WorkflowDraftVariable.new_conversation_variable(
app_id=self._test_app_id,
name="conv_var_1",
value=build_segment("old_conv_value"),
)
self._conv_var.last_edited_at = datetime_utils.naive_utc_now()
# Add all to database
db.session.add_all(
[
self._workflow_node_execution,
self._node_var_with_exec,
self._node_var_without_exec,
self._node_var_missing_exec,
self._conv_var,
]
)
db.session.flush()
# Store IDs for assertions
self._node_var_with_exec_id = self._node_var_with_exec.id
self._node_var_without_exec_id = self._node_var_without_exec.id
self._node_var_missing_exec_id = self._node_var_missing_exec.id
self._conv_var_id = self._conv_var.id
def _get_test_srv(self) -> WorkflowDraftVariableService:
return WorkflowDraftVariableService(session=self._session)
def _create_mock_workflow(self) -> Workflow:
"""Create a real workflow with conversation variables and graph"""
conversation_vars = self._conv_variables
# Create a simple graph with the test node
graph = {
"nodes": [{"id": "test_reset_node", "type": "llm", "title": "Test Node", "data": {"type": "llm"}}],
"edges": [],
}
workflow = Workflow.new(
tenant_id=str(uuid.uuid4()),
app_id=self._test_app_id,
type="workflow",
version="1.0",
graph=json.dumps(graph),
features="{}",
created_by=str(uuid.uuid4()),
environment_variables=[],
conversation_variables=conversation_vars,
)
return workflow
def tearDown(self):
self._session.rollback()
def test_reset_node_variable_with_valid_execution_record(self):
"""Test resetting a node variable with valid execution record - should restore from execution"""
srv = self._get_test_srv()
mock_workflow = self._create_mock_workflow()
# Get the variable before reset
variable = srv.get_variable(self._node_var_with_exec_id)
assert variable is not None
assert variable.get_value().value == "old_value"
assert variable.last_edited_at is not None
# Reset the variable
result = srv.reset_variable(mock_workflow, variable)
# Should return the updated variable
assert result is not None
assert result.id == self._node_var_with_exec_id
assert result.node_execution_id == self._workflow_node_execution.id
assert result.last_edited_at is None # Should be reset to None
# The returned variable should have the updated value from execution record
assert result.get_value().value == "output_value"
# Verify the variable was updated in database
updated_variable = srv.get_variable(self._node_var_with_exec_id)
assert updated_variable is not None
# The value should be updated from the execution record's outputs
assert updated_variable.get_value().value == "output_value"
assert updated_variable.last_edited_at is None
assert updated_variable.node_execution_id == self._workflow_node_execution.id
def test_reset_node_variable_with_no_execution_id(self):
"""Test resetting a node variable with no execution ID - should delete variable"""
srv = self._get_test_srv()
mock_workflow = self._create_mock_workflow()
# Get the variable before reset
variable = srv.get_variable(self._node_var_without_exec_id)
assert variable is not None
# Reset the variable
result = srv.reset_variable(mock_workflow, variable)
# Should return None (variable deleted)
assert result is None
# Verify the variable was deleted
deleted_variable = srv.get_variable(self._node_var_without_exec_id)
assert deleted_variable is None
def test_reset_node_variable_with_missing_execution_record(self):
"""Test resetting a node variable when execution record doesn't exist"""
srv = self._get_test_srv()
mock_workflow = self._create_mock_workflow()
# Get the variable before reset
variable = srv.get_variable(self._node_var_missing_exec_id)
assert variable is not None
# Reset the variable
result = srv.reset_variable(mock_workflow, variable)
# Should return None (variable deleted)
assert result is None
# Verify the variable was deleted
deleted_variable = srv.get_variable(self._node_var_missing_exec_id)
assert deleted_variable is None
def test_reset_conversation_variable(self):
"""Test resetting a conversation variable"""
srv = self._get_test_srv()
mock_workflow = self._create_mock_workflow()
# Get the variable before reset
variable = srv.get_variable(self._conv_var_id)
assert variable is not None
assert variable.get_value().value == "old_conv_value"
assert variable.last_edited_at is not None
# Reset the variable
result = srv.reset_variable(mock_workflow, variable)
# Should return the updated variable
assert result is not None
assert result.id == self._conv_var_id
assert result.last_edited_at is None # Should be reset to None
# Verify the variable was updated with default value from workflow
updated_variable = srv.get_variable(self._conv_var_id)
assert updated_variable is not None
# The value should be updated from the workflow's conversation variable default
assert updated_variable.get_value().value == "default_value_1"
assert updated_variable.last_edited_at is None
def test_reset_system_variable_raises_error(self):
"""Test that resetting a system variable raises an error"""
srv = self._get_test_srv()
mock_workflow = self._create_mock_workflow()
# Create a system variable
sys_var = WorkflowDraftVariable.new_sys_variable(
app_id=self._test_app_id,
name="sys_var",
value=build_segment("sys_value"),
node_execution_id=self._node_exec_id,
)
db.session.add(sys_var)
db.session.flush()
# Attempt to reset the system variable
with pytest.raises(VariableResetError) as exc_info:
srv.reset_variable(mock_workflow, sys_var)
assert "cannot reset system variable" in str(exc_info.value)
assert sys_var.id in str(exc_info.value)

@ -1,13 +1,21 @@
import dataclasses import dataclasses
import secrets import secrets
from unittest import mock from unittest import mock
from unittest.mock import Mock, patch
import pytest
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from services.workflow_draft_variable_service import DraftVariableSaver from models.enums import DraftVariableType
from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel
from services.workflow_draft_variable_service import (
DraftVariableSaver,
VariableResetError,
WorkflowDraftVariableService,
)
class TestDraftVariableSaver: class TestDraftVariableSaver:
@ -24,6 +32,7 @@ class TestDraftVariableSaver:
node_id="test_node_id", node_id="test_node_id",
node_type=NodeType.START, node_type=NodeType.START,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
node_execution_id="test_execution_id",
) )
assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False
assert saver._should_variable_be_visible("123", NodeType.START, "output") == True assert saver._should_variable_be_visible("123", NodeType.START, "output") == True
@ -70,9 +79,142 @@ class TestDraftVariableSaver:
node_id=_NODE_ID, node_id=_NODE_ID,
node_type=NodeType.START, node_type=NodeType.START,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
node_execution_id="test_execution_id",
) )
for idx, c in enumerate(cases, 1): for idx, c in enumerate(cases, 1):
fail_msg = f"Test case {c.name} failed, index={idx}" fail_msg = f"Test case {c.name} failed, index={idx}"
node_id, name = saver._normalize_variable_for_start_node(c.input_name) node_id, name = saver._normalize_variable_for_start_node(c.input_name)
assert node_id == c.expected_node_id, fail_msg assert node_id == c.expected_node_id, fail_msg
assert name == c.expected_name, fail_msg assert name == c.expected_name, fail_msg
class TestWorkflowDraftVariableService:
def _get_test_app_id(self):
suffix = secrets.token_hex(6)
return f"test_app_id_{suffix}"
def test_reset_conversation_variable(self):
"""Test resetting a conversation variable"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable
mock_variable = Mock(spec=WorkflowDraftVariable)
mock_variable.get_variable_type.return_value = DraftVariableType.CONVERSATION
mock_variable.id = "var-id"
mock_variable.name = "test_var"
# Mock the _reset_conv_var method
expected_result = Mock(spec=WorkflowDraftVariable)
with patch.object(service, "_reset_conv_var", return_value=expected_result) as mock_reset_conv:
result = service.reset_variable(mock_workflow, mock_variable)
mock_reset_conv.assert_called_once_with(mock_workflow, mock_variable)
assert result == expected_result
def test_reset_node_variable_with_no_execution_id(self):
"""Test resetting a node variable with no execution ID - should delete variable"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with no execution ID
mock_variable = Mock(spec=WorkflowDraftVariable)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = None
mock_variable.id = "var-id"
mock_variable.name = "test_var"
result = service._reset_node_var(mock_workflow, mock_variable)
# Should delete the variable and return None
mock_session.delete.assert_called_once_with(instance=mock_variable)
mock_session.flush.assert_called_once()
assert result is None
def test_reset_node_variable_with_missing_execution_record(self):
"""Test resetting a node variable when execution record doesn't exist"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with execution ID
mock_variable = Mock(spec=WorkflowDraftVariable)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = "exec-id"
mock_variable.id = "var-id"
mock_variable.name = "test_var"
# Mock session.scalars to return None (no execution record found)
mock_scalars = Mock()
mock_scalars.first.return_value = None
mock_session.scalars.return_value = mock_scalars
result = service._reset_node_var(mock_workflow, mock_variable)
# Should delete the variable and return None
mock_session.delete.assert_called_once_with(instance=mock_variable)
mock_session.flush.assert_called_once()
assert result is None
def test_reset_node_variable_with_valid_execution_record(self):
"""Test resetting a node variable with valid execution record - should restore from execution"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
# Create mock variable with execution ID
mock_variable = Mock(spec=WorkflowDraftVariable)
mock_variable.get_variable_type.return_value = DraftVariableType.NODE
mock_variable.node_execution_id = "exec-id"
mock_variable.id = "var-id"
mock_variable.name = "test_var"
mock_variable.node_id = "node-id"
# Create mock execution record
mock_execution = Mock(spec=WorkflowNodeExecutionModel)
mock_execution.process_data_dict = {"test_var": "process_value"}
mock_execution.outputs_dict = {"test_var": "output_value"}
# Mock session.scalars to return the execution record
mock_scalars = Mock()
mock_scalars.first.return_value = mock_execution
mock_session.scalars.return_value = mock_scalars
# Mock workflow methods
mock_node_config = {"type": "test_node"}
mock_workflow.get_node_config_by_id.return_value = mock_node_config
mock_workflow.get_node_type_from_node_config.return_value = NodeType.LLM
result = service._reset_node_var(mock_workflow, mock_variable)
# Verify variable.set_value was called with the correct value
mock_variable.set_value.assert_called_once()
# Verify last_edited_at was reset
assert mock_variable.last_edited_at is None
# Verify session.flush was called
mock_session.flush.assert_called()
# Should return the updated variable
assert result == mock_variable
def test_reset_system_variable_raises_error(self):
"""Test that resetting a system variable raises an error"""
mock_session = Mock(spec=Session)
service = WorkflowDraftVariableService(mock_session)
mock_workflow = Mock(spec=Workflow)
mock_workflow.app_id = self._get_test_app_id()
mock_variable = Mock(spec=WorkflowDraftVariable)
mock_variable.get_variable_type.return_value = DraftVariableType.SYS # Not a valid enum value for this test
mock_variable.id = "var-id"
with pytest.raises(VariableResetError) as exc_info:
service.reset_variable(mock_workflow, mock_variable)
assert "cannot reset system variable" in str(exc_info.value)
assert "variable_id=var-id" in str(exc_info.value)

Loading…
Cancel
Save