feat: Create an APIWorkflowRunRepository

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/21458/head
-LAN- 11 months ago
parent b2b4049279
commit b450acf94b
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -558,6 +558,11 @@ class RepositoryConfig(BaseSettings):
default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository", default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository",
) )
API_WORKFLOW_RUN_REPOSITORY: str = Field(
description="Service-layer repository implementation for WorkflowRun operations. Specify as a module path",
default="repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository",
)
class AuthConfig(BaseSettings): class AuthConfig(BaseSettings):
""" """

@ -35,8 +35,6 @@ def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[
raise AppNotFoundError() raise AppNotFoundError()
app_mode = AppMode.value_of(app_model.mode) app_mode = AppMode.value_of(app_model.mode)
if app_mode == AppMode.CHANNEL:
raise AppNotFoundError()
if mode is not None: if mode is not None:
if isinstance(mode, list): if isinstance(mode, list):

@ -3,7 +3,7 @@ import logging
from dateutil.parser import isoparse from dateutil.parser import isoparse
from flask_restful import Resource, fields, marshal_with, reqparse from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range from flask_restful.inputs import int_range
from sqlalchemy.orm import Session from sqlalchemy.orm import Session, sessionmaker
from werkzeug.exceptions import InternalServerError from werkzeug.exceptions import InternalServerError
from controllers.service_api import api from controllers.service_api import api
@ -30,7 +30,7 @@ from fields.workflow_app_log_fields import workflow_app_log_pagination_fields
from libs import helper from libs import helper
from libs.helper import TimestampField from libs.helper import TimestampField
from models.model import App, AppMode, EndUser from models.model import App, AppMode, EndUser
from models.workflow import WorkflowRun from repositories.factory import DifyAPIRepositoryFactory
from services.app_generate_service import AppGenerateService from services.app_generate_service import AppGenerateService
from services.errors.llm import InvokeRateLimitError from services.errors.llm import InvokeRateLimitError
from services.workflow_app_service import WorkflowAppService from services.workflow_app_service import WorkflowAppService
@ -63,7 +63,15 @@ class WorkflowRunDetailApi(Resource):
if app_mode not in [AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]: if app_mode not in [AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]:
raise NotWorkflowAppError() raise NotWorkflowAppError()
workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() # Use repository to get workflow run
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = workflow_run_repo.get_workflow_run_by_id(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
run_id=workflow_run_id,
)
return workflow_run return workflow_run

@ -3,6 +3,8 @@ import logging
import uuid import uuid
from typing import Optional, Union, cast from typing import Optional, Union, cast
from sqlalchemy import select
from core.agent.entities import AgentEntity, AgentToolEntity from core.agent.entities import AgentEntity, AgentToolEntity
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig
@ -417,12 +419,15 @@ class BaseAgentRunner(AppRunner):
if isinstance(prompt_message, SystemPromptMessage): if isinstance(prompt_message, SystemPromptMessage):
result.append(prompt_message) result.append(prompt_message)
messages: list[Message] = ( messages = (
db.session.query(Message) (
.filter( db.session.execute(
Message.conversation_id == self.message.conversation_id, select(Message)
) .where(Message.conversation_id == self.message.conversation_id)
.order_by(Message.created_at.desc()) .order_by(Message.created_at.desc())
)
)
.scalars()
.all() .all()
) )

@ -138,6 +138,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._workflow_response_converter = WorkflowResponseConverter( self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
) )
self._task_state = WorkflowTaskState() self._task_state = WorkflowTaskState()

