diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 756959d3c9..2453b1cdcc 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -18,8 +18,6 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager from core.app.apps.workflow.app_runner import WorkflowAppRunner from core.app.apps.workflow.generate_response_converter import WorkflowAppGenerateResponseConverter -from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline -from core.app.apps.workflow.generate_task_pipeline_fast import WorkflowAppGenerateTaskPipelineFast from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse from core.model_runtime.errors.invoke import InvokeAuthorizationError @@ -27,6 +25,7 @@ from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_app_generate_task_pipeline import WorkflowAppGenerateTaskPipeline +from core.workflow.workflow_app_generate_task_pipeline_fast import WorkflowAppGenerateTaskPipelineFast from extensions.ext_database import db from factories import file_factory from models import Account, App, EndUser, Workflow @@ -418,6 +417,7 @@ class WorkflowAppGenerator(BaseAppGenerator): queue_manager=queue_manager, user=user, stream=stream, + workflow_node_execution_repository=workflow_node_execution_repository, ) else: # Use the standard task pipeline @@ -427,6 +427,7 @@ class WorkflowAppGenerator(BaseAppGenerator): queue_manager=queue_manager, user=user, stream=stream, + workflow_node_execution_repository=workflow_node_execution_repository, ) try: diff --git a/api/core/app/apps/workflow/generate_task_pipeline_fast.py b/api/core/workflow/workflow_app_generate_task_pipeline_fast.py similarity index 89% rename from api/core/app/apps/workflow/generate_task_pipeline_fast.py rename to api/core/workflow/workflow_app_generate_task_pipeline_fast.py index 20ecd29226..417cc91322 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline_fast.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline_fast.py @@ -4,15 +4,9 @@ from datetime import UTC, datetime from collections.abc import Generator from typing import Optional, Union -from sqlalchemy.orm import Session - from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME -from core.app.apps.advanced_chat.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk from core.app.apps.base_app_queue_manager import AppQueueManager -from core.app.entities.app_invoke_entities import ( - InvokeFrom, - WorkflowAppGenerateEntity, -) +from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity from core.app.entities.queue_entities import ( QueueAgentLogEvent, QueueErrorEvent, @@ -37,17 +31,17 @@ from core.app.entities.task_entities import ( WorkflowTaskState, ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline -from core.app.task_pipeline.workflow_cycle_manage import WorkflowCycleManage +from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.enums import SystemVariableKey +from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.workflow_cycle_manager import WorkflowCycleManager from extensions.ext_database import db from models.account import Account from models.enums import CreatedByRole from models.model import EndUser from models.workflow import ( Workflow, - WorkflowAppLog, - WorkflowAppLogCreatedFrom, WorkflowRun, WorkflowRunStatus, ) @@ -67,6 +61,7 @@ class WorkflowAppGenerateTaskPipelineFast: queue_manager: AppQueueManager, user: Union[Account, EndUser], stream: bool, + workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, @@ -85,7 +80,7 @@ class WorkflowAppGenerateTaskPipelineFast: else: raise ValueError(f"Invalid user type: {type(user)}") - self._workflow_cycle_manager = WorkflowCycleManage( + self._workflow_cycle_manager = WorkflowCycleManager( application_generate_entity=application_generate_entity, workflow_system_variables={ SystemVariableKey.FILES: application_generate_entity.files, @@ -94,6 +89,7 @@ class WorkflowAppGenerateTaskPipelineFast: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, }, + workflow_node_execution_repository=workflow_node_execution_repository, ) self._application_generate_entity = application_generate_entity @@ -324,34 +320,6 @@ class WorkflowAppGenerateTaskPipelineFast: if tts_publisher: tts_publisher.publish(None) - def _save_workflow_app_log(self, *, session: Session, workflow_run: WorkflowRun) -> None: - """ - Save workflow app log. - :return: - """ - invoke_from = self._application_generate_entity.invoke_from - if invoke_from == InvokeFrom.SERVICE_API: - created_from = WorkflowAppLogCreatedFrom.SERVICE_API - elif invoke_from == InvokeFrom.EXPLORE: - created_from = WorkflowAppLogCreatedFrom.INSTALLED_APP - elif invoke_from == InvokeFrom.WEB_APP: - created_from = WorkflowAppLogCreatedFrom.WEB_APP - else: - # not save log for debugging - return - - workflow_app_log = WorkflowAppLog() - workflow_app_log.tenant_id = workflow_run.tenant_id - workflow_app_log.app_id = workflow_run.app_id - workflow_app_log.workflow_id = workflow_run.workflow_id - workflow_app_log.workflow_run_id = workflow_run.id - workflow_app_log.created_from = created_from.value - workflow_app_log.created_by_role = self._created_by_role - workflow_app_log.created_by = self._user_id - - session.add(workflow_app_log) - session.commit() - def _text_chunk_to_stream_response( self, text: str, from_variable_selector: Optional[list[str]] = None ) -> TextChunkStreamResponse: