@ -1,6 +1,6 @@
import logging
from collections . abc import Mapping
from typing import Any , cast
from typing import Any , Optional, cast
from sqlalchemy import select
from sqlalchemy . orm import Session
@ -9,13 +9,19 @@ from configs import dify_config
from core . app . apps . advanced_chat . app_config_manager import AdvancedChatAppConfig
from core . app . apps . base_app_queue_manager import AppQueueManager
from core . app . apps . workflow_app_runner import WorkflowBasedAppRunner
from core . app . entities . app_invoke_entities import AdvancedChatAppGenerateEntity , InvokeFrom
from core . app . entities . app_invoke_entities import (
AdvancedChatAppGenerateEntity ,
AppGenerateEntity ,
InvokeFrom ,
)
from core . app . entities . queue_entities import (
QueueAnnotationReplyEvent ,
QueueStopEvent ,
QueueTextChunkEvent ,
)
from core . app . features . annotation_reply . annotation_reply import AnnotationReplyFeature
from core . moderation . base import ModerationError
from core . moderation . input_moderation import InputModeration
from core . variables . variables import VariableUnion
from core . workflow . callbacks import WorkflowCallback , WorkflowLoggingCallback
from core . workflow . entities . variable_pool import VariablePool
@ -23,8 +29,9 @@ from core.workflow.system_variable import SystemVariable
from core . workflow . variable_loader import VariableLoader
from core . workflow . workflow_entry import WorkflowEntry
from extensions . ext_database import db
from models import Workflow
from models . enums import UserFrom
from models . model import App , Conversation , EndUser, Message
from models . model import App , Conversation , Message, MessageAnnotation
from models . workflow import ConversationVariable , WorkflowType
logger = logging . getLogger ( __name__ )
@ -37,42 +44,38 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
def __init__ (
self ,
* ,
application_generate_entity : AdvancedChatAppGenerateEntity ,
queue_manager : AppQueueManager ,
conversation : Conversation ,
message : Message ,
dialogue_count : int ,
variable_loader : VariableLoader ,
workflow : Workflow ,
system_user_id : str ,
app : App ,
) - > None :
super ( ) . __init__ ( queue_manager , variable_loader )
super ( ) . __init__ (
queue_manager = queue_manager ,
variable_loader = variable_loader ,
app_id = application_generate_entity . app_config . app_id ,
)
self . application_generate_entity = application_generate_entity
self . conversation = conversation
self . message = message
self . _dialogue_count = dialogue_count
def _get_app_id ( self ) - > str :
return self . application_generate_entity . app_config . app_id
self . _workflow = workflow
self . system_user_id = system_user_id
self . _app = app
def run ( self ) - > None :
app_config = self . application_generate_entity . app_config
app_config = cast ( AdvancedChatAppConfig , app_config )
app_record = db . session . query ( App ) . filter ( App . id == app_config . app_id ) . first ( )
app_record = db . session . query ( App ) . where ( App . id == app_config . app_id ) . first ( )
if not app_record :
raise ValueError ( " App not found " )
workflow = self . get_workflow ( app_model = app_record , workflow_id = app_config . workflow_id )
if not workflow :
raise ValueError ( " Workflow not initialized " )
user_id : str | None = None
if self . application_generate_entity . invoke_from in { InvokeFrom . WEB_APP , InvokeFrom . SERVICE_API } :
end_user = db . session . query ( EndUser ) . filter ( EndUser . id == self . application_generate_entity . user_id ) . first ( )
if end_user :
user_id = end_user . session_id
else :
user_id = self . application_generate_entity . user_id
workflow_callbacks : list [ WorkflowCallback ] = [ ]
if dify_config . DEBUG :
workflow_callbacks . append ( WorkflowLoggingCallback ( ) )
@ -80,14 +83,14 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
if self . application_generate_entity . single_iteration_run :
# if only single iteration run is requested
graph , variable_pool = self . _get_graph_and_variable_pool_of_single_iteration (
workflow = workflow,
workflow = self . _ workflow,
node_id = self . application_generate_entity . single_iteration_run . node_id ,
user_inputs = dict ( self . application_generate_entity . single_iteration_run . inputs ) ,
)
elif self . application_generate_entity . single_loop_run :
# if only single loop run is requested
graph , variable_pool = self . _get_graph_and_variable_pool_of_single_loop (
workflow = workflow,
workflow = self . _ workflow,
node_id = self . application_generate_entity . single_loop_run . node_id ,
user_inputs = dict ( self . application_generate_entity . single_loop_run . inputs ) ,
)
@ -98,7 +101,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# moderation
if self . handle_input_moderation (
app_record = app_record ,
app_record = self . _ app,
app_generate_entity = self . application_generate_entity ,
inputs = inputs ,
query = query ,
@ -108,7 +111,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# annotation reply
if self . handle_annotation_reply (
app_record = app_record ,
app_record = self . _ app,
message = self . message ,
query = query ,
app_generate_entity = self . application_generate_entity ,
@ -128,7 +131,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
ConversationVariable . from_variable (
app_id = self . conversation . app_id , conversation_id = self . conversation . id , variable = variable
)
for variable in workflow. conversation_variables
for variable in self . _ workflow. conversation_variables
]
session . add_all ( db_conversation_variables )
# Convert database entities to variables.
@ -141,7 +144,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
query = query ,
files = files ,
conversation_id = self . conversation . id ,
user_id = user_id,
user_id = self . system_ user_id,
dialogue_count = self . _dialogue_count ,
app_id = app_config . app_id ,
workflow_id = app_config . workflow_id ,
@ -152,25 +155,25 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
variable_pool = VariablePool (
system_variables = system_inputs ,
user_inputs = inputs ,
environment_variables = workflow. environment_variables ,
environment_variables = self . _ workflow. environment_variables ,
# Based on the definition of `VariableUnion`,
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
conversation_variables = cast ( list [ VariableUnion ] , conversation_variables ) ,
)
# init graph
graph = self . _init_graph ( graph_config = workflow. graph_dict )
graph = self . _init_graph ( graph_config = self . _ workflow. graph_dict )
db . session . close ( )
# RUN WORKFLOW
workflow_entry = WorkflowEntry (
tenant_id = workflow. tenant_id ,
app_id = workflow. app_id ,
workflow_id = workflow. id ,
workflow_type = WorkflowType . value_of ( workflow. type ) ,
tenant_id = self . _ workflow. tenant_id ,
app_id = self . _ workflow. app_id ,
workflow_id = self . _ workflow. id ,
workflow_type = WorkflowType . value_of ( self . _ workflow. type ) ,
graph = graph ,
graph_config = workflow. graph_dict ,
graph_config = self . _ workflow. graph_dict ,
user_id = self . application_generate_entity . user_id ,
user_from = (
UserFrom . ACCOUNT
@ -241,3 +244,51 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self . _publish_event ( QueueTextChunkEvent ( text = text ) )
self . _publish_event ( QueueStopEvent ( stopped_by = stopped_by ) )
def query_app_annotations_to_reply (
self , app_record : App , message : Message , query : str , user_id : str , invoke_from : InvokeFrom
) - > Optional [ MessageAnnotation ] :
"""
Query app annotations to reply
: param app_record : app record
: param message : message
: param query : query
: param user_id : user id
: param invoke_from : invoke from
: return :
"""
annotation_reply_feature = AnnotationReplyFeature ( )
return annotation_reply_feature . query (
app_record = app_record , message = message , query = query , user_id = user_id , invoke_from = invoke_from
)
def moderation_for_inputs (
self ,
* ,
app_id : str ,
tenant_id : str ,
app_generate_entity : AppGenerateEntity ,
inputs : Mapping [ str , Any ] ,
query : str | None = None ,
message_id : str ,
) - > tuple [ bool , Mapping [ str , Any ] , str ] :
"""
Process sensitive_word_avoidance .
: param app_id : app id
: param tenant_id : tenant id
: param app_generate_entity : app generate entity
: param inputs : inputs
: param query : query
: param message_id : message id
: return :
"""
moderation_feature = InputModeration ( )
return moderation_feature . check (
app_id = app_id ,
tenant_id = tenant_id ,
app_config = app_generate_entity . app_config ,
inputs = dict ( inputs ) ,
query = query or " " ,
message_id = message_id ,
trace_manager = app_generate_entity . trace_manager ,
)