@ -32,6 +32,7 @@ from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchem
from core . workflow . repositories . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositories . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions . ext_database import db
from libs . flask_utils import preserve_flask_contexts
from models import Account , EndUser , Workflow , WorkflowNodeExecutionTriggeredFrom
from models . dataset import Document , DocumentPipelineExecutionLog , Pipeline
from models . enums import WorkflowRunTriggeredFrom
@ -209,25 +210,22 @@ class PipelineGenerator(BaseAppGenerator):
# run in child thread
context = contextvars . copy_context ( )
@copy_current_request_context
def worker_with_context ( ) :
# Run the worker within the copied context
return context . run (
self . _generate ,
flask_app = current_app . _get_current_object ( ) , # type: ignore
context = context ,
pipeline = pipeline ,
workflow_id = workflow . id ,
user = user ,
application_generate_entity = application_generate_entity ,
invoke_from = invoke_from ,
workflow_execution_repository = workflow_execution_repository ,
workflow_node_execution_repository = workflow_node_execution_repository ,
streaming = streaming ,
workflow_thread_pool_id = workflow_thread_pool_id ,
)
worker_thread = threading . Thread ( target = worker_with_context )
worker_thread = threading . Thread (
target = self . _generate ,
kwargs = {
" flask_app " : current_app . _get_current_object ( ) , # type: ignore
" context " : context ,
" pipeline " : pipeline ,
" workflow_id " : workflow . id ,
" user " : user ,
" application_generate_entity " : application_generate_entity ,
" invoke_from " : invoke_from ,
" workflow_execution_repository " : workflow_execution_repository ,
" workflow_node_execution_repository " : workflow_node_execution_repository ,
" streaming " : streaming ,
" workflow_thread_pool_id " : workflow_thread_pool_id ,
} ,
)
worker_thread . start ( )
# return batch, dataset, documents
@ -282,23 +280,7 @@ class PipelineGenerator(BaseAppGenerator):
: param streaming : is stream
: param workflow_thread_pool_id : workflow thread pool id
"""
print ( " jin ru la 1 " )
for var , val in context . items ( ) :
var . set ( val )
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context ( ) and hasattr ( g , " _login_user " ) :
saved_user = g . _login_user
with flask_app . app_context ( ) :
# Restore user in new app context
print ( " jin ru la 2 " )
if saved_user is not None :
from flask import g
g . _login_user = saved_user
with preserve_flask_contexts ( flask_app , context_vars = context ) :
# init queue manager
workflow = db . session . query ( Workflow ) . filter ( Workflow . id == workflow_id ) . first ( )
if not workflow :
@ -311,20 +293,17 @@ class PipelineGenerator(BaseAppGenerator):
)
context = contextvars . copy_context ( )
@copy_current_request_context
def worker_with_context ( ) :
# Run the worker within the copied context
return context . run (
self . _generate_worker ,
flask_app = current_app . _get_current_object ( ) , # type: ignore
context = context ,
queue_manager = queue_manager ,
application_generate_entity = application_generate_entity ,
workflow_thread_pool_id = workflow_thread_pool_id ,
)
# new thread
worker_thread = threading . Thread ( target = worker_with_context )
worker_thread = threading . Thread (
target = self . _generate_worker ,
kwargs = {
" flask_app " : current_app . _get_current_object ( ) , # type: ignore
" context " : context ,
" queue_manager " : queue_manager ,
" application_generate_entity " : application_generate_entity ,
" workflow_thread_pool_id " : workflow_thread_pool_id ,
} ,
)
worker_thread . start ( )
@ -521,20 +500,9 @@ class PipelineGenerator(BaseAppGenerator):
: param workflow_thread_pool_id : workflow thread pool id
: return :
"""
print ( " jin ru la 3 " )
for var , val in context . items ( ) :
var . set ( val )
from flask import g
saved_user = None
if has_request_context ( ) and hasattr ( g , " _login_user " ) :
saved_user = g . _login_user
with flask_app . app_context ( ) :
try :
if saved_user is not None :
from flask import g
g . _login_user = saved_user
with preserve_flask_contexts ( flask_app , context_vars = context ) :
try :
# workflow app
runner = PipelineRunner (
application_generate_entity = application_generate_entity ,