@ -7,6 +7,7 @@ from typing import Any, Literal, Optional, Union, overload
from flask import Flask , current_app
from pydantic import ValidationError
from sqlalchemy . orm import sessionmaker
import contexts
from configs import dify_config
@ -22,6 +23,8 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core . app . entities . task_entities import WorkflowAppBlockingResponse , WorkflowAppStreamResponse
from core . model_runtime . errors . invoke import InvokeAuthorizationError
from core . ops . ops_trace_manager import TraceQueueManager
from core . repository import RepositoryFactory
from core . repository . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions . ext_database import db
from factories import file_factory
from models import Account , App , EndUser , Workflow
@ -133,12 +136,23 @@ class WorkflowAppGenerator(BaseAppGenerator):
contexts . plugin_tool_providers . set ( { } )
contexts . plugin_tool_providers_lock . set ( threading . Lock ( ) )
# Create workflow node execution repository
session_factory = sessionmaker ( bind = db . engine , expire_on_commit = False )
workflow_node_execution_repository = RepositoryFactory . create_workflow_node_execution_repository (
params = {
" tenant_id " : application_generate_entity . app_config . tenant_id ,
" app_id " : application_generate_entity . app_config . app_id ,
" session_factory " : session_factory ,
}
)
return self . _generate (
app_model = app_model ,
workflow = workflow ,
user = user ,
application_generate_entity = application_generate_entity ,
invoke_from = invoke_from ,
workflow_node_execution_repository = workflow_node_execution_repository ,
streaming = streaming ,
workflow_thread_pool_id = workflow_thread_pool_id ,
)
@ -151,6 +165,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user : Union [ Account , EndUser ] ,
application_generate_entity : WorkflowAppGenerateEntity ,
invoke_from : InvokeFrom ,
workflow_node_execution_repository : WorkflowNodeExecutionRepository ,
streaming : bool = True ,
workflow_thread_pool_id : Optional [ str ] = None ,
) - > Union [ Mapping [ str , Any ] , Generator [ str | Mapping [ str , Any ] , None , None ] ] :
@ -162,6 +177,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
: param user : account or end user
: param application_generate_entity : application generate entity
: param invoke_from : invoke from source
: param workflow_node_execution_repository : repository for workflow node execution
: param streaming : is stream
: param workflow_thread_pool_id : workflow thread pool id
"""
@ -193,6 +209,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow = workflow ,
queue_manager = queue_manager ,
user = user ,
workflow_node_execution_repository = workflow_node_execution_repository ,
stream = streaming ,
)
@ -245,12 +262,23 @@ class WorkflowAppGenerator(BaseAppGenerator):
contexts . plugin_tool_providers . set ( { } )
contexts . plugin_tool_providers_lock . set ( threading . Lock ( ) )
# Create workflow node execution repository
session_factory = sessionmaker ( bind = db . engine , expire_on_commit = False )
workflow_node_execution_repository = RepositoryFactory . create_workflow_node_execution_repository (
params = {
" tenant_id " : application_generate_entity . app_config . tenant_id ,
" app_id " : application_generate_entity . app_config . app_id ,
" session_factory " : session_factory ,
}
)
return self . _generate (
app_model = app_model ,
workflow = workflow ,
user = user ,
invoke_from = InvokeFrom . DEBUGGER ,
application_generate_entity = application_generate_entity ,
workflow_node_execution_repository = workflow_node_execution_repository ,
streaming = streaming ,
)
@ -299,12 +327,23 @@ class WorkflowAppGenerator(BaseAppGenerator):
contexts . plugin_tool_providers . set ( { } )
contexts . plugin_tool_providers_lock . set ( threading . Lock ( ) )
# Create workflow node execution repository
session_factory = sessionmaker ( bind = db . engine , expire_on_commit = False )
workflow_node_execution_repository = RepositoryFactory . create_workflow_node_execution_repository (
params = {
" tenant_id " : application_generate_entity . app_config . tenant_id ,
" app_id " : application_generate_entity . app_config . app_id ,
" session_factory " : session_factory ,
}
)
return self . _generate (
app_model = app_model ,
workflow = workflow ,
user = user ,
invoke_from = InvokeFrom . DEBUGGER ,
application_generate_entity = application_generate_entity ,
workflow_node_execution_repository = workflow_node_execution_repository ,
streaming = streaming ,
)
@ -361,6 +400,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow : Workflow ,
queue_manager : AppQueueManager ,
user : Union [ Account , EndUser ] ,
workflow_node_execution_repository : WorkflowNodeExecutionRepository ,
stream : bool = False ,
) - > Union [ WorkflowAppBlockingResponse , Generator [ WorkflowAppStreamResponse , None , None ] ] :
"""
@ -370,6 +410,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
: param queue_manager : queue manager
: param user : account or end user
: param stream : is stream
: param workflow_node_execution_repository : optional repository for workflow node execution
: return :
"""
# init generate task pipeline
@ -379,6 +420,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
queue_manager = queue_manager ,
user = user ,
stream = stream ,
workflow_node_execution_repository = workflow_node_execution_repository ,
)
try :