@ -49,6 +49,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
from models import ( from models import (
Account, Account,
@ -63,8 +64,10 @@ class WorkflowResponseConverter:
self, self,
*, *,
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
workflow_execution_repository: WorkflowExecutionRepository,
) -> None: ) -> None:
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow_execution_repository = workflow_execution_repository
def workflow_start_to_stream_response( def workflow_start_to_stream_response(
self, self,

@ -3,7 +3,6 @@ import time
from collections.abc import Generator from collections.abc import Generator
from typing import Optional, Union from typing import Optional, Union
from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
@ -68,7 +67,6 @@ from models.workflow import (
Workflow, Workflow,
WorkflowAppLog, WorkflowAppLog,
WorkflowAppLogCreatedFrom, WorkflowAppLogCreatedFrom,
WorkflowRun,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -128,6 +126,7 @@ class WorkflowAppGenerateTaskPipeline:
self._workflow_response_converter = WorkflowResponseConverter( self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
) )
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
@ -562,8 +561,6 @@ class WorkflowAppGenerateTaskPipeline:
tts_publisher.publish(None) tts_publisher.publish(None)
def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None: def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None:
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
assert workflow_run is not None
invoke_from = self._application_generate_entity.invoke_from invoke_from = self._application_generate_entity.invoke_from
if invoke_from == InvokeFrom.SERVICE_API: if invoke_from == InvokeFrom.SERVICE_API:
created_from = WorkflowAppLogCreatedFrom.SERVICE_API created_from = WorkflowAppLogCreatedFrom.SERVICE_API
@ -576,10 +573,10 @@ class WorkflowAppGenerateTaskPipeline:
return return
workflow_app_log = WorkflowAppLog() workflow_app_log = WorkflowAppLog()
workflow_app_log.tenant_id = workflow_run.tenant_id workflow_app_log.tenant_id = self._application_generate_entity.app_config.tenant_id
workflow_app_log.app_id = workflow_run.app_id workflow_app_log.app_id = self._application_generate_entity.app_config.app_id
workflow_app_log.workflow_id = workflow_run.workflow_id workflow_app_log.workflow_id = workflow_execution.workflow_id
workflow_app_log.workflow_run_id = workflow_run.id workflow_app_log.workflow_run_id = workflow_execution.id_
workflow_app_log.created_from = created_from.value workflow_app_log.created_from = created_from.value
workflow_app_log.created_by_role = self._created_by_role workflow_app_log.created_by_role = self._created_by_role
workflow_app_log.created_by = self._user_id workflow_app_log.created_by = self._user_id

@ -1,6 +1,8 @@
from collections.abc import Sequence from collections.abc import Sequence
from typing import Optional from typing import Optional
from sqlalchemy import select
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.file import file_manager from core.file import file_manager
from core.model_manager import ModelInstance from core.model_manager import ModelInstance
@ -17,11 +19,15 @@ from core.prompt.utils.extract_thread_messages import extract_thread_messages
from extensions.ext_database import db from extensions.ext_database import db
from factories import file_factory from factories import file_factory
from models.model import AppMode, Conversation, Message, MessageFile from models.model import AppMode, Conversation, Message, MessageFile
from models.workflow import WorkflowRun from models.workflow import Workflow, WorkflowRun
class TokenBufferMemory: class TokenBufferMemory:
def __init__(self, conversation: Conversation, model_instance: ModelInstance) -> None: def __init__(
self,
conversation: Conversation,
model_instance: ModelInstance,
) -> None:
self.conversation = conversation self.conversation = conversation
self.model_instance = model_instance self.model_instance = model_instance
@ -36,20 +42,8 @@ class TokenBufferMemory:
app_record = self.conversation.app app_record = self.conversation.app
# fetch limited messages, and return reversed # fetch limited messages, and return reversed
query = ( stmt = (
db.session.query( select(Message).where(Message.conversation_id == self.conversation.id).order_by(Message.created_at.desc())
Message.id,
Message.query,
Message.answer,
Message.created_at,
Message.workflow_run_id,
Message.parent_message_id,
Message.answer_tokens,
)
.filter(
Message.conversation_id == self.conversation.id,
)
.order_by(Message.created_at.desc())
) )
if message_limit and message_limit > 0: if message_limit and message_limit > 0:
@ -57,7 +51,9 @@ class TokenBufferMemory:
else: else:
message_limit = 500 message_limit = 500
messages = query.limit(message_limit).all() stmt = stmt.limit(message_limit)
messages = db.session.scalars(stmt).all()
# instead of all messages from the conversation, we only need to extract messages # instead of all messages from the conversation, we only need to extract messages
# that belong to the thread of last message # that belong to the thread of last message
@ -74,18 +70,20 @@ class TokenBufferMemory:
files = db.session.query(MessageFile).filter(MessageFile.message_id == message.id).all() files = db.session.query(MessageFile).filter(MessageFile.message_id == message.id).all()
if files: if files:
file_extra_config = None file_extra_config = None
if self.conversation.mode not in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}: if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}:
file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config) file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config)
else: elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
if message.workflow_run_id: workflow_run = db.session.scalar(
workflow_run = ( select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id)
db.session.query(WorkflowRun).filter(WorkflowRun.id == message.workflow_run_id).first()
)
if workflow_run and workflow_run.workflow:
file_extra_config = FileUploadConfigManager.convert(
workflow_run.workflow.features_dict, is_vision=False
) )
if not workflow_run:
raise ValueError(f"Workflow run not found: {message.workflow_run_id}")
workflow = db.session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id))
if not workflow:
raise ValueError(f"Workflow not found: {workflow_run.workflow_id}")
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
else:
raise ValueError(f"Invalid app mode: {self.conversation.mode}")
detail = ImagePromptMessageContent.DETAIL.LOW detail = ImagePromptMessageContent.DETAIL.LOW
if file_extra_config and app_record: if file_extra_config and app_record:

