refactor: Refactors repository initialization and usage

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/19176/head
-LAN- 1 year ago
parent 1abf00e443
commit f952c6576e
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -54,7 +54,6 @@ def initialize_extensions(app: DifyApp):
ext_otel, ext_otel,
ext_proxy_fix, ext_proxy_fix,
ext_redis, ext_redis,
ext_repositories,
ext_sentry, ext_sentry,
ext_set_secretkey, ext_set_secretkey,
ext_storage, ext_storage,
@ -75,7 +74,6 @@ def initialize_extensions(app: DifyApp):
ext_migrate, ext_migrate,
ext_redis, ext_redis,
ext_storage, ext_storage,
ext_repositories,
ext_celery, ext_celery,
ext_login, ext_login,
ext_mail, ext_mail,

@ -25,7 +25,7 @@ from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotA
from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.workflow.repository import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from factories import file_factory from factories import file_factory
@ -163,12 +163,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(
@ -231,12 +229,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(
@ -297,12 +293,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(

@ -23,7 +23,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.repository import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from factories import file_factory from factories import file_factory
@ -138,12 +138,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(
@ -264,12 +262,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(
@ -329,12 +325,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository # Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory,
"tenant_id": application_generate_entity.app_config.tenant_id, tenant_id=application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, app_id=application_generate_entity.app_config.app_id,
"session_factory": session_factory,
}
) )
return self._generate( return self._generate(

@ -29,7 +29,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
UnitEnum, UnitEnum,
) )
from core.ops.utils import filter_none_values from core.ops.utils import filter_none_values
from core.workflow.repository.repository_factory import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser from models.model import EndUser
@ -113,8 +113,8 @@ class LangFuseDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={"tenant_id": trace_info.tenant_id, "session_factory": session_factory}, session_factory=session_factory, tenant_id=trace_info.tenant_id
) )
# Get all executions for this workflow run # Get all executions for this workflow run

@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
LangSmithRunUpdateModel, LangSmithRunUpdateModel,
) )
from core.ops.utils import filter_none_values, generate_dotted_order from core.ops.utils import filter_none_values, generate_dotted_order
from core.workflow.repository.repository_factory import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models.model import EndUser, MessageFile
@ -137,12 +137,8 @@ class LangSmithDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id")
"tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"),
"session_factory": session_factory,
},
) )
# Get all executions for this workflow run # Get all executions for this workflow run

@ -22,7 +22,7 @@ from core.ops.entities.trace_entity import (
TraceTaskName, TraceTaskName,
WorkflowTraceInfo, WorkflowTraceInfo,
) )
from core.workflow.repository.repository_factory import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models.model import EndUser, MessageFile
@ -150,12 +150,8 @@ class OpikDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id")
"tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"),
"session_factory": session_factory,
},
) )
# Get all executions for this workflow run # Get all executions for this workflow run

@ -4,3 +4,9 @@ Repository implementations for data access.
This package contains concrete implementations of the repository interfaces This package contains concrete implementations of the repository interfaces
defined in the core.workflow.repository package. defined in the core.workflow.repository package.
""" """
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"SQLAlchemyWorkflowNodeExecutionRepository",
]

@ -1,87 +0,0 @@
"""
Registry for repository implementations.
This module is responsible for registering factory functions with the repository factory.
"""
import logging
from collections.abc import Mapping
from typing import Any
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from core.repositories.workflow_node_execution import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db
logger = logging.getLogger(__name__)
# Storage type constants
STORAGE_TYPE_RDBMS = "rdbms"
STORAGE_TYPE_HYBRID = "hybrid"
def register_repositories() -> None:
"""
Register repository factory functions with the RepositoryFactory.
This function reads configuration settings to determine which repository
implementations to register.
"""
# Configure WorkflowNodeExecutionRepository factory based on configuration
workflow_node_execution_storage = dify_config.WORKFLOW_NODE_EXECUTION_STORAGE
# Check storage type and register appropriate implementation
if workflow_node_execution_storage == STORAGE_TYPE_RDBMS:
# Register SQLAlchemy implementation for RDBMS storage
logger.info("Registering WorkflowNodeExecution repository with RDBMS storage")
RepositoryFactory.register_workflow_node_execution_factory(create_workflow_node_execution_repository)
elif workflow_node_execution_storage == STORAGE_TYPE_HYBRID:
# Hybrid storage is not yet implemented
raise NotImplementedError("Hybrid storage for WorkflowNodeExecution repository is not yet implemented")
else:
# Unknown storage type
raise ValueError(
f"Unknown storage type '{workflow_node_execution_storage}' for WorkflowNodeExecution repository. "
f"Supported types: {STORAGE_TYPE_RDBMS}"
)
def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLAlchemyWorkflowNodeExecutionRepository:
"""
Create a WorkflowNodeExecutionRepository instance using SQLAlchemy implementation.
This factory function creates a repository for the RDBMS storage type.
Args:
params: Parameters for creating the repository, including:
- tenant_id: Required. The tenant ID for multi-tenancy.
- app_id: Optional. The application ID for filtering.
- session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided,
a new sessionmaker will be created using the global database engine.
Returns:
A WorkflowNodeExecutionRepository instance
Raises:
ValueError: If required parameters are missing
"""
# Extract required parameters
tenant_id = params.get("tenant_id")
if tenant_id is None:
raise ValueError("tenant_id is required for WorkflowNodeExecution repository with RDBMS storage")
# Extract optional parameters
app_id = params.get("app_id")
# Use the session_factory from params if provided, otherwise create one using the global db engine
session_factory = params.get("session_factory")
if session_factory is None:
# Create a sessionmaker using the same engine as the global db session
session_factory = sessionmaker(bind=db.engine)
# Create and return the repository
return SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=tenant_id, app_id=app_id
)

