@ -44,7 +44,12 @@ from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog
from models . workflow import WorkflowNodeExecutionStatus
from services . feature_service import FeatureService
from . entities import KnowledgeRetrievalNodeData , ModelConfig
from . entities import (
KnowledgeRetrievalNodeData ,
MetadataFilteringComplexCondition ,
MetadataFilteringComplexSubCondition ,
ModelConfig ,
)
from . exc import (
InvalidModelTypeError ,
KnowledgeRetrievalNodeError ,
@ -315,7 +320,60 @@ class KnowledgeRetrievalNode(LLMNode):
for position , item in enumerate ( retrieval_resource_list , start = 1 ) :
item [ " metadata " ] [ " position " ] = position
return retrieval_resource_list
def _recursive_metadata_filter (
self , metadata_filtering_complex_conditions : MetadataFilteringComplexSubCondition , filters
) :
logical_operator = metadata_filtering_complex_conditions . logical_operator
conditions = metadata_filtering_complex_conditions . conditions
sub_conditions = metadata_filtering_complex_conditions . sub_conditions
sub_filters = [ ]
if sub_conditions :
for sub_condition in sub_conditions :
sub_filter = self . _recursive_metadata_filter ( sub_condition , filters )
sub_filters . append ( sub_filter )
temp_filters = [ ]
if conditions :
for sequence , condition in enumerate ( conditions ) :
metadata_name = condition . name
expected_value = condition . value
if expected_value is not None or condition . comparison_operator in ( " empty " , " not empty " ) :
if isinstance ( expected_value , str ) :
expected_value = self . graph_runtime_state . variable_pool . convert_template (
expected_value
) . value [ 0 ]
if expected_value . value_type == " number " : # type: ignore
expected_value = expected_value . value # type: ignore
elif expected_value . value_type == " string " : # type: ignore
expected_value = re . sub ( r " [ \ r \ n \ t]+ " , " " , expected_value . text ) . strip ( ) # type: ignore
else :
raise ValueError ( " Invalid expected metadata value type " )
temp_filters = self . _process_metadata_filter_func (
sequence ,
condition . comparison_operator ,
metadata_name ,
expected_value ,
temp_filters ,
)
if temp_filters :
if logical_operator == " and " : # type: ignore
temp_filters = and_ ( * temp_filters )
else :
temp_filters = or_ ( * temp_filters )
filters . append ( temp_filters )
if sub_filters :
if logical_operator == " and " : # type: ignore
sub_filters = and_ ( * sub_filters )
else :
sub_filters = or_ ( * sub_filters )
filters . append ( sub_filters )
return filters
def _get_metadata_filter_condition (
self , dataset_ids : list , query : str , node_data : KnowledgeRetrievalNodeData
) - > tuple [ Optional [ dict [ str , list [ str ] ] ] , Optional [ MetadataCondition ] ] :
@ -329,6 +387,27 @@ class KnowledgeRetrievalNode(LLMNode):
metadata_condition = None
if node_data . metadata_filtering_mode == " disabled " :
return None , None
elif node_data . metadata_filtering_mode == " complex_conditions " :
# todo: do not support external_knowledge_retrieval
if node_data . metadata_filtering_complex_conditions :
# Enable forward references
MetadataFilteringComplexSubCondition . model_rebuild ( )
metadata_filtering_complex_conditions = MetadataFilteringComplexCondition (
* * node_data . metadata_filtering_complex_conditions . model_dump ( ) )
for sequence , condition in enumerate ( metadata_filtering_complex_conditions . conditions ) : # type: ignore
filters = self . _recursive_metadata_filter ( condition , filters )
if filters :
if metadata_filtering_complex_conditions . logical_operator == " and " : # type: ignore
document_query = document_query . filter ( and_ ( * filters ) )
else :
document_query = document_query . filter ( or_ ( * filters ) )
metadata_condition = metadata_filtering_complex_conditions
documents = document_query . all ( )
# group by dataset_id
metadata_filter_document_ids = defaultdict ( list ) if documents else None # type: ignore
for document in documents :
metadata_filter_document_ids [ document . dataset_id ] . append ( document . id ) # type: ignore
return metadata_filter_document_ids , metadata_condition
elif node_data . metadata_filtering_mode == " automatic " :
automatic_metadata_filters = self . _automatic_metadata_filter_func ( dataset_ids , query , node_data )
if automatic_metadata_filters :