@ -1,10 +1,11 @@
from typing import Any from collections.abc import Sequence
from constants import UUID_NIL from constants import UUID_NIL
from models import Message
def extract_thread_messages(messages: list[Any]): def extract_thread_messages(messages: Sequence[Message]):
thread_messages = [] thread_messages: list[Message] = []
next_message = None next_message = None
for message in messages: for message in messages:

@ -1,3 +1,5 @@
from sqlalchemy import select
from core.prompt.utils.extract_thread_messages import extract_thread_messages from core.prompt.utils.extract_thread_messages import extract_thread_messages
from extensions.ext_database import db from extensions.ext_database import db
from models.model import Message from models.model import Message
@ -8,19 +10,9 @@ def get_thread_messages_length(conversation_id: str) -> int:
Get the number of thread messages based on the parent message id. Get the number of thread messages based on the parent message id.
""" """
# Fetch all messages related to the conversation # Fetch all messages related to the conversation
query = ( stmt = select(Message).where(Message.conversation_id == conversation_id).order_by(Message.created_at.desc())
db.session.query(
Message.id,
Message.parent_message_id,
Message.answer,
)
.filter(
Message.conversation_id == conversation_id,
)
.order_by(Message.created_at.desc())
)
messages = query.all() messages = db.session.scalars(stmt).all()
# Extract thread messages # Extract thread messages
thread_messages = extract_thread_messages(messages) thread_messages = extract_thread_messages(messages)

@ -50,7 +50,6 @@ class AppMode(StrEnum):
CHAT = "chat" CHAT = "chat"
ADVANCED_CHAT = "advanced-chat" ADVANCED_CHAT = "advanced-chat"
AGENT_CHAT = "agent-chat" AGENT_CHAT = "agent-chat"
CHANNEL = "channel"
@classmethod @classmethod
def value_of(cls, value: str) -> "AppMode": def value_of(cls, value: str) -> "AppMode":
@ -934,7 +933,7 @@ class Message(Base):
created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp()) created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp())
updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
agent_based = db.Column(db.Boolean, nullable=False, server_default=db.text("false")) agent_based = db.Column(db.Boolean, nullable=False, server_default=db.text("false"))
workflow_run_id = db.Column(StringUUID) workflow_run_id: Mapped[str] = db.Column(StringUUID)
@property @property
def inputs(self): def inputs(self):