@ -10,13 +10,13 @@ from sqlalchemy import UnaryExpression, asc, delete, desc, select
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from core.workflow.repository.workflow_node_execution_repository import OrderConfig from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SQLAlchemyWorkflowNodeExecutionRepository: class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
""" """
SQLAlchemy implementation of the WorkflowNodeExecutionRepository interface. SQLAlchemy implementation of the WorkflowNodeExecutionRepository interface.

@ -1,9 +0,0 @@
"""
WorkflowNodeExecution repository implementations.
"""
from core.repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"SQLAlchemyWorkflowNodeExecutionRepository",
]

@ -6,10 +6,9 @@ for accessing and manipulating data, regardless of the underlying
storage mechanism. storage mechanism.
""" """
from core.workflow.repository.repository_factory import RepositoryFactory from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
__all__ = [ __all__ = [
"RepositoryFactory", "OrderConfig",
"WorkflowNodeExecutionRepository", "WorkflowNodeExecutionRepository",
] ]

@ -1,97 +0,0 @@
"""
Repository factory for creating repository instances.
This module provides a simple factory interface for creating repository instances.
It does not contain any implementation details or dependencies on specific repositories.
"""
from collections.abc import Callable, Mapping
from typing import Any, Literal, Optional, cast
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
# Type for factory functions - takes a dict of parameters and returns any repository type
RepositoryFactoryFunc = Callable[[Mapping[str, Any]], Any]
# Type for workflow node execution factory function
WorkflowNodeExecutionFactoryFunc = Callable[[Mapping[str, Any]], WorkflowNodeExecutionRepository]
# Repository type literals
_RepositoryType = Literal["workflow_node_execution"]
class RepositoryFactory:
"""
Factory class for creating repository instances.
This factory delegates the actual repository creation to implementation-specific
factory functions that are registered with the factory at runtime.
"""
# Dictionary to store factory functions
_factory_functions: dict[str, RepositoryFactoryFunc] = {}
@classmethod
def _register_factory(cls, repository_type: _RepositoryType, factory_func: RepositoryFactoryFunc) -> None:
"""
Register a factory function for a specific repository type.
This is a private method and should not be called directly.
Args:
repository_type: The type of repository (e.g., 'workflow_node_execution')
factory_func: A function that takes parameters and returns a repository instance
"""
cls._factory_functions[repository_type] = factory_func
@classmethod
def _create_repository(cls, repository_type: _RepositoryType, params: Optional[Mapping[str, Any]] = None) -> Any:
"""
Create a new repository instance with the provided parameters.
This is a private method and should not be called directly.
Args:
repository_type: The type of repository to create
params: A dictionary of parameters to pass to the factory function
Returns:
A new instance of the requested repository
Raises:
ValueError: If no factory function is registered for the repository type
"""
if repository_type not in cls._factory_functions:
raise ValueError(f"No factory function registered for repository type '{repository_type}'")
# Use empty dict if params is None
params = params or {}
return cls._factory_functions[repository_type](params)
@classmethod
def register_workflow_node_execution_factory(cls, factory_func: WorkflowNodeExecutionFactoryFunc) -> None:
"""
Register a factory function for the workflow node execution repository.
Args:
factory_func: A function that takes parameters and returns a WorkflowNodeExecutionRepository instance
"""
cls._register_factory("workflow_node_execution", factory_func)
@classmethod
def create_workflow_node_execution_repository(
cls, params: Optional[Mapping[str, Any]] = None
) -> WorkflowNodeExecutionRepository:
"""
Create a new WorkflowNodeExecutionRepository instance with the provided parameters.
Args:
params: A dictionary of parameters to pass to the factory function
Returns:
A new instance of the WorkflowNodeExecutionRepository
Raises:
ValueError: If no factory function is registered for the workflow_node_execution repository type
"""
# We can safely cast here because we've registered a WorkflowNodeExecutionFactoryFunc
return cast(WorkflowNodeExecutionRepository, cls._create_repository("workflow_node_execution", params))

