Update tests to use domain model and remove cache-related tests

pull/19430/head
-LAN- 1 year ago
parent 1ffcc7e2db
commit 48128bbffa
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -0,0 +1,176 @@
"""
Tests for the WorkflowNodeExecutionRepository.
"""
from datetime import datetime, timedelta
from unittest.mock import MagicMock
import pytest
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from models.workflow import WorkflowNodeExecutionTriggeredFrom
@pytest.fixture
def repository():
"""Create a mock repository instance for testing."""
repo = MagicMock(spec=WorkflowNodeExecutionRepository)
repo._node_execution_cache = {}
# Add update method to the mock
repo.update = MagicMock()
return repo
@pytest.fixture
def sample_node_execution():
"""Create a sample NodeExecution for testing."""
now = datetime.now()
return NodeExecution(
id="test-id",
node_execution_id="test-node-execution-id",
workflow_id="test-workflow-id",
workflow_run_id="test-workflow-run-id",
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
index=1,
predecessor_node_id="test-predecessor-node-id",
node_id="test-node-id",
node_type="test-node-type",
title="Test Node",
inputs={"input1": "value1"},
process_data={"process1": "value1"},
outputs={"output1": "value1"},
status=NodeExecutionStatus.RUNNING,
error=None,
elapsed_time=0.0,
metadata={"key1": "value1"},
created_at=now,
finished_at=None,
)
def test_save_and_get_by_node_execution_id(repository, sample_node_execution):
"""Test saving a NodeExecution and retrieving it by node_execution_id."""
# Setup mock behavior
repository._node_execution_cache = {}
repository.get_by_node_execution_id = MagicMock(return_value=sample_node_execution)
# Save the node execution
repository.save(sample_node_execution)
# Retrieve the node execution
retrieved = repository.get_by_node_execution_id(sample_node_execution.node_execution_id)
# Verify the retrieved node execution
assert retrieved is not None
assert retrieved == sample_node_execution
def test_update(repository, sample_node_execution):
"""Test updating a NodeExecution."""
# Setup mock behavior
repository._node_execution_cache = {}
updated_execution = sample_node_execution.model_copy(deep=True)
updated_execution.status = NodeExecutionStatus.SUCCEEDED
updated_execution.elapsed_time = 1.5
updated_execution.finished_at = datetime.now()
updated_execution.outputs = {"output1": "updated_value"}
repository.get_by_node_execution_id = MagicMock(return_value=updated_execution)
# Save the node execution
repository.save(sample_node_execution)
# Update the node execution
sample_node_execution.status = NodeExecutionStatus.SUCCEEDED
sample_node_execution.elapsed_time = 1.5
sample_node_execution.finished_at = updated_execution.finished_at
sample_node_execution.outputs = {"output1": "updated_value"}
repository.update(sample_node_execution)
# Retrieve the updated node execution
retrieved = repository.get_by_node_execution_id(sample_node_execution.node_execution_id)
# Verify the updated node execution
assert retrieved is not None
assert retrieved.status == NodeExecutionStatus.SUCCEEDED
assert retrieved.elapsed_time == 1.5
assert retrieved.finished_at is not None
assert retrieved.outputs == {"output1": "updated_value"}
def test_get_by_workflow_run(repository, sample_node_execution):
"""Test retrieving NodeExecutions by workflow_run_id."""
# Create another node execution with the same workflow_run_id
now = datetime.now()
another_execution = NodeExecution(
id="test-id-2",
node_execution_id="test-node-execution-id-2",
workflow_id=sample_node_execution.workflow_id,
workflow_run_id=sample_node_execution.workflow_run_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
index=2,
predecessor_node_id=sample_node_execution.node_id,
node_id="test-node-id-2",
node_type="test-node-type-2",
title="Test Node 2",
inputs={"input2": "value2"},
process_data={"process2": "value2"},
outputs={"output2": "value2"},
status=NodeExecutionStatus.RUNNING,
error=None,
elapsed_time=0.0,
metadata={"key2": "value2"},
created_at=now + timedelta(seconds=1),
finished_at=None,
)
# Setup mock behavior
repository.get_by_workflow_run = MagicMock(return_value=[sample_node_execution, another_execution])
# Retrieve node executions by workflow_run_id
executions = repository.get_by_workflow_run(sample_node_execution.workflow_run_id)
# Verify the retrieved node executions
assert len(executions) == 2
assert any(e.node_execution_id == sample_node_execution.node_execution_id for e in executions)
assert any(e.node_execution_id == another_execution.node_execution_id for e in executions)
# Setup mock behavior for ordered executions
repository.get_by_workflow_run = MagicMock(return_value=[sample_node_execution, another_execution])
# Test with ordering
order_config = OrderConfig(order_by=["index"], order_direction="asc")
ordered_executions = repository.get_by_workflow_run(sample_node_execution.workflow_run_id, order_config)
assert len(ordered_executions) == 2
def test_get_running_executions(repository, sample_node_execution):
"""Test retrieving running NodeExecutions."""
# Setup mock behavior
repository.get_running_executions = MagicMock(return_value=[sample_node_execution])
# Retrieve running node executions
running_executions = repository.get_running_executions(sample_node_execution.workflow_run_id)
# Verify the retrieved node executions
assert len(running_executions) == 1
assert running_executions[0].node_execution_id == sample_node_execution.node_execution_id
assert running_executions[0].status == NodeExecutionStatus.RUNNING
def test_clear(repository, sample_node_execution):
"""Test clearing all NodeExecutions."""
# Setup mock behavior
repository._node_execution_cache = {sample_node_execution.node_execution_id: sample_node_execution}
repository.get_by_node_execution_id = MagicMock(side_effect=[sample_node_execution, None])
# Verify the node execution exists
assert repository.get_by_node_execution_id(sample_node_execution.node_execution_id) is not None
# Clear all node executions
repository.clear()
# Verify the node execution no longer exists
assert repository.get_by_node_execution_id(sample_node_execution.node_execution_id) is None