@ -0,0 +1,180 @@
"""
API WorkflowRun Repository Protocol
This module defines the protocol for service-layer WorkflowRun operations.
The repository provides an abstraction layer for WorkflowRun database operations
used by service classes, separating service-layer concerns from core domain logic.
Key Features:
- Paginated workflow run queries with filtering
- Bulk deletion operations with OSS backup support
- Multi-tenant data isolation
- Expired record cleanup with data retention
- Service-layer specific query patterns
Usage:
This protocol should be used by service classes that need to perform
WorkflowRun database operations. It provides a clean interface that
hides implementation details and supports dependency injection.
Example:
```python
from repositories.factory import DifyAPIRepositoryFactory
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
# Get paginated workflow runs
runs = repo.get_paginated_workflow_runs(
tenant_id="tenant-123",
app_id="app-456",
triggered_from="debugging",
limit=20
)
```
"""
from collections.abc import Sequence
from datetime import datetime
from typing import Optional, Protocol
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.workflow import WorkflowRun
class APIWorkflowRunRepository(Protocol):
    """
    Structural interface for service-layer WorkflowRun persistence.

    Implementations expose the WorkflowRun queries that service classes need:
    cursor-paginated listing, lookup by ID, fetching expired runs in batches,
    and bulk deletion. Any class providing these methods with compatible
    signatures satisfies the protocol.
    """

    def get_paginated_workflow_runs(
        self,
        tenant_id: str,
        app_id: str,
        triggered_from: str,
        limit: int = 20,
        last_id: Optional[str] = None,
    ) -> InfiniteScrollPagination:
        """
        Return one page of workflow runs for an app and trigger source.

        Pagination is cursor-based: pass the ID of the last run from the
        previous page as ``last_id`` to fetch the page that follows it.

        Args:
            tenant_id: Tenant that owns the runs (multi-tenant isolation)
            app_id: Application the runs belong to
            triggered_from: Trigger source to filter on (e.g., "debugging", "app-run")
            limit: Maximum number of runs in the page (default: 20)
            last_id: ID of the last run of the previous page, or None for the first page

        Returns:
            InfiniteScrollPagination carrying:
            - data: the WorkflowRun objects of this page
            - limit: the limit that was applied
            - has_more: whether further runs exist after this page

        Raises:
            ValueError: If ``last_id`` is given but no matching run exists
        """
        ...

    def get_workflow_run_by_id(
        self,
        tenant_id: str,
        app_id: str,
        run_id: str,
    ) -> Optional[WorkflowRun]:
        """
        Look up a single workflow run, scoped to a tenant and an app.

        Args:
            tenant_id: Tenant that owns the run (multi-tenant isolation)
            app_id: Application the run belongs to
            run_id: Identifier of the workflow run

        Returns:
            The matching WorkflowRun, or None when there is no match
        """
        ...

    def get_expired_runs_batch(
        self,
        tenant_id: str,
        before_date: datetime,
        batch_size: int = 1000,
    ) -> Sequence[WorkflowRun]:
        """
        Fetch up to ``batch_size`` workflow runs older than ``before_date``.

        Intended for cleanup jobs that process old runs batch by batch.

        Args:
            tenant_id: Tenant that owns the runs (multi-tenant isolation)
            before_date: Only runs created before this moment are returned
            batch_size: Upper bound on the number of runs returned

        Returns:
            Sequence of WorkflowRun objects eligible for cleanup
        """
        ...

    def delete_runs_by_ids(
        self,
        run_ids: Sequence[str],
    ) -> int:
        """
        Hard-delete the workflow runs with the given IDs.

        Args:
            run_ids: IDs of the workflow runs to delete

        Returns:
            Number of records actually deleted

        Note:
            Deletion is permanent. Callers that must retain the data should
            back it up (e.g. to OSS storage) before invoking this method.
        """
        ...

    def delete_runs_by_app(
        self,
        tenant_id: str,
        app_id: str,
        batch_size: int = 1000,
    ) -> int:
        """
        Hard-delete every workflow run of an app, working in batches.

        Batching keeps memory use and transaction length bounded during
        app cleanup.

        Args:
            tenant_id: Tenant that owns the runs (multi-tenant isolation)
            app_id: Application whose runs are removed
            batch_size: Number of records handled per batch

        Returns:
            Total number of records deleted across all batches

        Note:
            No backup is taken. Use with caution and make sure data
            retention requirements are met beforehand.
        """
        ...

