diff --git a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py index 57261a4442..d3e69aea9f 100644 --- a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py +++ b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py @@ -35,7 +35,7 @@ class ClickzettaConfig(BaseModel): vcluster: str = "default_ap" schema: str = "dify" # Advanced settings - batch_size: int = 100 + batch_size: int = 20 # Reduced batch size to avoid large SQL statements enable_inverted_index: bool = True # Enable inverted index for full-text search analyzer_type: str = "chinese" # Analyzer type for full-text search: keyword, english, chinese, unicode analyzer_mode: str = "smart" # Analyzer mode: max_word, smart @@ -329,9 +329,31 @@ class ClickzettaVector(BaseVector): columns = f"id, {Field.CONTENT_KEY.value}, {Field.METADATA_KEY.value}, {Field.VECTOR.value}" insert_sql = f"INSERT INTO {self._config.schema}.{self._table_name} ({columns}) VALUES {','.join(values)}" + # Log SQL length for debugging + sql_length = len(insert_sql) + logger.debug(f"SQL statement length: {sql_length} characters") + + # If SQL is too long, split into smaller batches + if sql_length > 1000000: # 1MB limit + logger.warning(f"SQL statement too long ({sql_length} chars), splitting batch") + mid_point = len(batch_docs) // 2 + # Split and process recursively + self._insert_batch_impl(batch_docs[:mid_point], batch_embeddings[:mid_point], + batch_index, batch_size, total_batches) + self._insert_batch_impl(batch_docs[mid_point:], batch_embeddings[mid_point:], + batch_index + mid_point, batch_size, total_batches) + return + with self._connection.cursor() as cursor: - cursor.execute(insert_sql) - logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches}") + try: + cursor.execute(insert_sql) + logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches} " + f"({len(batch_docs)} docs, SQL: {sql_length} chars)") + except Exception: + logger.exception(f"SQL execution failed. SQL length: {sql_length}") + logger.exception(f"First 500 chars of SQL: {insert_sql[:500]}") + logger.exception(f"Last 500 chars of SQL: {insert_sql[-500:]}") + raise def text_exists(self, id: str) -> bool: """Check if a document exists by ID."""