@ -1,18 +0,0 @@
"""
Extension for initializing repositories.
This extension registers repository implementations with the RepositoryFactory.
"""
from core.repositories.repository_registry import register_repositories
from dify_app import DifyApp
def init_app(_app: DifyApp) -> None:
"""
Initialize repository implementations.
Args:
_app: The Flask application instance (unused)
"""
register_repositories()

@ -2,7 +2,7 @@ import threading
from typing import Optional from typing import Optional
import contexts import contexts
from core.workflow.repository import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import OrderConfig from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from extensions.ext_database import db from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.infinite_scroll_pagination import InfiniteScrollPagination
@ -129,12 +129,8 @@ class WorkflowRunService:
return [] return []
# Use the repository to get the node executions # Use the repository to get the node executions
repository = RepositoryFactory.create_workflow_node_execution_repository( repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=db.engine, tenant_id=app_model.tenant_id, app_id=app_model.id
"tenant_id": app_model.tenant_id,
"app_id": app_model.id,
"session_factory": db.session.get_bind(),
}
) )
# Use the repository to get the node executions with ordering # Use the repository to get the node executions with ordering

@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.errors import WorkflowNodeRunFailedError
@ -21,7 +22,6 @@ from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.types import NodeEvent from core.workflow.nodes.event.types import NodeEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.repository import RepositoryFactory
from core.workflow.workflow_entry import WorkflowEntry from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
from extensions.ext_database import db from extensions.ext_database import db
@ -285,12 +285,8 @@ class WorkflowService:
workflow_node_execution.workflow_id = draft_workflow.id workflow_node_execution.workflow_id = draft_workflow.id
# Use the repository to save the workflow node execution # Use the repository to save the workflow node execution
repository = RepositoryFactory.create_workflow_node_execution_repository( repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=db.engine, tenant_id=app_model.tenant_id, app_id=app_model.id
"tenant_id": app_model.tenant_id,
"app_id": app_model.id,
"session_factory": db.session.get_bind(),
}
) )
repository.save(workflow_node_execution) repository.save(workflow_node_execution)

@ -7,7 +7,7 @@ from celery import shared_task # type: ignore
from sqlalchemy import delete from sqlalchemy import delete
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from core.workflow.repository import RepositoryFactory from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.dataset import AppDatasetJoin from models.dataset import AppDatasetJoin
from models.model import ( from models.model import (
@ -189,12 +189,8 @@ def _delete_app_workflow_runs(tenant_id: str, app_id: str):
def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): def _delete_app_workflow_node_executions(tenant_id: str, app_id: str):
# Create a repository instance for WorkflowNodeExecution # Create a repository instance for WorkflowNodeExecution
repository = RepositoryFactory.create_workflow_node_execution_repository( repository = SQLAlchemyWorkflowNodeExecutionRepository(
params={ session_factory=db.engine, tenant_id=tenant_id, app_id=app_id
"tenant_id": tenant_id,
"app_id": app_id,
"session_factory": db.session.get_bind(),
}
) )
# Use the clear method to delete all records for this tenant_id and app_id # Use the clear method to delete all records for this tenant_id and app_id

@ -8,7 +8,7 @@ import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, sessionmaker
from core.repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import OrderConfig from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from models.workflow import WorkflowNodeExecution from models.workflow import WorkflowNodeExecution
@ -80,7 +80,7 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
"""Test get_by_node_execution_id method.""" """Test get_by_node_execution_id method."""
session_obj, _ = session session_obj, _ = session
# Set up mock # Set up mock
mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select") mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
@ -99,7 +99,7 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
"""Test get_by_workflow_run method.""" """Test get_by_workflow_run method."""
session_obj, _ = session session_obj, _ = session
# Set up mock # Set up mock
mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select") mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
@ -120,7 +120,7 @@ def test_get_running_executions(repository, session, mocker: MockerFixture):
"""Test get_running_executions method.""" """Test get_running_executions method."""
session_obj, _ = session session_obj, _ = session
# Set up mock # Set up mock
mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select") mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt
@ -158,7 +158,7 @@ def test_clear(repository, session, mocker: MockerFixture):
"""Test clear method.""" """Test clear method."""
session_obj, _ = session session_obj, _ = session
# Set up mock # Set up mock
mock_delete = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.delete") mock_delete = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.delete")
mock_stmt = mocker.MagicMock() mock_stmt = mocker.MagicMock()
mock_delete.return_value = mock_stmt mock_delete.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt mock_stmt.where.return_value = mock_stmt

Loading…
Cancel
Save