Fix SQL statement length issues and improve batch processing

- Add SQL length monitoring and automatic batch splitting
- Reduce default batch size from 100 to 20 to prevent large SQL statements
- Add detailed error logging for SQL execution failures (statement length plus the first and last 500 characters of the SQL)
- Implement recursive batch splitting for oversized SQL statements
- Set a 1,000,000-character (roughly 1MB) limit on SQL statement length

This resolves issues where large batches create SQL statements that
exceed database limits, causing vector insertion failures.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
pull/22551/head
yunqiqiliang 10 months ago
parent 8dea8766e9
commit fcf8387f52

@ -35,7 +35,7 @@ class ClickzettaConfig(BaseModel):
vcluster: str = "default_ap" vcluster: str = "default_ap"
schema: str = "dify" schema: str = "dify"
# Advanced settings # Advanced settings
batch_size: int = 100 batch_size: int = 20 # Reduced batch size to avoid large SQL statements
enable_inverted_index: bool = True # Enable inverted index for full-text search enable_inverted_index: bool = True # Enable inverted index for full-text search
analyzer_type: str = "chinese" # Analyzer type for full-text search: keyword, english, chinese, unicode analyzer_type: str = "chinese" # Analyzer type for full-text search: keyword, english, chinese, unicode
analyzer_mode: str = "smart" # Analyzer mode: max_word, smart analyzer_mode: str = "smart" # Analyzer mode: max_word, smart
@ -329,9 +329,31 @@ class ClickzettaVector(BaseVector):
columns = f"id, {Field.CONTENT_KEY.value}, {Field.METADATA_KEY.value}, {Field.VECTOR.value}" columns = f"id, {Field.CONTENT_KEY.value}, {Field.METADATA_KEY.value}, {Field.VECTOR.value}"
insert_sql = f"INSERT INTO {self._config.schema}.{self._table_name} ({columns}) VALUES {','.join(values)}" insert_sql = f"INSERT INTO {self._config.schema}.{self._table_name} ({columns}) VALUES {','.join(values)}"
# Log SQL length for debugging
sql_length = len(insert_sql)
logger.debug(f"SQL statement length: {sql_length} characters")
# If SQL is too long, split into smaller batches
if sql_length > 1000000: # 1MB limit
logger.warning(f"SQL statement too long ({sql_length} chars), splitting batch")
mid_point = len(batch_docs) // 2
# Split and process recursively
self._insert_batch_impl(batch_docs[:mid_point], batch_embeddings[:mid_point],
batch_index, batch_size, total_batches)
self._insert_batch_impl(batch_docs[mid_point:], batch_embeddings[mid_point:],
batch_index + mid_point, batch_size, total_batches)
return
with self._connection.cursor() as cursor: with self._connection.cursor() as cursor:
try:
cursor.execute(insert_sql) cursor.execute(insert_sql)
logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches}") logger.info(f"Inserted batch {batch_index // batch_size + 1}/{total_batches} "
f"({len(batch_docs)} docs, SQL: {sql_length} chars)")
except Exception:
logger.exception(f"SQL execution failed. SQL length: {sql_length}")
logger.exception(f"First 500 chars of SQL: {insert_sql[:500]}")
logger.exception(f"Last 500 chars of SQL: {insert_sql[-500:]}")
raise
def text_exists(self, id: str) -> bool: def text_exists(self, id: str) -> bool:
"""Check if a document exists by ID.""" """Check if a document exists by ID."""

Loading…
Cancel
Save