@ -12,6 +12,7 @@ from sqlalchemy.orm import sessionmaker
from configs import dify_config from configs import dify_config
from core.repositories import DifyCoreRepositoryFactory, RepositoryImportError from core.repositories import DifyCoreRepositoryFactory, RepositoryImportError
from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -64,3 +65,39 @@ class DifyAPIRepositoryFactory(DifyCoreRepositoryFactory):
raise RepositoryImportError( raise RepositoryImportError(
f"Failed to create DifyAPIWorkflowNodeExecutionRepository from '{class_path}': {e}" f"Failed to create DifyAPIWorkflowNodeExecutionRepository from '{class_path}': {e}"
) from e ) from e
@classmethod
def create_api_workflow_run_repository(cls, session_maker: sessionmaker) -> APIWorkflowRunRepository:
    """
    Instantiate the APIWorkflowRunRepository implementation named in configuration.

    The class path is read from ``dify_config.API_WORKFLOW_RUN_REPOSITORY``. The
    class is imported, checked against the APIWorkflowRunRepository interface,
    checked for a ``session_maker`` constructor parameter, and then constructed
    with the supplied sessionmaker.

    Args:
        session_maker: SQLAlchemy sessionmaker handed to the repository constructor.

    Returns:
        The constructed repository instance.

    Raises:
        RepositoryImportError: Raised as-is when one of the import/validation
            steps raises it; any other exception is logged and wrapped in it.
    """
    configured_path = dify_config.API_WORKFLOW_RUN_REPOSITORY
    logger.debug(f"Creating APIWorkflowRunRepository from: {configured_path}")

    try:
        impl_cls = cls._import_class(configured_path)
        cls._validate_repository_interface(impl_cls, APIWorkflowRunRepository)
        # The service-layer repository must accept a ``session_maker`` argument.
        cls._validate_constructor_signature(impl_cls, ["session_maker"])
        return impl_cls(session_maker=session_maker)  # type: ignore[no-any-return]
    except RepositoryImportError:
        # Already the project's own error type: propagate untouched.
        raise
    except Exception as exc:
        logger.exception("Failed to create APIWorkflowRunRepository")
        raise RepositoryImportError(
            f"Failed to create APIWorkflowRunRepository from '{configured_path}': {exc}"
        ) from exc

