feat: add upstash as a new vector database provider (#9644)
parent
999d3f1539
commit
8e7a752b2a
@ -0,0 +1,20 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class UpstashConfig(BaseSettings):
|
||||
"""
|
||||
Configuration settings for Upstash vector database
|
||||
"""
|
||||
|
||||
UPSTASH_VECTOR_URL: Optional[str] = Field(
|
||||
description="URL of the upstash server (e.g., 'https://vector.upstash.io')",
|
||||
default=None,
|
||||
)
|
||||
|
||||
UPSTASH_VECTOR_TOKEN: Optional[str] = Field(
|
||||
description="Token for authenticating with the upstash server",
|
||||
default=None,
|
||||
)
|
||||
@ -0,0 +1,129 @@
|
||||
import json
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel, model_validator
|
||||
from upstash_vector import Index, Vector
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.embedding.embedding_base import Embeddings
|
||||
from core.rag.models.document import Document
|
||||
from models.dataset import Dataset
|
||||
|
||||
|
||||
class UpstashVectorConfig(BaseModel):
|
||||
url: str
|
||||
token: str
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict) -> dict:
|
||||
if not values["url"]:
|
||||
raise ValueError("Upstash URL is required")
|
||||
if not values["token"]:
|
||||
raise ValueError("Upstash Token is required")
|
||||
return values
|
||||
|
||||
|
||||
class UpstashVector(BaseVector):
|
||||
def __init__(self, collection_name: str, config: UpstashVectorConfig):
|
||||
super().__init__(collection_name)
|
||||
self._table_name = collection_name
|
||||
self.index = Index(url=config.url, token=config.token)
|
||||
|
||||
def _get_index_dimension(self) -> int:
|
||||
index_info = self.index.info()
|
||||
if index_info and index_info.dimension:
|
||||
return index_info.dimension
|
||||
else:
|
||||
return 1536
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
self.add_texts(texts, embeddings)
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
vectors = [
|
||||
Vector(
|
||||
id=str(uuid4()),
|
||||
vector=embedding,
|
||||
metadata=doc.metadata,
|
||||
data=doc.page_content,
|
||||
)
|
||||
for doc, embedding in zip(documents, embeddings)
|
||||
]
|
||||
self.index.upsert(vectors=vectors)
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
response = self.get_ids_by_metadata_field("doc_id", id)
|
||||
return len(response) > 0
|
||||
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
item_ids = []
|
||||
for doc_id in ids:
|
||||
ids = self.get_ids_by_metadata_field("doc_id", doc_id)
|
||||
if id:
|
||||
item_ids += ids
|
||||
self._delete_by_ids(ids=item_ids)
|
||||
|
||||
def _delete_by_ids(self, ids: list[str]) -> None:
|
||||
if ids:
|
||||
self.index.delete(ids=ids)
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str) -> list[str]:
|
||||
query_result = self.index.query(
|
||||
vector=[1.001 * i for i in range(self._get_index_dimension())],
|
||||
include_metadata=True,
|
||||
top_k=1000,
|
||||
filter=f"{key} = '{value}'",
|
||||
)
|
||||
return [result.id for result in query_result]
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
ids = self.get_ids_by_metadata_field(key, value)
|
||||
if ids:
|
||||
self._delete_by_ids(ids)
|
||||
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
top_k = kwargs.get("top_k", 4)
|
||||
result = self.index.query(vector=query_vector, top_k=top_k, include_metadata=True, include_data=True)
|
||||
docs = []
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
for record in result:
|
||||
metadata = record.metadata
|
||||
text = record.data
|
||||
score = record.score
|
||||
metadata["score"] = score
|
||||
if score > score_threshold:
|
||||
docs.append(Document(page_content=text, metadata=metadata))
|
||||
return docs
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
return []
|
||||
|
||||
def delete(self) -> None:
|
||||
self.index.reset()
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.UPSTASH
|
||||
|
||||
|
||||
class UpstashVectorFactory(AbstractVectorFactory):
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> UpstashVector:
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix.lower()
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.UPSTASH, collection_name))
|
||||
|
||||
return UpstashVector(
|
||||
collection_name=collection_name,
|
||||
config=UpstashVectorConfig(
|
||||
url=dify_config.UPSTASH_VECTOR_URL,
|
||||
token=dify_config.UPSTASH_VECTOR_TOKEN,
|
||||
),
|
||||
)
|
||||
@ -0,0 +1,75 @@
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.monkeypatch import MonkeyPatch
|
||||
from upstash_vector import Index
|
||||
|
||||
|
||||
# Mocking the Index class from upstash_vector
|
||||
class MockIndex:
|
||||
def __init__(self, url="", token=""):
|
||||
self.url = url
|
||||
self.token = token
|
||||
self.vectors = []
|
||||
|
||||
def upsert(self, vectors):
|
||||
for vector in vectors:
|
||||
vector.score = 0.5
|
||||
self.vectors.append(vector)
|
||||
return {"code": 0, "msg": "operation success", "affectedCount": len(vectors)}
|
||||
|
||||
def fetch(self, ids):
|
||||
return [vector for vector in self.vectors if vector.id in ids]
|
||||
|
||||
def delete(self, ids):
|
||||
self.vectors = [vector for vector in self.vectors if vector.id not in ids]
|
||||
return {"code": 0, "msg": "Success"}
|
||||
|
||||
def query(
|
||||
self,
|
||||
vector: None,
|
||||
top_k: int = 10,
|
||||
include_vectors: bool = False,
|
||||
include_metadata: bool = False,
|
||||
filter: str = "",
|
||||
data: Optional[str] = None,
|
||||
namespace: str = "",
|
||||
include_data: bool = False,
|
||||
):
|
||||
# Simple mock query, in real scenario you would calculate similarity
|
||||
mock_result = []
|
||||
for vector_data in self.vectors:
|
||||
mock_result.append(vector_data)
|
||||
return mock_result[:top_k]
|
||||
|
||||
def reset(self):
|
||||
self.vectors = []
|
||||
|
||||
def info(self):
|
||||
return AttrDict({"dimension": 1024})
|
||||
|
||||
|
||||
class AttrDict(dict):
|
||||
def __getattr__(self, item):
|
||||
return self.get(item)
|
||||
|
||||
|
||||
MOCK = os.getenv("MOCK_SWITCH", "false").lower() == "true"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup_upstashvector_mock(request, monkeypatch: MonkeyPatch):
|
||||
if MOCK:
|
||||
monkeypatch.setattr(Index, "__init__", MockIndex.__init__)
|
||||
monkeypatch.setattr(Index, "upsert", MockIndex.upsert)
|
||||
monkeypatch.setattr(Index, "fetch", MockIndex.fetch)
|
||||
monkeypatch.setattr(Index, "delete", MockIndex.delete)
|
||||
monkeypatch.setattr(Index, "query", MockIndex.query)
|
||||
monkeypatch.setattr(Index, "reset", MockIndex.reset)
|
||||
monkeypatch.setattr(Index, "info", MockIndex.info)
|
||||
|
||||
yield
|
||||
|
||||
if MOCK:
|
||||
monkeypatch.undo()
|
||||
@ -0,0 +1,63 @@
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from core.rag.datasource.vdb.upstash.upstash_vector import UpstashVector, UpstashVectorConfig
|
||||
from core.rag.models.document import Document
|
||||
from tests.integration_tests.vdb.__mock.upstashvectordb import setup_upstashvector_mock
|
||||
from tests.integration_tests.vdb.test_vector_store import AbstractVectorTest
|
||||
|
||||
|
||||
def get_example_text() -> str:
|
||||
return "test_text"
|
||||
|
||||
|
||||
def get_example_document(doc_id: str) -> Document:
|
||||
doc = Document(
|
||||
page_content=get_example_text(),
|
||||
metadata={
|
||||
"doc_id": doc_id,
|
||||
"doc_hash": doc_id,
|
||||
"document_id": doc_id,
|
||||
"dataset_id": doc_id,
|
||||
},
|
||||
)
|
||||
return doc
|
||||
|
||||
|
||||
class UpstashVectorTest(AbstractVectorTest):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.vector = UpstashVector(
|
||||
collection_name="test_collection",
|
||||
config=UpstashVectorConfig(
|
||||
url="your-server-url",
|
||||
token="your-access-token",
|
||||
),
|
||||
)
|
||||
self.example_embedding = [1.001 * i for i in range(self.vector._get_index_dimension())]
|
||||
|
||||
def add_texts(self) -> list[str]:
|
||||
batch_size = 1
|
||||
documents = [get_example_document(doc_id=str(uuid.uuid4())) for _ in range(batch_size)]
|
||||
embeddings = [self.example_embedding] * batch_size
|
||||
self.vector.add_texts(documents=documents, embeddings=embeddings)
|
||||
return [doc.metadata["doc_id"] for doc in documents]
|
||||
|
||||
def get_ids_by_metadata_field(self):
|
||||
print("doc_id", self.example_doc_id)
|
||||
ids = self.vector.get_ids_by_metadata_field(key="document_id", value=self.example_doc_id)
|
||||
assert len(ids) != 0
|
||||
|
||||
def run_all_tests(self):
|
||||
self.create_vector()
|
||||
time.sleep(1)
|
||||
self.search_by_vector()
|
||||
self.text_exists()
|
||||
self.get_ids_by_metadata_field()
|
||||
added_doc_ids = self.add_texts()
|
||||
self.delete_by_ids(added_doc_ids + [self.example_doc_id])
|
||||
self.delete_vector()
|
||||
|
||||
|
||||
def test_upstash_vector(setup_upstashvector_mock):
|
||||
UpstashVectorTest().run_all_tests()
|
||||
Loading…
Reference in New Issue