feat/datasource
jyong 11 months ago
parent a826879cf7
commit cb5cfb2dae

@ -9,7 +9,7 @@ import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from typing import Any, Literal, Optional, Union, overload from typing import Any, Literal, Optional, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app, has_request_context
from pydantic import ValidationError from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -185,8 +185,9 @@ class PipelineGenerator(BaseAppGenerator):
if invoke_from == InvokeFrom.DEBUGGER: if invoke_from == InvokeFrom.DEBUGGER:
return self._generate( return self._generate(
flask_app=current_app._get_current_object(),# type: ignore flask_app=current_app._get_current_object(),# type: ignore
context=contextvars.copy_context(),
pipeline=pipeline, pipeline=pipeline,
workflow=workflow, workflow_id=workflow.id,
user=user, user=user,
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
invoke_from=invoke_from, invoke_from=invoke_from,
@ -197,22 +198,28 @@ class PipelineGenerator(BaseAppGenerator):
) )
else: else:
# run in child thread # run in child thread
thread = threading.Thread( context = contextvars.copy_context()
target=self._generate, @copy_current_request_context
kwargs={ def worker_with_context():
"flask_app": current_app._get_current_object(), # type: ignore # Run the worker within the copied context
"pipeline": pipeline, return context.run(
"workflow": workflow, self._generate,
"user": user, flask_app=current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity, context=context,
"invoke_from": invoke_from, pipeline=pipeline,
"workflow_execution_repository": workflow_execution_repository, workflow_id=workflow.id,
"workflow_node_execution_repository": workflow_node_execution_repository, user=user,
"streaming": streaming, application_generate_entity=application_generate_entity,
"workflow_thread_pool_id": workflow_thread_pool_id, invoke_from=invoke_from,
}, workflow_execution_repository=workflow_execution_repository,
) workflow_node_execution_repository=workflow_node_execution_repository,
thread.start() streaming=streaming,
workflow_thread_pool_id=workflow_thread_pool_id,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()
# return batch, dataset, documents # return batch, dataset, documents
return { return {
"batch": batch, "batch": batch,
@ -225,7 +232,7 @@ class PipelineGenerator(BaseAppGenerator):
"documents": [PipelineDocument( "documents": [PipelineDocument(
id=document.id, id=document.id,
position=document.position, position=document.position,
data_source_info=document.data_source_info, data_source_info=json.loads(document.data_source_info) if document.data_source_info else None,
name=document.name, name=document.name,
indexing_status=document.indexing_status, indexing_status=document.indexing_status,
error=document.error, error=document.error,
@ -237,8 +244,9 @@ class PipelineGenerator(BaseAppGenerator):
self, self,
*, *,
flask_app: Flask, flask_app: Flask,
context: contextvars.Context,
pipeline: Pipeline, pipeline: Pipeline,
workflow: Workflow, workflow_id: str,
user: Union[Account, EndUser], user: Union[Account, EndUser],
application_generate_entity: RagPipelineGenerateEntity, application_generate_entity: RagPipelineGenerateEntity,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
@ -260,26 +268,47 @@ class PipelineGenerator(BaseAppGenerator):
:param streaming: is stream :param streaming: is stream
:param workflow_thread_pool_id: workflow thread pool id :param workflow_thread_pool_id: workflow thread pool id
""" """
print(user.id) for var, val in context.items():
var.set(val)
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context(): with flask_app.app_context():
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# init queue manager # init queue manager
workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id).first()
if not workflow:
raise ValueError(f"Workflow not found: {workflow_id}")
queue_manager = PipelineQueueManager( queue_manager = PipelineQueueManager(
task_id=application_generate_entity.task_id, task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id, user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from, invoke_from=application_generate_entity.invoke_from,
app_mode=AppMode.RAG_PIPELINE, app_mode=AppMode.RAG_PIPELINE,
) )
context = contextvars.copy_context()
@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
context=context,
queue_manager=queue_manager,
application_generate_entity=application_generate_entity,
workflow_thread_pool_id=workflow_thread_pool_id,
)
# new thread # new thread
worker_thread = threading.Thread( worker_thread = threading.Thread(
target=self._generate_worker, target=worker_with_context
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"context": contextvars.copy_context(),
"workflow_thread_pool_id": workflow_thread_pool_id,
},
) )
worker_thread.start() worker_thread.start()
@ -479,8 +508,17 @@ class PipelineGenerator(BaseAppGenerator):
""" """
for var, val in context.items(): for var, val in context.items():
var.set(val) var.set(val)
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context(): with flask_app.app_context():
try: try:
if saved_user is not None:
from flask import g
g._login_user = saved_user
# workflow app # workflow app
runner = PipelineRunner( runner = PipelineRunner(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,

@ -25,17 +25,17 @@ class PipelineDataset(BaseModel):
description: str description: str
chunk_structure: str chunk_structure: str
class PipelineDocument(BaseModel): class PipelineDocument(BaseModel):
id: str id: str
position: int position: int
data_source_info: dict data_source_info: Optional[dict] = None
name: str name: str
indexing_status: str indexing_status: str
error: str error: Optional[str] = None
enabled: bool enabled: bool
class PipelineGenerateResponse(BaseModel): class PipelineGenerateResponse(BaseModel):
batch: str batch: str
dataset: PipelineDataset dataset: PipelineDataset

Loading…
Cancel
Save