@ -0,0 +1,218 @@
"""
SQLAlchemy API WorkflowRun Repository Implementation
This module provides the SQLAlchemy-based implementation of the APIWorkflowRunRepository
protocol. It handles service-layer WorkflowRun database operations using SQLAlchemy 2.0
style queries with proper session management and multi-tenant data isolation.
Key Features:
- SQLAlchemy 2.0 style queries for modern database operations
- Cursor-based pagination for efficient large dataset handling
- Bulk operations with batch processing for performance
- Multi-tenant data isolation and security
- Proper session management with dependency injection
Implementation Notes:
- Uses sessionmaker for consistent session management
- Implements cursor-based pagination using created_at timestamps
- Provides efficient bulk deletion with batch processing
- Maintains data consistency with proper transaction handling
"""
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import Optional, cast
from sqlalchemy import delete, select
from sqlalchemy.orm import sessionmaker
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.workflow import WorkflowRun
logger = logging.getLogger(__name__)
class DifyAPISQLAlchemyWorkflowRunRepository:
    """
    SQLAlchemy implementation of APIWorkflowRunRepository.

    Each method opens its own session from the injected sessionmaker and
    closes it before returning. Read methods filter by tenant (and app where
    applicable); ``delete_runs_by_ids`` filters by ID only.

    Args:
        session_maker: SQLAlchemy sessionmaker instance for database connections
    """

    def __init__(self, session_maker: sessionmaker) -> None:
        """
        Store the sessionmaker used to open a session per operation.

        Args:
            session_maker: SQLAlchemy sessionmaker for database connections
        """
        self._session_maker = session_maker

    def get_paginated_workflow_runs(
        self,
        tenant_id: str,
        app_id: str,
        triggered_from: str,
        limit: int = 20,
        last_id: Optional[str] = None,
    ) -> InfiniteScrollPagination:
        """
        Get one page of workflow runs, newest first, using a created_at cursor.

        Args:
            tenant_id: Tenant identifier the runs must belong to
            app_id: Application identifier the runs must belong to
            triggered_from: Trigger source the runs must match
            limit: Maximum number of runs to return
            last_id: ID of the last run of the previous page; None for the first page

        Returns:
            InfiniteScrollPagination with the page data, the applied limit and
            a ``has_more`` flag.

        Raises:
            ValueError: If ``last_id`` is given but no run with that ID exists
                within the same tenant/app/trigger scope
        """
        # Shared by the page query and the has-more probe so the two can
        # never drift apart.
        scope = (
            WorkflowRun.tenant_id == tenant_id,
            WorkflowRun.app_id == app_id,
            WorkflowRun.triggered_from == triggered_from,
        )

        with self._session_maker() as session:
            base_stmt = select(WorkflowRun).where(*scope)

            if last_id:
                # Resolve the cursor row; its timestamp bounds the next page.
                last_workflow_run = session.scalar(base_stmt.where(WorkflowRun.id == last_id))
                if not last_workflow_run:
                    raise ValueError("Last workflow run not exists")

                # NOTE(review): the strict "<" looks like it would skip runs that
                # share the cursor's created_at value - confirm whether a
                # (created_at, id) tie-breaker is needed.
                page_stmt = base_stmt.where(
                    WorkflowRun.created_at < last_workflow_run.created_at,
                    WorkflowRun.id != last_workflow_run.id,
                )
            else:
                # First page - most recent records.
                page_stmt = base_stmt

            workflow_runs = session.scalars(page_stmt.order_by(WorkflowRun.created_at.desc()).limit(limit)).all()

            # Only a full, non-empty page can be followed by more rows. The
            # emptiness check prevents an IndexError when limit is 0.
            has_more = False
            if workflow_runs and len(workflow_runs) == limit:
                current_page_last_run = workflow_runs[-1]
                next_run_id = session.scalar(
                    select(WorkflowRun.id)
                    .where(
                        *scope,
                        WorkflowRun.created_at < current_page_last_run.created_at,
                        WorkflowRun.id != current_page_last_run.id,
                    )
                    .limit(1)
                )
                has_more = next_run_id is not None

            return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)

    def get_workflow_run_by_id(
        self,
        tenant_id: str,
        app_id: str,
        run_id: str,
    ) -> Optional[WorkflowRun]:
        """
        Get a specific workflow run by ID with tenant and app isolation.

        Returns:
            The matching WorkflowRun, or None if no row matches all three filters
        """
        with self._session_maker() as session:
            stmt = select(WorkflowRun).where(
                WorkflowRun.tenant_id == tenant_id,
                WorkflowRun.app_id == app_id,
                WorkflowRun.id == run_id,
            )
            return cast(Optional[WorkflowRun], session.scalar(stmt))

    def get_expired_runs_batch(
        self,
        tenant_id: str,
        before_date: datetime,
        batch_size: int = 1000,
    ) -> Sequence[WorkflowRun]:
        """
        Get up to ``batch_size`` workflow runs of a tenant created before ``before_date``.

        No ordering is applied, so which rows form the batch is up to the database.
        """
        with self._session_maker() as session:
            stmt = (
                select(WorkflowRun)
                .where(
                    WorkflowRun.tenant_id == tenant_id,
                    WorkflowRun.created_at < before_date,
                )
                .limit(batch_size)
            )
            return cast(Sequence[WorkflowRun], session.scalars(stmt).all())

    def delete_runs_by_ids(
        self,
        run_ids: Sequence[str],
    ) -> int:
        """
        Delete workflow runs by their IDs with a single bulk DELETE.

        Args:
            run_ids: IDs of the runs to delete; an empty sequence is a no-op

        Returns:
            Number of rows the DELETE reported as affected
        """
        if not run_ids:
            return 0

        with self._session_maker() as session:
            stmt = delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids))
            result = session.execute(stmt)
            session.commit()

            deleted_count = cast(int, result.rowcount)
            logger.info("Deleted %s workflow runs by IDs", deleted_count)
            return deleted_count

    def delete_runs_by_app(
        self,
        tenant_id: str,
        app_id: str,
        batch_size: int = 1000,
    ) -> int:
        """
        Delete all workflow runs for a specific app in batches.

        Each batch runs in its own session and is committed before the next
        one starts.

        Args:
            tenant_id: Tenant identifier the runs must belong to
            app_id: Application whose runs are deleted
            batch_size: Maximum number of runs selected and deleted per batch

        Returns:
            Sum of the affected-row counts of all batches
        """
        total_deleted = 0

        while True:
            with self._session_maker() as session:
                # Get a batch of run IDs to delete
                stmt = (
                    select(WorkflowRun.id)
                    .where(
                        WorkflowRun.tenant_id == tenant_id,
                        WorkflowRun.app_id == app_id,
                    )
                    .limit(batch_size)
                )
                run_ids = session.scalars(stmt).all()

                if not run_ids:
                    break

                # Delete the batch
                delete_stmt = delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids))
                result = session.execute(delete_stmt)
                session.commit()

                batch_deleted = cast(int, result.rowcount)
                total_deleted += batch_deleted
                logger.info("Deleted batch of %s workflow runs for app %s", batch_deleted, app_id)

                # If we deleted fewer records than the batch size, we're done
                if batch_deleted < batch_size:
                    break

        logger.info("Total deleted %s workflow runs for app %s", total_deleted, app_id)
        return total_deleted

