feat/datasource
jyong 11 months ago
parent f7a4e5d1a6
commit ac917bb56d

@ -32,6 +32,7 @@ from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchem
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from libs.flask_utils import preserve_flask_contexts
from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.dataset import Document, DocumentPipelineExecutionLog, Pipeline from models.dataset import Document, DocumentPipelineExecutionLog, Pipeline
from models.enums import WorkflowRunTriggeredFrom from models.enums import WorkflowRunTriggeredFrom
@ -209,25 +210,22 @@ class PipelineGenerator(BaseAppGenerator):
# run in child thread # run in child thread
context = contextvars.copy_context() context = contextvars.copy_context()
@copy_current_request_context worker_thread = threading.Thread(
def worker_with_context(): target=self._generate,
# Run the worker within the copied context kwargs={
return context.run( "flask_app": current_app._get_current_object(), # type: ignore
self._generate, "context": context,
flask_app=current_app._get_current_object(), # type: ignore "pipeline": pipeline,
context=context, "workflow_id": workflow.id,
pipeline=pipeline, "user": user,
workflow_id=workflow.id, "application_generate_entity": application_generate_entity,
user=user, "invoke_from": invoke_from,
application_generate_entity=application_generate_entity, "workflow_execution_repository": workflow_execution_repository,
invoke_from=invoke_from, "workflow_node_execution_repository": workflow_node_execution_repository,
workflow_execution_repository=workflow_execution_repository, "streaming": streaming,
workflow_node_execution_repository=workflow_node_execution_repository, "workflow_thread_pool_id": workflow_thread_pool_id,
streaming=streaming, },
workflow_thread_pool_id=workflow_thread_pool_id, )
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return batch, dataset, documents # return batch, dataset, documents
@ -282,23 +280,7 @@ class PipelineGenerator(BaseAppGenerator):
:param streaming: is stream :param streaming: is stream
:param workflow_thread_pool_id: workflow thread pool id :param workflow_thread_pool_id: workflow thread pool id
""" """
print("jin ru la 1") with preserve_flask_contexts(flask_app, context_vars=context):
for var, val in context.items():
var.set(val)
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context():
# Restore user in new app context
print("jin ru la 2")
if saved_user is not None:
from flask import g
g._login_user = saved_user
# init queue manager # init queue manager
workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id).first() workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id).first()
if not workflow: if not workflow:
@ -311,20 +293,17 @@ class PipelineGenerator(BaseAppGenerator):
) )
context = contextvars.copy_context() context = contextvars.copy_context()
@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
context=context,
queue_manager=queue_manager,
application_generate_entity=application_generate_entity,
workflow_thread_pool_id=workflow_thread_pool_id,
)
# new thread # new thread
worker_thread = threading.Thread(target=worker_with_context) worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"queue_manager": queue_manager,
"application_generate_entity": application_generate_entity,
"workflow_thread_pool_id": workflow_thread_pool_id,
},
)
worker_thread.start() worker_thread.start()
@ -521,20 +500,9 @@ class PipelineGenerator(BaseAppGenerator):
:param workflow_thread_pool_id: workflow thread pool id :param workflow_thread_pool_id: workflow thread pool id
:return: :return:
""" """
print("jin ru la 3")
for var, val in context.items():
var.set(val)
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context():
try:
if saved_user is not None:
from flask import g
g._login_user = saved_user with preserve_flask_contexts(flask_app, context_vars=context):
try:
# workflow app # workflow app
runner = PipelineRunner( runner = PipelineRunner(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,

Loading…
Cancel
Save