@ -68,7 +68,7 @@ class ClickzettaVector(BaseVector):
"""
Clickzetta vector storage implementation .
"""
# Class-level write queue and lock for serializing writes
_write_queue : Optional [ queue . Queue ] = None
_write_thread : Optional [ threading . Thread ] = None
@ -94,13 +94,13 @@ class ClickzettaVector(BaseVector):
vcluster = self . _config . vcluster ,
schema = self . _config . schema_name
)
# Set session parameters for better string handling
with self . _connection . cursor ( ) as cursor :
# Use quote mode for string literal escaping to handle quotes better
cursor . execute ( " SET cz.sql.string.literal.escape.mode = ' quote ' " )
logger . info ( " Set string literal escape mode to ' quote ' for better quote handling " )
@classmethod
def _init_write_queue ( cls ) :
""" Initialize the write queue and worker thread. """
@ -110,7 +110,7 @@ class ClickzettaVector(BaseVector):
cls . _write_thread = threading . Thread ( target = cls . _write_worker , daemon = True )
cls . _write_thread . start ( )
logger . info ( " Started Clickzetta write worker thread " )
@classmethod
def _write_worker ( cls ) :
""" Worker thread that processes write tasks sequentially. """
@ -120,7 +120,7 @@ class ClickzettaVector(BaseVector):
task = cls . _write_queue . get ( timeout = 1 )
if task is None : # Shutdown signal
break
# Execute the write task
func , args , kwargs , result_queue = task
try :
@ -135,15 +135,15 @@ class ClickzettaVector(BaseVector):
continue
except Exception as e :
logger . exception ( " Write worker error " )
def _execute_write ( self , func , * args , * * kwargs ) :
""" Execute a write operation through the queue. """
if ClickzettaVector . _write_queue is None :
raise RuntimeError ( " Write queue not initialized " )
result_queue = queue . Queue ( )
ClickzettaVector . _write_queue . put ( ( func , args , kwargs , result_queue ) )
# Wait for result
success , result = result_queue . get ( )
if not success :
@ -171,18 +171,18 @@ class ClickzettaVector(BaseVector):
""" Create the collection and add initial documents. """
# Execute table creation through write queue to avoid concurrent conflicts
self . _execute_write ( self . _create_table_and_indexes , embeddings )
# Add initial texts
if texts :
self . add_texts ( texts , embeddings , * * kwargs )
def _create_table_and_indexes ( self , embeddings : list [ list [ float ] ] ) :
""" Create table and indexes (executed in write worker thread). """
# Check if table already exists to avoid unnecessary index creation
if self . _table_exists ( ) :
logger . info ( f " Table { self . _config . schema_name } . { self . _table_name } already exists, skipping creation " )
return
# Create table with vector and metadata columns
dimension = len ( embeddings [ 0 ] ) if embeddings else 768
@ -191,7 +191,8 @@ class ClickzettaVector(BaseVector):
id STRING NOT NULL COMMENT ' Unique document identifier ' ,
{ Field . CONTENT_KEY . value } STRING NOT NULL COMMENT ' Document text content for search and retrieval ' ,
{ Field . METADATA_KEY . value } JSON COMMENT ' Document metadata including source, type, and other attributes ' ,
{ Field . VECTOR . value } VECTOR ( FLOAT , { dimension } ) NOT NULL COMMENT ' High-dimensional embedding vector for semantic similarity search ' ,
{ Field . VECTOR . value } VECTOR ( FLOAT , { dimension } ) NOT NULL COMMENT
' High-dimensional embedding vector for semantic similarity search ' ,
PRIMARY KEY ( id )
) COMMENT ' Dify RAG knowledge base vector storage table for document embeddings and content '
"""
@ -211,7 +212,7 @@ class ClickzettaVector(BaseVector):
""" Create HNSW vector index for similarity search. """
# Use a fixed index name based on table and column name
index_name = f " idx_ { self . _table_name } _vector "
# First check if an index already exists on this column
try :
cursor . execute ( f " SHOW INDEX FROM { self . _config . schema_name } . { self . _table_name } " )
@ -223,7 +224,7 @@ class ClickzettaVector(BaseVector):
return
except Exception as e :
logger . warning ( f " Failed to check existing indexes: { e } " )
index_sql = f """
CREATE VECTOR INDEX IF NOT EXISTS { index_name }
ON TABLE { self . _config . schema_name } . { self . _table_name } ( { Field . VECTOR . value } )
@ -239,8 +240,8 @@ class ClickzettaVector(BaseVector):
logger . info ( f " Created vector index: { index_name } " )
except Exception as e :
error_msg = str ( e ) . lower ( )
if ( " already exists " in error_msg or
" already has index " in error_msg or
if ( " already exists " in error_msg or
" already has index " in error_msg or
" with the same type " in error_msg ) :
logger . info ( f " Vector index already exists: { e } " )
else :
@ -251,7 +252,7 @@ class ClickzettaVector(BaseVector):
""" Create inverted index for full-text search. """
# Use a fixed index name based on table name to avoid duplicates
index_name = f " idx_ { self . _table_name } _text "
# Check if an inverted index already exists on this column
try :
cursor . execute ( f " SHOW INDEX FROM { self . _config . schema_name } . { self . _table_name } " )
@ -259,14 +260,14 @@ class ClickzettaVector(BaseVector):
for idx in existing_indexes :
idx_str = str ( idx ) . lower ( )
# More precise check: look for inverted index specifically on the content column
if ( " inverted " in idx_str and
if ( " inverted " in idx_str and
Field . CONTENT_KEY . value . lower ( ) in idx_str and
( index_name . lower ( ) in idx_str or f " idx_ { self . _table_name } _text " in idx_str ) ) :
logger . info ( f " Inverted index already exists on column { Field . CONTENT_KEY . value } : { idx } " )
return
except Exception as e :
logger . warning ( f " Failed to check existing indexes: { e } " )
index_sql = f """
CREATE INVERTED INDEX IF NOT EXISTS { index_name }
ON TABLE { self . _config . schema_name } . { self . _table_name } ( { Field . CONTENT_KEY . value } )
@ -281,8 +282,8 @@ class ClickzettaVector(BaseVector):
except Exception as e :
error_msg = str ( e ) . lower ( )
# Handle ClickZetta specific error messages
if ( ( " already exists " in error_msg or
" already has index " in error_msg or
if ( ( " already exists " in error_msg or
" already has index " in error_msg or
" with the same type " in error_msg or
" cannot create inverted index " in error_msg ) and
" already has index " in error_msg ) :
@ -313,44 +314,44 @@ class ClickzettaVector(BaseVector):
for i in range ( 0 , len ( documents ) , batch_size ) :
batch_docs = documents [ i : i + batch_size ]
batch_embeddings = embeddings [ i : i + batch_size ]
# Execute batch insert through write queue
self . _execute_write ( self . _insert_batch , batch_docs , batch_embeddings , i , batch_size , total_batches )
def _insert_batch ( self , batch_docs : list [ Document ] , batch_embeddings : list [ list [ float ] ] ,
def _insert_batch ( self , batch_docs : list [ Document ] , batch_embeddings : list [ list [ float ] ] ,
batch_index : int , batch_size : int , total_batches : int ) :
""" Insert a batch of documents using parameterized queries (executed in write worker thread). """
if not batch_docs or not batch_embeddings :
logger . warning ( " Empty batch provided, skipping insertion " )
return
if len ( batch_docs ) != len ( batch_embeddings ) :
logger . error ( f " Mismatch between docs ( { len ( batch_docs ) } ) and embeddings ( { len ( batch_embeddings ) } ) " )
return
# Prepare data for parameterized insertion
data_rows = [ ]
vector_dimension = len ( batch_embeddings [ 0 ] ) if batch_embeddings and batch_embeddings [ 0 ] else 768
for doc , embedding in zip ( batch_docs , batch_embeddings ) :
# Optimized: minimal checks for common case, fallback for edge cases
metadata = doc . metadata if doc . metadata else { }
if not isinstance ( metadata , dict ) :
metadata = { }
doc_id = self . _safe_doc_id ( metadata . get ( " doc_id " , str ( uuid . uuid4 ( ) ) ) )
# Fast path for JSON serialization
try :
metadata_json = json . dumps ( metadata , ensure_ascii = True )
except ( TypeError , ValueError ) :
logger . warning ( " JSON serialization failed, using empty dict " )
metadata_json = " {} "
content = doc . page_content or " "
# According to ClickZetta docs, vector should be formatted as array string
# According to ClickZetta docs, vector should be formatted as array string
# for external systems: '[1.0, 2.0, 3.0]'
vector_str = ' [ ' + ' , ' . join ( map ( str , embedding ) ) + ' ] '
data_rows . append ( [ doc_id , content , metadata_json , vector_str ] )
@ -359,17 +360,22 @@ class ClickzettaVector(BaseVector):
if not data_rows :
logger . warning ( f " No valid documents to insert in batch { batch_index / / batch_size + 1 } / { total_batches } " )
return
# Use parameterized INSERT with executemany for better performance and security
# Cast JSON and VECTOR in SQL, pass raw data as parameters
columns = f " id, { Field . CONTENT_KEY . value } , { Field . METADATA_KEY . value } , { Field . VECTOR . value } "
insert_sql = f " INSERT INTO { self . _config . schema_name } . { self . _table_name } ( { columns } ) VALUES (?, ?, CAST(? AS JSON), CAST(? AS VECTOR( { vector_dimension } ))) "
insert_sql = (
f " INSERT INTO { self . _config . schema_name } . { self . _table_name } ( { columns } ) "
f " VALUES (?, ?, CAST(? AS JSON), CAST(? AS VECTOR( { vector_dimension } ))) "
)
with self . _connection . cursor ( ) as cursor :
try :
cursor . executemany ( insert_sql , data_rows )
logger . info ( f " Inserted batch { batch_index / / batch_size + 1 } / { total_batches } "
f " ( { len ( data_rows ) } valid docs using parameterized query with VECTOR( { vector_dimension } ) cast) " )
logger . info (
f " Inserted batch { batch_index / / batch_size + 1 } / { total_batches } "
f " ( { len ( data_rows ) } valid docs using parameterized query with VECTOR( { vector_dimension } ) cast) "
)
except Exception as e :
logger . exception ( f " Parameterized SQL execution failed for { len ( data_rows ) } documents: { e } " )
logger . exception ( f " SQL template: { insert_sql } " )
@ -399,14 +405,14 @@ class ClickzettaVector(BaseVector):
# Execute delete through write queue
self . _execute_write ( self . _delete_by_ids_impl , ids )
def _delete_by_ids_impl ( self , ids : list [ str ] ) - > None :
""" Implementation of delete by IDs (executed in write worker thread). """
safe_ids = [ self . _safe_doc_id ( id ) for id in ids ]
# Create properly escaped string literals for SQL
id_list = " , " . join ( f " ' { id } ' " for id in safe_ids )
sql = f " DELETE FROM { self . _config . schema_name } . { self . _table_name } WHERE id IN ( { id_list } ) "
with self . _connection . cursor ( ) as cursor :
cursor . execute ( sql )
@ -419,7 +425,7 @@ class ClickzettaVector(BaseVector):
# Execute delete through write queue
self . _execute_write ( self . _delete_by_metadata_field_impl , key , value )
def _delete_by_metadata_field_impl ( self , key : str , value : str ) - > None :
""" Implementation of delete by metadata field (executed in write worker thread). """
with self . _connection . cursor ( ) as cursor :
@ -435,7 +441,7 @@ class ClickzettaVector(BaseVector):
top_k = kwargs . get ( " top_k " , 10 )
score_threshold = kwargs . get ( " score_threshold " , 0.0 )
document_ids_filter = kwargs . get ( " document_ids_filter " )
# Handle filter parameter from canvas (workflow)
filter_param = kwargs . get ( " filter " , { } )
@ -445,8 +451,10 @@ class ClickzettaVector(BaseVector):
safe_doc_ids = [ str ( id ) . replace ( " ' " , " ' ' " ) for id in document_ids_filter ]
doc_ids_str = " , " . join ( f " ' { id } ' " for id in safe_doc_ids )
# Use json_extract_string function for ClickZetta compatibility
filter_clauses . append ( f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) " )
filter_clauses . append (
f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) "
)
# No need for dataset_id filter since each dataset has its own table
# Add distance threshold based on distance function
@ -489,11 +497,11 @@ class ClickzettaVector(BaseVector):
try :
if row [ 2 ] :
metadata = json . loads ( row [ 2 ] )
# If result is a string, it's double-encoded JSON - parse again
if isinstance ( metadata , str ) :
metadata = json . loads ( metadata )
if not isinstance ( metadata , dict ) :
metadata = { }
else :
@ -504,14 +512,14 @@ class ClickzettaVector(BaseVector):
import re
doc_id_match = re . search ( r ' " document_id " : \ s* " ([^ " ]+) " ' , str ( row [ 2 ] or ' ' ) )
metadata = { " document_id " : doc_id_match . group ( 1 ) } if doc_id_match else { }
# Ensure required fields are set
metadata [ " doc_id " ] = row [ 0 ] # segment id
# Ensure document_id exists (critical for Dify's format_retrieval_documents)
if " document_id " not in metadata :
metadata [ " document_id " ] = row [ 0 ] # fallback to segment id
# Add score based on distance
if self . _config . vector_distance_function == " cosine_distance " :
metadata [ " score " ] = 1 - ( row [ 3 ] / 2 )
@ -531,7 +539,7 @@ class ClickzettaVector(BaseVector):
top_k = kwargs . get ( " top_k " , 10 )
document_ids_filter = kwargs . get ( " document_ids_filter " )
# Handle filter parameter from canvas (workflow)
filter_param = kwargs . get ( " filter " , { } )
@ -541,8 +549,10 @@ class ClickzettaVector(BaseVector):
safe_doc_ids = [ str ( id ) . replace ( " ' " , " ' ' " ) for id in document_ids_filter ]
doc_ids_str = " , " . join ( f " ' { id } ' " for id in safe_doc_ids )
# Use json_extract_string function for ClickZetta compatibility
filter_clauses . append ( f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) " )
filter_clauses . append (
f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) "
)
# No need for dataset_id filter since each dataset has its own table
# Use match_all function for full-text search
@ -572,11 +582,11 @@ class ClickzettaVector(BaseVector):
try :
if row [ 2 ] :
metadata = json . loads ( row [ 2 ] )
# If result is a string, it's double-encoded JSON - parse again
if isinstance ( metadata , str ) :
metadata = json . loads ( metadata )
if not isinstance ( metadata , dict ) :
metadata = { }
else :
@ -587,14 +597,14 @@ class ClickzettaVector(BaseVector):
import re
doc_id_match = re . search ( r ' " document_id " : \ s* " ([^ " ]+) " ' , str ( row [ 2 ] or ' ' ) )
metadata = { " document_id " : doc_id_match . group ( 1 ) } if doc_id_match else { }
# Ensure required fields are set
metadata [ " doc_id " ] = row [ 0 ] # segment id
# Ensure document_id exists (critical for Dify's format_retrieval_documents)
if " document_id " not in metadata :
metadata [ " document_id " ] = row [ 0 ] # fallback to segment id
# Add a relevance score for full-text search
metadata [ " score " ] = 1.0 # Clickzetta doesn't provide relevance scores
doc = Document ( page_content = row [ 1 ] , metadata = metadata )
@ -610,7 +620,7 @@ class ClickzettaVector(BaseVector):
""" Fallback search using LIKE operator. """
top_k = kwargs . get ( " top_k " , 10 )
document_ids_filter = kwargs . get ( " document_ids_filter " )
# Handle filter parameter from canvas (workflow)
filter_param = kwargs . get ( " filter " , { } )
@ -620,8 +630,10 @@ class ClickzettaVector(BaseVector):
safe_doc_ids = [ str ( id ) . replace ( " ' " , " ' ' " ) for id in document_ids_filter ]
doc_ids_str = " , " . join ( f " ' { id } ' " for id in safe_doc_ids )
# Use json_extract_string function for ClickZetta compatibility
filter_clauses . append ( f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) " )
filter_clauses . append (
f " json_extract_string( { Field . METADATA_KEY . value } , ' $.document_id ' ) IN ( { doc_ids_str } ) "
)
# No need for dataset_id filter since each dataset has its own table
# Use simple quote escaping for LIKE clause
@ -646,11 +658,11 @@ class ClickzettaVector(BaseVector):
try :
if row [ 2 ] :
metadata = json . loads ( row [ 2 ] )
# If result is a string, it's double-encoded JSON - parse again
if isinstance ( metadata , str ) :
metadata = json . loads ( metadata )
if not isinstance ( metadata , dict ) :
metadata = { }
else :
@ -661,14 +673,14 @@ class ClickzettaVector(BaseVector):
import re
doc_id_match = re . search ( r ' " document_id " : \ s* " ([^ " ]+) " ' , str ( row [ 2 ] or ' ' ) )
metadata = { " document_id " : doc_id_match . group ( 1 ) } if doc_id_match else { }
# Ensure required fields are set
metadata [ " doc_id " ] = row [ 0 ] # segment id
# Ensure document_id exists (critical for Dify's format_retrieval_documents)
if " document_id " not in metadata :
metadata [ " document_id " ] = row [ 0 ] # fallback to segment id
metadata [ " score " ] = 0.5 # Lower score for LIKE search
doc = Document ( page_content = row [ 1 ] , metadata = metadata )
documents . append ( doc )
@ -680,11 +692,11 @@ class ClickzettaVector(BaseVector):
with self . _connection . cursor ( ) as cursor :
cursor . execute ( f " DROP TABLE IF EXISTS { self . _config . schema_name } . { self . _table_name } " )
def _format_vector_simple ( self , vector : list [ float ] ) - > str :
""" Simple vector formatting for SQL queries. """
return ' , ' . join ( map ( str , vector ) )
def _safe_doc_id ( self , doc_id : str ) - > str :
""" Ensure doc_id is safe for SQL and doesn ' t contain special characters. """
if not doc_id :
@ -696,7 +708,7 @@ class ClickzettaVector(BaseVector):
if not safe_id : # If all characters were removed
return str ( uuid . uuid4 ( ) )
return safe_id [ : 255 ] # Limit length
class ClickzettaVectorFactory ( AbstractVectorFactory ) :
@ -724,3 +736,4 @@ class ClickzettaVectorFactory(AbstractVectorFactory):
collection_name = Dataset . gen_collection_name_by_id ( dataset . id ) . lower ( )
return ClickzettaVector ( collection_name = collection_name , config = config )