feat: Create a APIWorkflowRunRepository

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/22581/head
-LAN- 11 months ago
parent bc4efc3b48
commit 8c945b7a08
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -567,6 +567,11 @@ class RepositoryConfig(BaseSettings):
default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository", default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository",
) )
API_WORKFLOW_RUN_REPOSITORY: str = Field(
description="Service-layer repository implementation for WorkflowRun operations. Specify as a module path",
default="repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository",
)
class AuthConfig(BaseSettings): class AuthConfig(BaseSettings):
""" """

@ -138,6 +138,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._workflow_response_converter = WorkflowResponseConverter( self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
) )
self._task_state = WorkflowTaskState() self._task_state = WorkflowTaskState()

@ -49,6 +49,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
from models import ( from models import (
Account, Account,
@ -63,8 +64,10 @@ class WorkflowResponseConverter:
self, self,
*, *,
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
workflow_execution_repository: WorkflowExecutionRepository,
) -> None: ) -> None:
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow_execution_repository = workflow_execution_repository
def workflow_start_to_stream_response( def workflow_start_to_stream_response(
self, self,

@ -126,6 +126,7 @@ class WorkflowAppGenerateTaskPipeline:
self._workflow_response_converter = WorkflowResponseConverter( self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
) )
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity

@ -83,7 +83,7 @@ class TokenBufferMemory:
raise ValueError(f"Workflow not found: {workflow_run.workflow_id}") raise ValueError(f"Workflow not found: {workflow_run.workflow_id}")
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False) file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
else: else:
raise AssertionError(f"Invalid app mode: {self.conversation.mode}") raise ValueError(f"Invalid app mode: {self.conversation.mode}")
detail = ImagePromptMessageContent.DETAIL.LOW detail = ImagePromptMessageContent.DETAIL.LOW
if file_extra_config and app_record: if file_extra_config and app_record:

@ -38,12 +38,11 @@ from collections.abc import Sequence
from datetime import datetime from datetime import datetime
from typing import Optional, Protocol from typing import Optional, Protocol
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.workflow import WorkflowRun from models.workflow import WorkflowRun
class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): class APIWorkflowRunRepository(Protocol):
""" """
Protocol for service-layer WorkflowRun repository operations. Protocol for service-layer WorkflowRun repository operations.

@ -12,6 +12,7 @@ from sqlalchemy.orm import sessionmaker
from configs import dify_config from configs import dify_config
from core.repositories import DifyCoreRepositoryFactory, RepositoryImportError from core.repositories import DifyCoreRepositoryFactory, RepositoryImportError
from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -64,3 +65,39 @@ class DifyAPIRepositoryFactory(DifyCoreRepositoryFactory):
raise RepositoryImportError( raise RepositoryImportError(
f"Failed to create DifyAPIWorkflowNodeExecutionRepository from '{class_path}': {e}" f"Failed to create DifyAPIWorkflowNodeExecutionRepository from '{class_path}': {e}"
) from e ) from e
@classmethod
def create_api_workflow_run_repository(cls, session_maker: sessionmaker) -> APIWorkflowRunRepository:
"""
Create an APIWorkflowRunRepository instance based on configuration.
This repository is designed for service-layer WorkflowRun operations and uses dependency
injection with a sessionmaker for better testability and separation of concerns. It provides
database access patterns specifically needed by service classes for workflow run management,
including pagination, filtering, and bulk operations.
Args:
session_maker: SQLAlchemy sessionmaker to inject for database session management.
Returns:
Configured APIWorkflowRunRepository instance
Raises:
RepositoryImportError: If the configured repository cannot be imported or instantiated
"""
class_path = dify_config.API_WORKFLOW_RUN_REPOSITORY
logger.debug(f"Creating APIWorkflowRunRepository from: {class_path}")
try:
repository_class = cls._import_class(class_path)
cls._validate_repository_interface(repository_class, APIWorkflowRunRepository)
# Service repository requires session_maker parameter
cls._validate_constructor_signature(repository_class, ["session_maker"])
return repository_class(session_maker=session_maker) # type: ignore[no-any-return]
except RepositoryImportError:
# Re-raise our custom errors as-is
raise
except Exception as e:
logger.exception("Failed to create APIWorkflowRunRepository")
raise RepositoryImportError(f"Failed to create APIWorkflowRunRepository from '{class_path}': {e}") from e