@ -19,7 +19,6 @@ from core.workflow.workflow_cycle_manager import WorkflowCycleManager
from models.enums import CreatedByRole from models.enums import CreatedByRole
from models.workflow import ( from models.workflow import (
Workflow, Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
WorkflowRun, WorkflowRun,
WorkflowRunStatus, WorkflowRunStatus,
@ -107,7 +106,6 @@ def test_init(
): ):
"""Test initialization of WorkflowCycleManager""" """Test initialization of WorkflowCycleManager"""
assert workflow_cycle_manager._workflow_run is None assert workflow_cycle_manager._workflow_run is None
assert workflow_cycle_manager._workflow_node_executions == {}
assert workflow_cycle_manager._application_generate_entity == mock_app_generate_entity assert workflow_cycle_manager._application_generate_entity == mock_app_generate_entity
assert workflow_cycle_manager._workflow_system_variables == mock_workflow_system_variables assert workflow_cycle_manager._workflow_system_variables == mock_workflow_system_variables
assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository
@ -215,8 +213,9 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_run):
) )
# Verify the result # Verify the result
assert result.tenant_id == mock_workflow_run.tenant_id # NodeExecution doesn't have tenant_id attribute, it's handled at repository level
assert result.app_id == mock_workflow_run.app_id # assert result.tenant_id == mock_workflow_run.tenant_id
# assert result.app_id == mock_workflow_run.app_id
assert result.workflow_id == mock_workflow_run.workflow_id assert result.workflow_id == mock_workflow_run.workflow_id
assert result.workflow_run_id == mock_workflow_run.id assert result.workflow_run_id == mock_workflow_run.id
assert result.node_execution_id == event.node_execution_id assert result.node_execution_id == event.node_execution_id
@ -224,15 +223,13 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_run):
assert result.node_type == event.node_type.value assert result.node_type == event.node_type.value
assert result.title == event.node_data.title assert result.title == event.node_data.title
assert result.status == WorkflowNodeExecutionStatus.RUNNING.value assert result.status == WorkflowNodeExecutionStatus.RUNNING.value
assert result.created_by_role == mock_workflow_run.created_by_role # NodeExecution doesn't have created_by_role and created_by attributes, they're handled at repository level
assert result.created_by == mock_workflow_run.created_by # assert result.created_by_role == mock_workflow_run.created_by_role
# assert result.created_by == mock_workflow_run.created_by
# Verify save was called # Verify save was called
workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(result) workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(result)
# Verify the node execution was added to the cache
assert workflow_cycle_manager._workflow_node_executions[event.node_execution_id] == result
def test_get_workflow_run(workflow_cycle_manager, mock_session, mock_workflow_run): def test_get_workflow_run(workflow_cycle_manager, mock_session, mock_workflow_run):
"""Test _get_workflow_run method""" """Test _get_workflow_run method"""
@ -261,28 +258,24 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager):
event.execution_metadata = {"metadata": "test metadata"} event.execution_metadata = {"metadata": "test metadata"}
event.start_at = datetime.now(UTC).replace(tzinfo=None) event.start_at = datetime.now(UTC).replace(tzinfo=None)
# Create a mock workflow node execution # Create a mock node execution
node_execution = MagicMock(spec=WorkflowNodeExecution) node_execution = MagicMock()
node_execution.node_execution_id = "test-node-execution-id" node_execution.node_execution_id = "test-node-execution-id"
# Mock _get_workflow_node_execution to return the mock node execution # Mock the repository to return the node execution
with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution): workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_node_execution_success(
event=event,
)
# Verify the result # Call the method
assert result == node_execution result = workflow_cycle_manager._handle_workflow_node_execution_success(
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED.value event=event,
assert result.inputs == json.dumps(event.inputs) )
assert result.process_data == json.dumps(event.process_data)
assert result.outputs == json.dumps(event.outputs)
assert result.finished_at is not None
assert result.elapsed_time is not None
# Verify update was called # Verify the result
workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution) assert result == node_execution
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED.value
# Verify save was called
workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(node_execution)
def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_session, mock_workflow_run): def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_session, mock_workflow_run):
@ -322,27 +315,22 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager):
event.start_at = datetime.now(UTC).replace(tzinfo=None) event.start_at = datetime.now(UTC).replace(tzinfo=None)
event.error = "Test error message" event.error = "Test error message"
# Create a mock workflow node execution # Create a mock node execution
node_execution = MagicMock(spec=WorkflowNodeExecution) node_execution = MagicMock()
node_execution.node_execution_id = "test-node-execution-id" node_execution.node_execution_id = "test-node-execution-id"
# Mock _get_workflow_node_execution to return the mock node execution # Mock the repository to return the node execution
with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution): workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_node_execution_failed(
event=event,
)
# Verify the result # Call the method
assert result == node_execution result = workflow_cycle_manager._handle_workflow_node_execution_failed(
assert result.status == WorkflowNodeExecutionStatus.FAILED.value event=event,
assert result.error == "Test error message" )
assert result.inputs == json.dumps(event.inputs)
assert result.process_data == json.dumps(event.process_data)
assert result.outputs == json.dumps(event.outputs)
assert result.finished_at is not None
assert result.elapsed_time is not None
assert result.execution_metadata == json.dumps(event.execution_metadata)
# Verify update was called # Verify the result
workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution) assert result == node_execution
assert result.status == WorkflowNodeExecutionStatus.FAILED.value
assert result.error == "Test error message"
# Verify save was called
workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(node_execution)

