From fcf8387f52acb02b2a6e9ace752b64c910ce9392 Mon Sep 17 00:00:00 2001 From: yunqiqiliang <132561395+yunqiqiliang@users.noreply.github.com> Date: Thu, 17 Jul 2025 18:00:25 +0800 Subject: [PATCH] Fix SQL statement length issues and improve batch processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add SQL length monitoring and automatic batch splitting - Reduce default batch size from 100 to 20 to prevent large SQL statements - Add detailed error logging for SQL execution failures - Implement recursive batch splitting for oversized SQL statements - Set a 1,000,000-character limit (about 1 MB for ASCII text) on SQL statement length This resolves issues where large batches create SQL statements that exceed database limits, causing vector insertion failures. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../vdb/clickzetta/clickzetta_vector.py | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py index 57261a4442..d3e69aea9f 100644 --- a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py +++ b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py @@ -35,7 +35,7 @@ class ClickzettaConfig(BaseModel): vcluster: str = "default_ap" schema: str = "dify" # Advanced settings - batch_size: int = 100 + batch_size: int = 20 # Reduced batch size to avoid large SQL statements enable_inverted_index: bool = True # Enable inverted index for full-text search analyzer_type: str = "chinese" # Analyzer type for full-text search: keyword, english, chinese, unicode analyzer_mode: str = "smart" # Analyzer mode: max_word, smart @@ -329,9 +329,31 @@ class ClickzettaVector(BaseVector): columns = f"id, {Field.CONTENT_KEY.value}, {Field.METADATA_KEY.value}, {Field.VECTOR.value}" insert_sql = f"INSERT INTO {self._config.schema}.{self._table_name} ({columns}) VALUES {','.join(values)}" + # 
Log SQL length for debugging + sql_length = len(insert_sql) + logger.debug(f"SQL statement length: {sql_length} characters") + + # If SQL is too long, split into smaller batches + if sql_length > 1000000: # 1MB limit + logger.warning(f"SQL statement too long ({sql_length} chars), splitting batch") + mid_point = len(batch_docs) // 2 + # Split and process recursively + self._insert_batch_impl(batch_docs[:mid_point], batch_embeddings[:mid_point], + batch_index, batch_size, total_batches) + self._insert_batch_impl(batch_docs[mid_point:], batch_embeddings[mid_point:], + batch_index + mid_point, batch_size, total_batches) + return + with self._connection.cursor() as cursor: - cursor.execute(insert_sql) - logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches}") + try: + cursor.execute(insert_sql) + logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches} " + f"({len(batch_docs)} docs, SQL: {sql_length} chars)") + except Exception: + logger.exception(f"SQL execution failed. SQL length: {sql_length}") + logger.exception(f"First 500 chars of SQL: {insert_sql[:500]}") + logger.exception(f"Last 500 chars of SQL: {insert_sql[-500:]}") + raise def text_exists(self, id: str) -> bool: """Check if a document exists by ID."""