@ -25,16 +25,15 @@ from datetime import datetime
from typing import Optional, cast from typing import Optional, cast
from sqlalchemy import delete, select from sqlalchemy import delete, select
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import sessionmaker
from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.workflow import WorkflowRun from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): class DifyAPISQLAlchemyWorkflowRunRepository:
""" """
SQLAlchemy implementation of APIWorkflowRunRepository. SQLAlchemy implementation of APIWorkflowRunRepository.
@ -46,7 +45,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
session_maker: SQLAlchemy sessionmaker instance for database connections session_maker: SQLAlchemy sessionmaker instance for database connections
""" """
def __init__(self, session_maker: sessionmaker[Session]) -> None: def __init__(self, session_maker: sessionmaker) -> None:
""" """
Initialize the repository with a sessionmaker. Initialize the repository with a sessionmaker.
@ -87,18 +86,34 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
raise ValueError("Last workflow run not exists") raise ValueError("Last workflow run not exists")
# Get records created before the last run's timestamp # Get records created before the last run's timestamp
base_stmt = base_stmt.where( workflow_runs = session.scalars(
base_stmt.where(
WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.created_at < last_workflow_run.created_at,
WorkflowRun.id != last_workflow_run.id, WorkflowRun.id != last_workflow_run.id,
) )
.order_by(WorkflowRun.created_at.desc())
.limit(limit)
).all()
else:
# First page - get most recent records # First page - get most recent records
workflow_runs = session.scalars(base_stmt.order_by(WorkflowRun.created_at.desc()).limit(limit + 1)).all() workflow_runs = session.scalars(base_stmt.order_by(WorkflowRun.created_at.desc()).limit(limit)).all()
# Check if there are more records for pagination # Check if there are more records for pagination
has_more = len(workflow_runs) > limit has_more = False
if has_more: if len(workflow_runs) == limit:
workflow_runs = workflow_runs[:-1] current_page_last_run = workflow_runs[-1]
remaining_count = session.scalar(
select(WorkflowRun.id)
.where(
WorkflowRun.tenant_id == tenant_id,
WorkflowRun.app_id == app_id,
WorkflowRun.triggered_from == triggered_from,
WorkflowRun.created_at < current_page_last_run.created_at,
WorkflowRun.id != current_page_last_run.id,
)
.limit(1)
)
has_more = remaining_count is not None
return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more) return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)

@ -14,7 +14,6 @@ from extensions.ext_database import db
from extensions.ext_storage import storage from extensions.ext_storage import storage
from models.account import Tenant from models.account import Tenant
from models.model import App, Conversation, Message from models.model import App, Conversation, Message
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService from services.billing_service import BillingService
@ -112,47 +111,6 @@ class ClearFreePlanTenantExpiredLogs:
before_date = datetime.datetime.now() - datetime.timedelta(days=days) before_date = datetime.datetime.now() - datetime.timedelta(days=days)
total_deleted = 0 total_deleted = 0
while True:
# Get a batch of expired executions for backup
workflow_node_executions = node_execution_repo.get_expired_executions_batch(
tenant_id=tenant_id,
before_date=before_date,
batch_size=batch,
)
if len(workflow_node_executions) == 0:
break
# Save workflow node executions to storage
storage.save(
f"free_plan_tenant_expired_logs/"
f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}"
f"-{time.time()}.json",
json.dumps(
jsonable_encoder(workflow_node_executions),
).encode("utf-8"),
)
# Extract IDs for deletion
workflow_node_execution_ids = [
workflow_node_execution.id for workflow_node_execution in workflow_node_executions
]
# Delete the backed up executions
deleted_count = node_execution_repo.delete_executions_by_ids(workflow_node_execution_ids)
total_deleted += deleted_count
click.echo(
click.style(
f"[{datetime.datetime.now()}] Processed {len(workflow_node_execution_ids)}"
f" workflow node executions for tenant {tenant_id}"
)
)
# If we got fewer than the batch size, we're done
if len(workflow_node_executions) < batch:
break
while True: while True:
# Get a batch of expired executions for backup # Get a batch of expired executions for backup
workflow_node_executions = node_execution_repo.get_expired_executions_batch( workflow_node_executions = node_execution_repo.get_expired_executions_batch(

@ -25,6 +25,7 @@ class WorkflowRunService:
self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
session_maker session_maker
) )
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
""" """

@ -32,7 +32,7 @@ from models import (
) )
from models.tools import WorkflowToolProvider from models.tools import WorkflowToolProvider
from models.web import PinnedConversation, SavedMessage from models.web import PinnedConversation, SavedMessage
from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowRun from models.workflow import ConversationVariable, Workflow, WorkflowAppLog
from repositories.factory import DifyAPIRepositoryFactory from repositories.factory import DifyAPIRepositoryFactory
@ -192,7 +192,7 @@ def _delete_app_workflows(tenant_id: str, app_id: str):
def _delete_app_workflow_runs(tenant_id: str, app_id: str): def _delete_app_workflow_runs(tenant_id: str, app_id: str):
"""Delete all workflow runs for an app using the service repository.""" """Delete all workflow runs for an app using the service repository."""
session_maker = sessionmaker(bind=db.engine) session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
deleted_count = workflow_run_repo.delete_runs_by_app( deleted_count = workflow_run_repo.delete_runs_by_app(

Loading…
Cancel
Save