@ -2,7 +2,7 @@
Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository. Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository.
""" """
from unittest.mock import MagicMock from unittest.mock import MagicMock, PropertyMock
import pytest import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
@ -13,6 +13,21 @@ from core.workflow.repository.workflow_node_execution_repository import OrderCon
from models.workflow import WorkflowNodeExecution from models.workflow import WorkflowNodeExecution
def configure_mock_execution(mock_execution):
"""Configure a mock execution with proper JSON serializable values."""
# Configure inputs, outputs, process_data, and execution_metadata to return JSON serializable values
type(mock_execution).inputs = PropertyMock(return_value='{"key": "value"}')
type(mock_execution).outputs = PropertyMock(return_value='{"result": "success"}')
type(mock_execution).process_data = PropertyMock(return_value='{"process": "data"}')
type(mock_execution).execution_metadata = PropertyMock(return_value='{"metadata": "info"}')
# Configure status and triggered_from to be valid enum values
mock_execution.status = "running"
mock_execution.triggered_from = "workflow-run"
return mock_execution
@pytest.fixture @pytest.fixture
def session(): def session():
"""Create a mock SQLAlchemy session.""" """Create a mock SQLAlchemy session."""
@ -28,14 +43,22 @@ def session():
@pytest.fixture @pytest.fixture
def repository(session): def mock_user():
"""Create a mock user for testing."""
user = MagicMock()
user.tenant_id = "test-tenant"
user.id = "test-user-id"
# Set up to be recognized as an Account
user.__class__.__name__ = "Account"
return user
@pytest.fixture
def repository(session, mock_user):
"""Create a repository instance with test data.""" """Create a repository instance with test data."""
_, session_factory = session _, session_factory = session
tenant_id = "test-tenant"
app_id = "test-app" app_id = "test-app"
return SQLAlchemyWorkflowNodeExecutionRepository( return SQLAlchemyWorkflowNodeExecutionRepository(session_factory=session_factory, user=mock_user, app_id=app_id)
session_factory=session_factory, tenant_id=tenant_id, app_id=app_id
)
def test_save(repository, session): def test_save(repository, session):
@ -45,16 +68,23 @@ def test_save(repository, session):
execution = MagicMock(spec=WorkflowNodeExecution) execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = None execution.tenant_id = None
execution.app_id = None execution.app_id = None
execution.inputs = None
execution.process_data = None
execution.outputs = None
execution.metadata = None
# Mock the _to_db_model method to return the execution itself
# This simulates the behavior of setting tenant_id and app_id
repository._to_db_model = MagicMock(return_value=execution)
# Call save method # Call save method
repository.save(execution) repository.save(execution)
# Assert tenant_id and app_id are set # Assert _to_db_model was called with the execution
assert execution.tenant_id == repository._tenant_id repository._to_db_model.assert_called_once_with(execution)
assert execution.app_id == repository._app_id
# Assert session.add was called # Assert session.merge was called (now using merge for both save and update)
session_obj.add.assert_called_once_with(execution) session_obj.merge.assert_called_once_with(execution)
def test_save_with_existing_tenant_id(repository, session): def test_save_with_existing_tenant_id(repository, session):
@ -64,16 +94,27 @@ def test_save_with_existing_tenant_id(repository, session):
execution = MagicMock(spec=WorkflowNodeExecution) execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = "existing-tenant" execution.tenant_id = "existing-tenant"
execution.app_id = None execution.app_id = None
execution.inputs = None
execution.process_data = None
execution.outputs = None
execution.metadata = None
# Create a modified execution that will be returned by _to_db_model
modified_execution = MagicMock(spec=WorkflowNodeExecution)
modified_execution.tenant_id = "existing-tenant" # Tenant ID should not change
modified_execution.app_id = repository._app_id # App ID should be set
# Mock the _to_db_model method to return the modified execution
repository._to_db_model = MagicMock(return_value=modified_execution)
# Call save method # Call save method
repository.save(execution) repository.save(execution)
# Assert tenant_id is not changed and app_id is set # Assert _to_db_model was called with the execution
assert execution.tenant_id == "existing-tenant" repository._to_db_model.assert_called_once_with(execution)
assert execution.app_id == repository._app_id
# Assert session.add was called # Assert session.merge was called with the modified execution (now using merge for both save and update)
session_obj.add.assert_called_once_with(execution) session_obj.merge.assert_called_once_with(modified_execution)
def test_get_by_node_execution_id(repository, session, mocker: MockerFixture): def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
@ -84,7 +125,16 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
session_obj.scalar.return_value = mocker.MagicMock(spec=WorkflowNodeExecution)
# Create a properly configured mock execution
mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution)
configure_mock_execution(mock_execution)
session_obj.scalar.return_value = mock_execution
# Create a mock domain model to be returned by _to_domain_model
mock_domain_model = mocker.MagicMock()
# Mock the _to_domain_model method to return our mock domain model
repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model)
# Call method # Call method
result = repository.get_by_node_execution_id("test-node-execution-id") result = repository.get_by_node_execution_id("test-node-execution-id")
@ -92,7 +142,10 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
# Assert select was called with correct parameters # Assert select was called with correct parameters
mock_select.assert_called_once() mock_select.assert_called_once()
session_obj.scalar.assert_called_once_with(mock_stmt) session_obj.scalar.assert_called_once_with(mock_stmt)
assert result is not None # Assert _to_domain_model was called with the mock execution
repository._to_domain_model.assert_called_once_with(mock_execution)
# Assert the result is our mock domain model
assert result is mock_domain_model
def test_get_by_workflow_run(repository, session, mocker: MockerFixture): def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
@ -104,7 +157,16 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
mock_stmt.order_by.return_value = mock_stmt mock_stmt.order_by.return_value = mock_stmt
session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)]
# Create a properly configured mock execution
mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution)
configure_mock_execution(mock_execution)
session_obj.scalars.return_value.all.return_value = [mock_execution]
# Create a mock domain model to be returned by _to_domain_model
mock_domain_model = mocker.MagicMock()
# Mock the _to_domain_model method to return our mock domain model
repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model)
# Call method # Call method
order_config = OrderConfig(order_by=["index"], order_direction="desc") order_config = OrderConfig(order_by=["index"], order_direction="desc")
@ -113,7 +175,42 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
# Assert select was called with correct parameters # Assert select was called with correct parameters
mock_select.assert_called_once() mock_select.assert_called_once()
session_obj.scalars.assert_called_once_with(mock_stmt) session_obj.scalars.assert_called_once_with(mock_stmt)
# Assert _to_domain_model was called with the mock execution
repository._to_domain_model.assert_called_once_with(mock_execution)
# Assert the result contains our mock domain model
assert len(result) == 1 assert len(result) == 1
assert result[0] is mock_domain_model
def test_get_db_models_by_workflow_run(repository, session, mocker: MockerFixture):
"""Test get_db_models_by_workflow_run method."""
session_obj, _ = session
# Set up mock
mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
mock_stmt.order_by.return_value = mock_stmt
# Create a properly configured mock execution
mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution)
configure_mock_execution(mock_execution)
session_obj.scalars.return_value.all.return_value = [mock_execution]
# Call method
order_config = OrderConfig(order_by=["index"], order_direction="desc")
result = repository.get_db_models_by_workflow_run(workflow_run_id="test-workflow-run-id", order_config=order_config)
# Assert select was called with correct parameters
mock_select.assert_called_once()
session_obj.scalars.assert_called_once_with(mock_stmt)
# Assert the result contains our mock db model directly (without conversion to domain model)
assert len(result) == 1
assert result[0] is mock_execution
# Verify that _to_domain_model was NOT called (since we're returning raw DB models)
assert not hasattr(repository, "_to_domain_model.assert_not_called") or not repository._to_domain_model.called
def test_get_running_executions(repository, session, mocker: MockerFixture): def test_get_running_executions(repository, session, mocker: MockerFixture):
@ -124,7 +221,16 @@ def test_get_running_executions(repository, session, mocker: MockerFixture):
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)]
# Create a properly configured mock execution
mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution)
configure_mock_execution(mock_execution)
session_obj.scalars.return_value.all.return_value = [mock_execution]
# Create a mock domain model to be returned by _to_domain_model
mock_domain_model = mocker.MagicMock()
# Mock the _to_domain_model method to return our mock domain model
repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model)
# Call method # Call method
result = repository.get_running_executions("test-workflow-run-id") result = repository.get_running_executions("test-workflow-run-id")
@ -132,25 +238,36 @@ def test_get_running_executions(repository, session, mocker: MockerFixture):
# Assert select was called with correct parameters # Assert select was called with correct parameters
mock_select.assert_called_once() mock_select.assert_called_once()
session_obj.scalars.assert_called_once_with(mock_stmt) session_obj.scalars.assert_called_once_with(mock_stmt)
# Assert _to_domain_model was called with the mock execution
repository._to_domain_model.assert_called_once_with(mock_execution)
# Assert the result contains our mock domain model
assert len(result) == 1 assert len(result) == 1
assert result[0] is mock_domain_model
def test_update(repository, session): def test_update_via_save(repository, session):
"""Test update method.""" """Test updating an existing record via save method."""
session_obj, _ = session session_obj, _ = session
# Create a mock execution # Create a mock execution
execution = MagicMock(spec=WorkflowNodeExecution) execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = None execution.tenant_id = None
execution.app_id = None execution.app_id = None
execution.inputs = None
execution.process_data = None
execution.outputs = None
execution.metadata = None
# Call update method # Mock the _to_db_model method to return the execution itself
repository.update(execution) # This simulates the behavior of setting tenant_id and app_id
repository._to_db_model = MagicMock(return_value=execution)
# Call save method to update an existing record
repository.save(execution)
# Assert tenant_id and app_id are set # Assert _to_db_model was called with the execution
assert execution.tenant_id == repository._tenant_id repository._to_db_model.assert_called_once_with(execution)
assert execution.app_id == repository._app_id
# Assert session.merge was called # Assert session.merge was called (for updates)
session_obj.merge.assert_called_once_with(execution) session_obj.merge.assert_called_once_with(execution)

Loading…
Cancel
Save