@ -47,8 +47,6 @@ class AppService:
filters.append(App.mode == AppMode.ADVANCED_CHAT.value) filters.append(App.mode == AppMode.ADVANCED_CHAT.value)
elif args["mode"] == "agent-chat": elif args["mode"] == "agent-chat":
filters.append(App.mode == AppMode.AGENT_CHAT.value) filters.append(App.mode == AppMode.AGENT_CHAT.value)
elif args["mode"] == "channel":
filters.append(App.mode == AppMode.CHANNEL.value)
if args.get("is_created_by_me", False): if args.get("is_created_by_me", False):
filters.append(App.created_by == user_id) filters.append(App.created_by == user_id)

@ -14,7 +14,6 @@ from extensions.ext_database import db
from extensions.ext_storage import storage from extensions.ext_storage import storage
from models.account import Tenant from models.account import Tenant
from models.model import App, Conversation, Message from models.model import App, Conversation, Message
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService from services.billing_service import BillingService
@ -153,23 +152,24 @@ class ClearFreePlanTenantExpiredLogs:
if len(workflow_node_executions) < batch: if len(workflow_node_executions) < batch:
break break
# Process expired workflow runs with backup
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
before_date = datetime.datetime.now() - datetime.timedelta(days=days)
total_deleted = 0
while True: while True:
with Session(db.engine).no_autoflush as session: # Get a batch of expired workflow runs for backup
workflow_runs = ( workflow_runs = workflow_run_repo.get_expired_runs_batch(
session.query(WorkflowRun) tenant_id=tenant_id,
.filter( before_date=before_date,
WorkflowRun.tenant_id == tenant_id, batch_size=batch,
WorkflowRun.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
)
.limit(batch)
.all()
) )
if len(workflow_runs) == 0: if len(workflow_runs) == 0:
break break
# save workflow runs # Save workflow runs to storage
storage.save( storage.save(
f"free_plan_tenant_expired_logs/" f"free_plan_tenant_expired_logs/"
f"{tenant_id}/workflow_runs/{datetime.datetime.now().strftime('%Y-%m-%d')}" f"{tenant_id}/workflow_runs/{datetime.datetime.now().strftime('%Y-%m-%d')}"
@ -181,13 +181,23 @@ class ClearFreePlanTenantExpiredLogs:
).encode("utf-8"), ).encode("utf-8"),
) )
# Extract IDs for deletion
workflow_run_ids = [workflow_run.id for workflow_run in workflow_runs] workflow_run_ids = [workflow_run.id for workflow_run in workflow_runs]
# delete workflow runs # Delete the backed up workflow runs
session.query(WorkflowRun).filter( deleted_count = workflow_run_repo.delete_runs_by_ids(workflow_run_ids)
WorkflowRun.id.in_(workflow_run_ids), total_deleted += deleted_count
).delete(synchronize_session=False)
session.commit() click.echo(
click.style(
f"[{datetime.datetime.now()}] Processed {len(workflow_run_ids)}"
f" workflow runs for tenant {tenant_id}"
)
)
# If we got fewer than the batch size, we're done
if len(workflow_runs) < batch:
break
@classmethod @classmethod
def process(cls, days: int, batch: int, tenant_ids: list[str]): def process(cls, days: int, batch: int, tenant_ids: list[str]):

