diff --git a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py index a3066664f9..aa773ad510 100644 --- a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py +++ b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py @@ -172,6 +172,11 @@ class ClickzettaVector(BaseVector): def _create_table_and_indexes(self, embeddings: list[list[float]]): """Create table and indexes (executed in write worker thread).""" + # Check if table already exists to avoid unnecessary index creation + if self._table_exists(): + logger.info(f"Table {self._config.schema}.{self._table_name} already exists, skipping creation") + return + # Create table with vector and metadata columns dimension = len(embeddings[0]) if embeddings else 768 @@ -187,6 +192,7 @@ class ClickzettaVector(BaseVector): with self._connection.cursor() as cursor: cursor.execute(create_table_sql) + logger.info(f"Created table {self._config.schema}.{self._table_name}") # Create vector index self._create_vector_index(cursor) @@ -245,9 +251,12 @@ class ClickzettaVector(BaseVector): cursor.execute(f"SHOW INDEX FROM {self._config.schema}.{self._table_name}") existing_indexes = cursor.fetchall() for idx in existing_indexes: - # Check if inverted index already exists on the content column - if Field.CONTENT_KEY.value in str(idx).lower() and "inverted" in str(idx).lower(): - logger.info(f"Inverted index already exists on column {Field.CONTENT_KEY.value}") + idx_str = str(idx).lower() + # More precise check: look for inverted index specifically on the content column + if ("inverted" in idx_str and + Field.CONTENT_KEY.value.lower() in idx_str and + (index_name.lower() in idx_str or f"idx_{self._table_name}_text" in idx_str)): + logger.info(f"Inverted index already exists on column {Field.CONTENT_KEY.value}: {idx}") return except Exception as e: logger.warning(f"Failed to check existing indexes: {e}") @@ -265,19 +274,27 @@ class ClickzettaVector(BaseVector): logger.info(f"Created inverted index: {index_name}") except Exception as e: error_msg = str(e).lower() - if ("already exists" in error_msg or + # Handle ClickZetta specific error messages + if (("already exists" in error_msg or "already has index" in error_msg or - "with the same type" in error_msg): + "with the same type" in error_msg or + "cannot create inverted index" in error_msg) and + "already has index" in error_msg): logger.info(f"Inverted index already exists on column {Field.CONTENT_KEY.value}") + # Try to get the existing index name for logging + try: + cursor.execute(f"SHOW INDEX FROM {self._config.schema}.{self._table_name}") + existing_indexes = cursor.fetchall() + for idx in existing_indexes: + if "inverted" in str(idx).lower() and Field.CONTENT_KEY.value.lower() in str(idx).lower(): + logger.info(f"Found existing inverted index: {idx}") + break + except Exception: + pass else: logger.warning(f"Failed to create inverted index: {e}") # Continue without inverted index - full-text search will fall back to LIKE - def _table_exists(self) -> bool: - """Check if the table exists.""" - with self._connection.cursor() as cursor: - cursor.execute(f"SHOW TABLES IN {self._config.schema} LIKE '{self._table_name}'") - return len(cursor.fetchall()) > 0 def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs): """Add documents with embeddings to the collection."""