test(test_sqlalchemy_repository): Add tests for model conversion methods

Introduces unit tests for _to_db_model and _to_domain_model methods
in the SQLAlchemyWorkflowNodeExecutionRepository. These tests ensure
correct conversion between domain and database models, improving
reliability and maintainability of the repository code. Also, sets
default 'triggered_from' to WORKFLOW_RUN for the repository instance.

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/19430/head
-LAN- 1 year ago
parent 38af41e3be
commit 68616271db
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -2,6 +2,8 @@
Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository. Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository.
""" """
import json
from datetime import datetime
from unittest.mock import MagicMock, PropertyMock from unittest.mock import MagicMock, PropertyMock
import pytest import pytest
@ -9,9 +11,12 @@ from pytest_mock import MockerFixture
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, sessionmaker
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
from core.workflow.nodes.enums import NodeType
from core.workflow.repository.workflow_node_execution_repository import OrderConfig from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from models.account import Account, Tenant from models.account import Account, Tenant
from models.workflow import WorkflowNodeExecution from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom
def configure_mock_execution(mock_execution): def configure_mock_execution(mock_execution):
@ -67,7 +72,7 @@ def repository(session, mock_user):
session_factory=session_factory, session_factory=session_factory,
user=mock_user, user=mock_user,
app_id=app_id, app_id=app_id,
triggered_from=None, triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
) )
@ -306,3 +311,118 @@ def test_clear(repository, session, mocker: MockerFixture):
mock_stmt.where.assert_called() mock_stmt.where.assert_called()
session_obj.execute.assert_called_once_with(mock_stmt) session_obj.execute.assert_called_once_with(mock_stmt)
session_obj.commit.assert_called_once() session_obj.commit.assert_called_once()
def test_to_db_model(repository):
"""Test _to_db_model method."""
# Create a domain model
domain_model = NodeExecution(
id="test-id",
workflow_id="test-workflow-id",
node_execution_id="test-node-execution-id",
workflow_run_id="test-workflow-run-id",
index=1,
predecessor_node_id="test-predecessor-id",
node_id="test-node-id",
node_type=NodeType.START,
title="Test Node",
inputs={"input_key": "input_value"},
process_data={"process_key": "process_value"},
outputs={"output_key": "output_value"},
status=NodeExecutionStatus.RUNNING,
error=None,
elapsed_time=1.5,
metadata={NodeRunMetadataKey.TOTAL_TOKENS: 100},
created_at=datetime.now(),
finished_at=None,
)
# Convert to DB model
db_model = repository._to_db_model(domain_model)
# Assert DB model has correct values
assert isinstance(db_model, WorkflowNodeExecution)
assert db_model.id == domain_model.id
assert db_model.tenant_id == repository._tenant_id
assert db_model.app_id == repository._app_id
assert db_model.workflow_id == domain_model.workflow_id
assert db_model.triggered_from == repository._triggered_from
assert db_model.workflow_run_id == domain_model.workflow_run_id
assert db_model.index == domain_model.index
assert db_model.predecessor_node_id == domain_model.predecessor_node_id
assert db_model.node_execution_id == domain_model.node_execution_id
assert db_model.node_id == domain_model.node_id
assert db_model.node_type == domain_model.node_type
assert db_model.title == domain_model.title
assert db_model.inputs_dict == domain_model.inputs
assert db_model.process_data_dict == domain_model.process_data
assert db_model.outputs_dict == domain_model.outputs
assert db_model.execution_metadata_dict == domain_model.metadata
assert db_model.status == domain_model.status
assert db_model.error == domain_model.error
assert db_model.elapsed_time == domain_model.elapsed_time
assert db_model.created_at == domain_model.created_at
assert db_model.created_by_role == repository._creator_user_role
assert db_model.created_by == repository._creator_user_id
assert db_model.finished_at == domain_model.finished_at
def test_to_domain_model(repository):
"""Test _to_domain_model method."""
# Create input dictionaries
inputs_dict = {"input_key": "input_value"}
process_data_dict = {"process_key": "process_value"}
outputs_dict = {"output_key": "output_value"}
metadata_dict = {str(NodeRunMetadataKey.TOTAL_TOKENS): 100}
# Create a DB model using our custom subclass
db_model = WorkflowNodeExecution()
db_model.id = "test-id"
db_model.tenant_id = "test-tenant-id"
db_model.app_id = "test-app-id"
db_model.workflow_id = "test-workflow-id"
db_model.triggered_from = "workflow-run"
db_model.workflow_run_id = "test-workflow-run-id"
db_model.index = 1
db_model.predecessor_node_id = "test-predecessor-id"
db_model.node_execution_id = "test-node-execution-id"
db_model.node_id = "test-node-id"
db_model.node_type = NodeType.START.value
db_model.title = "Test Node"
db_model.inputs = json.dumps(inputs_dict)
db_model.process_data = json.dumps(process_data_dict)
db_model.outputs = json.dumps(outputs_dict)
db_model.status = WorkflowNodeExecutionStatus.RUNNING
db_model.error = None
db_model.elapsed_time = 1.5
db_model.execution_metadata = json.dumps(metadata_dict)
db_model.created_at = datetime.now()
db_model.created_by_role = "account"
db_model.created_by = "test-user-id"
db_model.finished_at = None
# Convert to domain model
domain_model = repository._to_domain_model(db_model)
# Assert domain model has correct values
assert isinstance(domain_model, NodeExecution)
assert domain_model.id == db_model.id
assert domain_model.workflow_id == db_model.workflow_id
assert domain_model.workflow_run_id == db_model.workflow_run_id
assert domain_model.index == db_model.index
assert domain_model.predecessor_node_id == db_model.predecessor_node_id
assert domain_model.node_execution_id == db_model.node_execution_id
assert domain_model.node_id == db_model.node_id
assert domain_model.node_type == NodeType(db_model.node_type)
assert domain_model.title == db_model.title
assert domain_model.inputs == inputs_dict
assert domain_model.process_data == process_data_dict
assert domain_model.outputs == outputs_dict
assert domain_model.status == NodeExecutionStatus(db_model.status)
assert domain_model.error == db_model.error
assert domain_model.elapsed_time == db_model.elapsed_time
assert domain_model.metadata == metadata_dict
assert domain_model.created_at == db_model.created_at
assert domain_model.finished_at == db_model.finished_at

Loading…
Cancel
Save