From 48128bbffa899ce2a536f489764679219fa883e1 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 9 May 2025 12:24:38 +0800 Subject: [PATCH] Update tests to use domain model and remove cache-related tests --- ...test_workflow_node_execution_repository.py | 176 ++++++++++++++++++ .../workflow/test_workflow_cycle_manager.py | 82 ++++---- .../test_sqlalchemy_repository.py | 173 ++++++++++++++--- 3 files changed, 356 insertions(+), 75 deletions(-) create mode 100644 api/tests/unit_tests/core/workflow/repository/test_workflow_node_execution_repository.py diff --git a/api/tests/unit_tests/core/workflow/repository/test_workflow_node_execution_repository.py b/api/tests/unit_tests/core/workflow/repository/test_workflow_node_execution_repository.py new file mode 100644 index 0000000000..589cc0e588 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/repository/test_workflow_node_execution_repository.py @@ -0,0 +1,176 @@ +""" +Tests for the WorkflowNodeExecutionRepository. +""" + +from datetime import datetime, timedelta +from unittest.mock import MagicMock + +import pytest + +from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus +from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository +from models.workflow import WorkflowNodeExecutionTriggeredFrom + + +@pytest.fixture +def repository(): + """Create a mock repository instance for testing.""" + repo = MagicMock(spec=WorkflowNodeExecutionRepository) + repo._node_execution_cache = {} + # Add update method to the mock + repo.update = MagicMock() + return repo + + +@pytest.fixture +def sample_node_execution(): + """Create a sample NodeExecution for testing.""" + now = datetime.now() + return NodeExecution( + id="test-id", + node_execution_id="test-node-execution-id", + workflow_id="test-workflow-id", + workflow_run_id="test-workflow-run-id", + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, + index=1, + predecessor_node_id="test-predecessor-node-id", + node_id="test-node-id", + node_type="test-node-type", + title="Test Node", + inputs={"input1": "value1"}, + process_data={"process1": "value1"}, + outputs={"output1": "value1"}, + status=NodeExecutionStatus.RUNNING, + error=None, + elapsed_time=0.0, + metadata={"key1": "value1"}, + created_at=now, + finished_at=None, + ) + + +def test_save_and_get_by_node_execution_id(repository, sample_node_execution): + """Test saving a NodeExecution and retrieving it by node_execution_id.""" + # Setup mock behavior + repository._node_execution_cache = {} + repository.get_by_node_execution_id = MagicMock(return_value=sample_node_execution) + + # Save the node execution + repository.save(sample_node_execution) + + # Retrieve the node execution + retrieved = repository.get_by_node_execution_id(sample_node_execution.node_execution_id) + + # Verify the retrieved node execution + assert retrieved is not None + assert retrieved == sample_node_execution + + +def test_update(repository, sample_node_execution): + """Test updating a NodeExecution.""" + # Setup mock behavior + repository._node_execution_cache = {} + updated_execution = sample_node_execution.model_copy(deep=True) + updated_execution.status = NodeExecutionStatus.SUCCEEDED + updated_execution.elapsed_time = 1.5 + updated_execution.finished_at = datetime.now() + updated_execution.outputs = {"output1": "updated_value"} + + repository.get_by_node_execution_id = MagicMock(return_value=updated_execution) + + # Save the node execution + repository.save(sample_node_execution) + + # Update the node execution + sample_node_execution.status = NodeExecutionStatus.SUCCEEDED + sample_node_execution.elapsed_time = 1.5 + sample_node_execution.finished_at = updated_execution.finished_at + sample_node_execution.outputs = {"output1": "updated_value"} + + repository.update(sample_node_execution) + + # Retrieve the updated node execution + retrieved = repository.get_by_node_execution_id(sample_node_execution.node_execution_id) + + # Verify the updated node execution + assert retrieved is not None + assert retrieved.status == NodeExecutionStatus.SUCCEEDED + assert retrieved.elapsed_time == 1.5 + assert retrieved.finished_at is not None + assert retrieved.outputs == {"output1": "updated_value"} + + +def test_get_by_workflow_run(repository, sample_node_execution): + """Test retrieving NodeExecutions by workflow_run_id.""" + # Create another node execution with the same workflow_run_id + now = datetime.now() + another_execution = NodeExecution( + id="test-id-2", + node_execution_id="test-node-execution-id-2", + workflow_id=sample_node_execution.workflow_id, + workflow_run_id=sample_node_execution.workflow_run_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, + index=2, + predecessor_node_id=sample_node_execution.node_id, + node_id="test-node-id-2", + node_type="test-node-type-2", + title="Test Node 2", + inputs={"input2": "value2"}, + process_data={"process2": "value2"}, + outputs={"output2": "value2"}, + status=NodeExecutionStatus.RUNNING, + error=None, + elapsed_time=0.0, + metadata={"key2": "value2"}, + created_at=now + timedelta(seconds=1), + finished_at=None, + ) + + # Setup mock behavior + repository.get_by_workflow_run = MagicMock(return_value=[sample_node_execution, another_execution]) + + # Retrieve node executions by workflow_run_id + executions = repository.get_by_workflow_run(sample_node_execution.workflow_run_id) + + # Verify the retrieved node executions + assert len(executions) == 2 + assert any(e.node_execution_id == sample_node_execution.node_execution_id for e in executions) + assert any(e.node_execution_id == another_execution.node_execution_id for e in executions) + + # Setup mock behavior for ordered executions + repository.get_by_workflow_run = MagicMock(return_value=[sample_node_execution, another_execution]) + + # Test with ordering + order_config = OrderConfig(order_by=["index"], order_direction="asc") + ordered_executions = repository.get_by_workflow_run(sample_node_execution.workflow_run_id, order_config) + assert len(ordered_executions) == 2 + + +def test_get_running_executions(repository, sample_node_execution): + """Test retrieving running NodeExecutions.""" + # Setup mock behavior + repository.get_running_executions = MagicMock(return_value=[sample_node_execution]) + + # Retrieve running node executions + running_executions = repository.get_running_executions(sample_node_execution.workflow_run_id) + + # Verify the retrieved node executions + assert len(running_executions) == 1 + assert running_executions[0].node_execution_id == sample_node_execution.node_execution_id + assert running_executions[0].status == NodeExecutionStatus.RUNNING + + +def test_clear(repository, sample_node_execution): + """Test clearing all NodeExecutions.""" + # Setup mock behavior + repository._node_execution_cache = {sample_node_execution.node_execution_id: sample_node_execution} + repository.get_by_node_execution_id = MagicMock(side_effect=[sample_node_execution, None]) + + # Verify the node execution exists + assert repository.get_by_node_execution_id(sample_node_execution.node_execution_id) is not None + + # Clear all node executions + repository.clear() + + # Verify the node execution no longer exists + assert repository.get_by_node_execution_id(sample_node_execution.node_execution_id) is None diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 6b00b203c4..e49359662b 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -19,7 +19,6 @@ from core.workflow.workflow_cycle_manager import WorkflowCycleManager from models.enums import CreatedByRole from models.workflow import ( Workflow, - WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowRun, WorkflowRunStatus, @@ -107,7 +106,6 @@ def test_init( ): """Test initialization of WorkflowCycleManager""" assert workflow_cycle_manager._workflow_run is None - assert workflow_cycle_manager._workflow_node_executions == {} assert workflow_cycle_manager._application_generate_entity == mock_app_generate_entity assert workflow_cycle_manager._workflow_system_variables == mock_workflow_system_variables assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository @@ -215,8 +213,9 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_run): ) # Verify the result - assert result.tenant_id == mock_workflow_run.tenant_id - assert result.app_id == mock_workflow_run.app_id + # NodeExecution doesn't have tenant_id attribute, it's handled at repository level + # assert result.tenant_id == mock_workflow_run.tenant_id + # assert result.app_id == mock_workflow_run.app_id assert result.workflow_id == mock_workflow_run.workflow_id assert result.workflow_run_id == mock_workflow_run.id assert result.node_execution_id == event.node_execution_id @@ -224,15 +223,13 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_run): assert result.node_type == event.node_type.value assert result.title == event.node_data.title assert result.status == WorkflowNodeExecutionStatus.RUNNING.value - assert result.created_by_role == mock_workflow_run.created_by_role - assert result.created_by == mock_workflow_run.created_by + # NodeExecution doesn't have created_by_role and created_by attributes, they're handled at repository level + # assert result.created_by_role == mock_workflow_run.created_by_role + # assert result.created_by == mock_workflow_run.created_by # Verify save was called workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(result) - # Verify the node execution was added to the cache - assert workflow_cycle_manager._workflow_node_executions[event.node_execution_id] == result - def test_get_workflow_run(workflow_cycle_manager, mock_session, mock_workflow_run): """Test _get_workflow_run method""" @@ -261,28 +258,24 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): event.execution_metadata = {"metadata": "test metadata"} event.start_at = datetime.now(UTC).replace(tzinfo=None) - # Create a mock workflow node execution - node_execution = MagicMock(spec=WorkflowNodeExecution) + # Create a mock node execution + node_execution = MagicMock() node_execution.node_execution_id = "test-node-execution-id" - # Mock _get_workflow_node_execution to return the mock node execution - with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution): - # Call the method - result = workflow_cycle_manager._handle_workflow_node_execution_success( - event=event, - ) + # Mock the repository to return the node execution + workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution - # Verify the result - assert result == node_execution - assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED.value - assert result.inputs == json.dumps(event.inputs) - assert result.process_data == json.dumps(event.process_data) - assert result.outputs == json.dumps(event.outputs) - assert result.finished_at is not None - assert result.elapsed_time is not None + # Call the method + result = workflow_cycle_manager._handle_workflow_node_execution_success( + event=event, + ) - # Verify update was called - workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution) + # Verify the result + assert result == node_execution + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED.value + + # Verify save was called + workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(node_execution) def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_session, mock_workflow_run): @@ -322,27 +315,22 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): event.start_at = datetime.now(UTC).replace(tzinfo=None) event.error = "Test error message" - # Create a mock workflow node execution - node_execution = MagicMock(spec=WorkflowNodeExecution) + # Create a mock node execution + node_execution = MagicMock() node_execution.node_execution_id = "test-node-execution-id" - # Mock _get_workflow_node_execution to return the mock node execution - with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution): - # Call the method - result = workflow_cycle_manager._handle_workflow_node_execution_failed( - event=event, - ) + # Mock the repository to return the node execution + workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution - # Verify the result - assert result == node_execution - assert result.status == WorkflowNodeExecutionStatus.FAILED.value - assert result.error == "Test error message" - assert result.inputs == json.dumps(event.inputs) - assert result.process_data == json.dumps(event.process_data) - assert result.outputs == json.dumps(event.outputs) - assert result.finished_at is not None - assert result.elapsed_time is not None - assert result.execution_metadata == json.dumps(event.execution_metadata) + # Call the method + result = workflow_cycle_manager._handle_workflow_node_execution_failed( + event=event, + ) - # Verify update was called - workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution) + # Verify the result + assert result == node_execution + assert result.status == WorkflowNodeExecutionStatus.FAILED.value + assert result.error == "Test error message" + + # Verify save was called + workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(node_execution) diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py index 9cda873e90..20dd29bc22 100644 --- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py +++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py @@ -2,7 +2,7 @@ Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository. """ -from unittest.mock import MagicMock +from unittest.mock import MagicMock, PropertyMock import pytest from pytest_mock import MockerFixture @@ -13,6 +13,21 @@ from core.workflow.repository.workflow_node_execution_repository import OrderCon from models.workflow import WorkflowNodeExecution +def configure_mock_execution(mock_execution): + """Configure a mock execution with proper JSON serializable values.""" + # Configure inputs, outputs, process_data, and execution_metadata to return JSON serializable values + type(mock_execution).inputs = PropertyMock(return_value='{"key": "value"}') + type(mock_execution).outputs = PropertyMock(return_value='{"result": "success"}') + type(mock_execution).process_data = PropertyMock(return_value='{"process": "data"}') + type(mock_execution).execution_metadata = PropertyMock(return_value='{"metadata": "info"}') + + # Configure status and triggered_from to be valid enum values + mock_execution.status = "running" + mock_execution.triggered_from = "workflow-run" + + return mock_execution + + @pytest.fixture def session(): """Create a mock SQLAlchemy session.""" @@ -28,14 +43,22 @@ def session(): @pytest.fixture -def repository(session): +def mock_user(): + """Create a mock user for testing.""" + user = MagicMock() + user.tenant_id = "test-tenant" + user.id = "test-user-id" + # Set up to be recognized as an Account + user.__class__.__name__ = "Account" + return user + + +@pytest.fixture +def repository(session, mock_user): """Create a repository instance with test data.""" _, session_factory = session - tenant_id = "test-tenant" app_id = "test-app" - return SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, tenant_id=tenant_id, app_id=app_id - ) + return SQLAlchemyWorkflowNodeExecutionRepository(session_factory=session_factory, user=mock_user, app_id=app_id) def test_save(repository, session): @@ -45,16 +68,23 @@ def test_save(repository, session): execution = MagicMock(spec=WorkflowNodeExecution) execution.tenant_id = None execution.app_id = None + execution.inputs = None + execution.process_data = None + execution.outputs = None + execution.metadata = None + + # Mock the _to_db_model method to return the execution itself + # This simulates the behavior of setting tenant_id and app_id + repository._to_db_model = MagicMock(return_value=execution) # Call save method repository.save(execution) - # Assert tenant_id and app_id are set - assert execution.tenant_id == repository._tenant_id - assert execution.app_id == repository._app_id + # Assert _to_db_model was called with the execution + repository._to_db_model.assert_called_once_with(execution) - # Assert session.add was called - session_obj.add.assert_called_once_with(execution) + # Assert session.merge was called (now using merge for both save and update) + session_obj.merge.assert_called_once_with(execution) def test_save_with_existing_tenant_id(repository, session): @@ -64,16 +94,27 @@ def test_save_with_existing_tenant_id(repository, session): execution = MagicMock(spec=WorkflowNodeExecution) execution.tenant_id = "existing-tenant" execution.app_id = None + execution.inputs = None + execution.process_data = None + execution.outputs = None + execution.metadata = None + + # Create a modified execution that will be returned by _to_db_model + modified_execution = MagicMock(spec=WorkflowNodeExecution) + modified_execution.tenant_id = "existing-tenant" # Tenant ID should not change + modified_execution.app_id = repository._app_id # App ID should be set + + # Mock the _to_db_model method to return the modified execution + repository._to_db_model = MagicMock(return_value=modified_execution) # Call save method repository.save(execution) - # Assert tenant_id is not changed and app_id is set - assert execution.tenant_id == "existing-tenant" - assert execution.app_id == repository._app_id + # Assert _to_db_model was called with the execution + repository._to_db_model.assert_called_once_with(execution) - # Assert session.add was called - session_obj.add.assert_called_once_with(execution) + # Assert session.merge was called with the modified execution (now using merge for both save and update) + session_obj.merge.assert_called_once_with(modified_execution) def test_get_by_node_execution_id(repository, session, mocker: MockerFixture): @@ -84,7 +125,16 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture): mock_stmt = mocker.MagicMock() mock_select.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt - session_obj.scalar.return_value = mocker.MagicMock(spec=WorkflowNodeExecution) + + # Create a properly configured mock execution + mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution) + configure_mock_execution(mock_execution) + session_obj.scalar.return_value = mock_execution + + # Create a mock domain model to be returned by _to_domain_model + mock_domain_model = mocker.MagicMock() + # Mock the _to_domain_model method to return our mock domain model + repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model) # Call method result = repository.get_by_node_execution_id("test-node-execution-id") @@ -92,7 +142,10 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture): # Assert select was called with correct parameters mock_select.assert_called_once() session_obj.scalar.assert_called_once_with(mock_stmt) - assert result is not None + # Assert _to_domain_model was called with the mock execution + repository._to_domain_model.assert_called_once_with(mock_execution) + # Assert the result is our mock domain model + assert result is mock_domain_model def test_get_by_workflow_run(repository, session, mocker: MockerFixture): @@ -104,7 +157,16 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture): mock_select.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt mock_stmt.order_by.return_value = mock_stmt - session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)] + + # Create a properly configured mock execution + mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution) + configure_mock_execution(mock_execution) + session_obj.scalars.return_value.all.return_value = [mock_execution] + + # Create a mock domain model to be returned by _to_domain_model + mock_domain_model = mocker.MagicMock() + # Mock the _to_domain_model method to return our mock domain model + repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model) # Call method order_config = OrderConfig(order_by=["index"], order_direction="desc") @@ -113,7 +175,42 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture): # Assert select was called with correct parameters mock_select.assert_called_once() session_obj.scalars.assert_called_once_with(mock_stmt) + # Assert _to_domain_model was called with the mock execution + repository._to_domain_model.assert_called_once_with(mock_execution) + # Assert the result contains our mock domain model assert len(result) == 1 + assert result[0] is mock_domain_model + + +def test_get_db_models_by_workflow_run(repository, session, mocker: MockerFixture): + """Test get_db_models_by_workflow_run method.""" + session_obj, _ = session + # Set up mock + mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select") + mock_stmt = mocker.MagicMock() + mock_select.return_value = mock_stmt + mock_stmt.where.return_value = mock_stmt + mock_stmt.order_by.return_value = mock_stmt + + # Create a properly configured mock execution + mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution) + configure_mock_execution(mock_execution) + session_obj.scalars.return_value.all.return_value = [mock_execution] + + # Call method + order_config = OrderConfig(order_by=["index"], order_direction="desc") + result = repository.get_db_models_by_workflow_run(workflow_run_id="test-workflow-run-id", order_config=order_config) + + # Assert select was called with correct parameters + mock_select.assert_called_once() + session_obj.scalars.assert_called_once_with(mock_stmt) + + # Assert the result contains our mock db model directly (without conversion to domain model) + assert len(result) == 1 + assert result[0] is mock_execution + + # Verify that _to_domain_model was NOT called (since we're returning raw DB models) + assert not hasattr(repository, "_to_domain_model.assert_not_called") or not repository._to_domain_model.called def test_get_running_executions(repository, session, mocker: MockerFixture): @@ -124,7 +221,16 @@ def test_get_running_executions(repository, session, mocker: MockerFixture): mock_stmt = mocker.MagicMock() mock_select.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt - session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)] + + # Create a properly configured mock execution + mock_execution = mocker.MagicMock(spec=WorkflowNodeExecution) + configure_mock_execution(mock_execution) + session_obj.scalars.return_value.all.return_value = [mock_execution] + + # Create a mock domain model to be returned by _to_domain_model + mock_domain_model = mocker.MagicMock() + # Mock the _to_domain_model method to return our mock domain model + repository._to_domain_model = mocker.MagicMock(return_value=mock_domain_model) # Call method result = repository.get_running_executions("test-workflow-run-id") @@ -132,25 +238,36 @@ def test_get_running_executions(repository, session, mocker: MockerFixture): # Assert select was called with correct parameters mock_select.assert_called_once() session_obj.scalars.assert_called_once_with(mock_stmt) + # Assert _to_domain_model was called with the mock execution + repository._to_domain_model.assert_called_once_with(mock_execution) + # Assert the result contains our mock domain model assert len(result) == 1 + assert result[0] is mock_domain_model -def test_update(repository, session): - """Test update method.""" +def test_update_via_save(repository, session): + """Test updating an existing record via save method.""" session_obj, _ = session # Create a mock execution execution = MagicMock(spec=WorkflowNodeExecution) execution.tenant_id = None execution.app_id = None + execution.inputs = None + execution.process_data = None + execution.outputs = None + execution.metadata = None - # Call update method - repository.update(execution) + # Mock the _to_db_model method to return the execution itself + # This simulates the behavior of setting tenant_id and app_id + repository._to_db_model = MagicMock(return_value=execution) + + # Call save method to update an existing record + repository.save(execution) - # Assert tenant_id and app_id are set - assert execution.tenant_id == repository._tenant_id - assert execution.app_id == repository._app_id + # Assert _to_db_model was called with the execution + repository._to_db_model.assert_called_once_with(execution) - # Assert session.merge was called + # Assert session.merge was called (for updates) session_obj.merge.assert_called_once_with(execution)