fix: Copy request context and current user in app generators.

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20240/head
-LAN- 1 year ago
parent acd4b9a8ac
commit 637d225317
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -99,7 +99,12 @@ def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optio
if user_id: if user_id:
user_id = str(user_id) user_id = str(user_id)
kwargs["end_user"] = create_or_update_end_user_for_user_id(app_model, user_id) end_user = create_or_update_end_user_for_user_id(app_model, user_id)
kwargs["end_user"] = end_user
# Set EndUser as current logged-in user for flask_login.current_user
current_app.login_manager._update_request_context_with_user(end_user) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=end_user) # type: ignore
return view_func(*args, **kwargs) return view_func(*args, **kwargs)

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from typing import Any, Literal, Optional, Union, overload from typing import Any, Literal, Optional, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app, has_request_context
from pydantic import ValidationError from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -399,19 +399,24 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
message_id=message.id, message_id=message.id,
) )
# new thread # new thread with request context and contextvars
worker_thread = threading.Thread( context = contextvars.copy_context()
target=self._generate_worker,
kwargs={ @copy_current_request_context
"flask_app": current_app._get_current_object(), # type: ignore def worker_with_context():
"application_generate_entity": application_generate_entity, # Run the worker within the copied context
"queue_manager": queue_manager, return context.run(
"conversation_id": conversation.id, self._generate_worker,
"message_id": message.id, flask_app=current_app._get_current_object(), # type: ignore
"context": contextvars.copy_context(), application_generate_entity=application_generate_entity,
}, queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
context=context,
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator
@ -449,8 +454,22 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
""" """
for var, val in context.items(): for var, val in context.items():
var.set(val) var.set(val)
# Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context(): with flask_app.app_context():
try: try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# get conversation and message # get conversation and message
conversation = self._get_conversation(conversation_id) conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id) message = self._get_message(message_id)

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload from typing import Any, Literal, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app, has_request_context
from pydantic import ValidationError from pydantic import ValidationError
from configs import dify_config from configs import dify_config
@ -179,19 +179,24 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
message_id=message.id, message_id=message.id,
) )
# new thread # new thread with request context and contextvars
worker_thread = threading.Thread( context = contextvars.copy_context()
target=self._generate_worker,
kwargs={ @copy_current_request_context
"flask_app": current_app._get_current_object(), # type: ignore def worker_with_context():
"context": contextvars.copy_context(), # Run the worker within the copied context
"application_generate_entity": application_generate_entity, return context.run(
"queue_manager": queue_manager, self._generate_worker,
"conversation_id": conversation.id, flask_app=current_app._get_current_object(), # type: ignore
"message_id": message.id, context=context,
}, application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator
@ -227,8 +232,21 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
for var, val in context.items(): for var, val in context.items():
var.set(val) var.set(val)
# Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context(): with flask_app.app_context():
try: try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# get conversation and message # get conversation and message
conversation = self._get_conversation(conversation_id) conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id) message = self._get_message(message_id)

@ -4,7 +4,7 @@ import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload from typing import Any, Literal, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app
from pydantic import ValidationError from pydantic import ValidationError
from configs import dify_config from configs import dify_config
@ -170,18 +170,19 @@ class ChatAppGenerator(MessageBasedAppGenerator):
message_id=message.id, message_id=message.id,
) )
# new thread # new thread with request context
worker_thread = threading.Thread( @copy_current_request_context
target=self._generate_worker, def worker_with_context():
kwargs={ return self._generate_worker(
"flask_app": current_app._get_current_object(), # type: ignore flask_app=current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity, application_generate_entity=application_generate_entity,
"queue_manager": queue_manager, queue_manager=queue_manager,
"conversation_id": conversation.id, conversation_id=conversation.id,
"message_id": message.id, message_id=message.id,
},
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator

@ -4,7 +4,7 @@ import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload from typing import Any, Literal, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app
from pydantic import ValidationError from pydantic import ValidationError
from configs import dify_config from configs import dify_config
@ -151,17 +151,18 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
message_id=message.id, message_id=message.id,
) )
# new thread # new thread with request context
worker_thread = threading.Thread( @copy_current_request_context
target=self._generate_worker, def worker_with_context():
kwargs={ return self._generate_worker(
"flask_app": current_app._get_current_object(), # type: ignore flask_app=current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity, application_generate_entity=application_generate_entity,
"queue_manager": queue_manager, queue_manager=queue_manager,
"message_id": message.id, message_id=message.id,
},
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator
@ -313,17 +314,18 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
message_id=message.id, message_id=message.id,
) )
# new thread # new thread with request context
worker_thread = threading.Thread( @copy_current_request_context
target=self._generate_worker, def worker_with_context():
kwargs={ return self._generate_worker(
"flask_app": current_app._get_current_object(), # type: ignore flask_app=current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity, application_generate_entity=application_generate_entity,
"queue_manager": queue_manager, queue_manager=queue_manager,
"message_id": message.id, message_id=message.id,
},
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping, Sequence from collections.abc import Generator, Mapping, Sequence
from typing import Any, Literal, Optional, Union, overload from typing import Any, Literal, Optional, Union, overload
from flask import Flask, current_app from flask import Flask, copy_current_request_context, current_app, has_request_context
from pydantic import ValidationError from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -207,18 +207,23 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_mode=app_model.mode, app_mode=app_model.mode,
) )
# new thread # new thread with request context and contextvars
worker_thread = threading.Thread( context = contextvars.copy_context()
target=self._generate_worker,
kwargs={ @copy_current_request_context
"flask_app": current_app._get_current_object(), # type: ignore def worker_with_context():
"application_generate_entity": application_generate_entity, # Run the worker within the copied context
"queue_manager": queue_manager, return context.run(
"context": contextvars.copy_context(), self._generate_worker,
"workflow_thread_pool_id": workflow_thread_pool_id, flask_app=current_app._get_current_object(), # type: ignore
}, application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
context=context,
workflow_thread_pool_id=workflow_thread_pool_id,
) )
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator
@ -408,8 +413,22 @@ class WorkflowAppGenerator(BaseAppGenerator):
""" """
for var, val in context.items(): for var, val in context.items():
var.set(val) var.set(val)
# Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context(): with flask_app.app_context():
try: try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# workflow app # workflow app
runner = WorkflowAppRunner( runner = WorkflowAppRunner(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,

Loading…
Cancel
Save