@ -25,6 +25,7 @@ class WorkflowRunService:
self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
session_maker session_maker
) )
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
""" """
@ -69,44 +70,15 @@ class WorkflowRunService:
:param args: request args :param args: request args
""" """
limit = int(args.get("limit", 20)) limit = int(args.get("limit", 20))
last_id = args.get("last_id")
base_query = db.session.query(WorkflowRun).filter( return self._workflow_run_repo.get_paginated_workflow_runs(
WorkflowRun.tenant_id == app_model.tenant_id, tenant_id=app_model.tenant_id,
WorkflowRun.app_id == app_model.id, app_id=app_model.id,
WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value, triggered_from=WorkflowRunTriggeredFrom.DEBUGGING.value,
) limit=limit,
last_id=last_id,
if args.get("last_id"):
last_workflow_run = base_query.filter(
WorkflowRun.id == args.get("last_id"),
).first()
if not last_workflow_run:
raise ValueError("Last workflow run not exists")
workflow_runs = (
base_query.filter(
WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id
)
.order_by(WorkflowRun.created_at.desc())
.limit(limit)
.all()
) )
else:
workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
has_more = False
if len(workflow_runs) == limit:
current_page_first_workflow_run = workflow_runs[-1]
rest_count = base_query.filter(
WorkflowRun.created_at < current_page_first_workflow_run.created_at,
WorkflowRun.id != current_page_first_workflow_run.id,
).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
def get_workflow_run(self, app_model: App, run_id: str) -> Optional[WorkflowRun]: def get_workflow_run(self, app_model: App, run_id: str) -> Optional[WorkflowRun]:
""" """
@ -115,18 +87,12 @@ class WorkflowRunService:
:param app_model: app model :param app_model: app model
:param run_id: workflow run id :param run_id: workflow run id
""" """
workflow_run = ( return self._workflow_run_repo.get_workflow_run_by_id(
db.session.query(WorkflowRun) tenant_id=app_model.tenant_id,
.filter( app_id=app_model.id,
WorkflowRun.tenant_id == app_model.tenant_id, run_id=run_id,
WorkflowRun.app_id == app_model.id,
WorkflowRun.id == run_id,
)
.first()
) )
return workflow_run
def get_workflow_run_node_executions( def get_workflow_run_node_executions(
self, self,
app_model: App, app_model: App,

@ -32,7 +32,7 @@ from models import (
) )
from models.tools import WorkflowToolProvider from models.tools import WorkflowToolProvider
from models.web import PinnedConversation, SavedMessage from models.web import PinnedConversation, SavedMessage
from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowRun from models.workflow import ConversationVariable, Workflow, WorkflowAppLog
from repositories.factory import DifyAPIRepositoryFactory from repositories.factory import DifyAPIRepositoryFactory
@ -191,16 +191,18 @@ def _delete_app_workflows(tenant_id: str, app_id: str):
def _delete_app_workflow_runs(tenant_id: str, app_id: str):
    """Delete all workflow runs for an app using the service repository.

    The workflow-run repository is obtained from ``DifyAPIRepositoryFactory``
    and the deletion is delegated to its ``delete_runs_by_app`` method instead
    of issuing queries against the ``WorkflowRun`` model from this module.

    :param tenant_id: ID of the tenant that owns the app
    :param app_id: ID of the app whose workflow runs are removed
    """
    # The repository is handed a session factory bound to the shared engine.
    # expire_on_commit=False is passed so loaded objects are not expired when
    # a repository session commits.
    session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
    workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)

    # batch_size is forwarded as-is; how the repository uses it is not visible
    # here (presumably rows are deleted in chunks of this size -- confirm
    # against the repository implementation).
    deleted_count = workflow_run_repo.delete_runs_by_app(
        tenant_id=tenant_id,
        app_id=app_id,
        batch_size=1000,
    )

    logging.info(f"Deleted {deleted_count} workflow runs for app {app_id}")
def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): def _delete_app_workflow_node_executions(tenant_id: str, app_id: str):
"""Delete all workflow node executions for an app using the service repository.""" """Delete all workflow node executions for an app using the service repository."""

Loading…
Cancel
Save