Merge remote-tracking branch 'upstream/main' into feat/variable-pool-rebased
commit
c67a7abbe2
@ -0,0 +1,28 @@
|
||||
name: Deploy RAG Dev
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows: ["Build and Push API & Web"]
|
||||
branches:
|
||||
- "deploy/rag-dev"
|
||||
types:
|
||||
- completed
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
if: |
|
||||
github.event.workflow_run.conclusion == 'success' &&
|
||||
github.event.workflow_run.head_branch == 'deploy/rag-dev'
|
||||
steps:
|
||||
- name: Deploy to server
|
||||
uses: appleboy/ssh-action@v0.1.8
|
||||
with:
|
||||
host: ${{ secrets.RAG_SSH_HOST }}
|
||||
username: ${{ secrets.SSH_USER }}
|
||||
key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||
script: |
|
||||
${{ vars.SSH_SCRIPT || secrets.SSH_SCRIPT }}
|
||||
@ -0,0 +1,14 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class MatrixoneConfig(BaseModel):
|
||||
"""Matrixone vector database configuration."""
|
||||
|
||||
MATRIXONE_HOST: str = Field(default="localhost", description="Host address of the Matrixone server")
|
||||
MATRIXONE_PORT: int = Field(default=6001, description="Port number of the Matrixone server")
|
||||
MATRIXONE_USER: str = Field(default="dump", description="Username for authenticating with Matrixone")
|
||||
MATRIXONE_PASSWORD: str = Field(default="111", description="Password for authenticating with Matrixone")
|
||||
MATRIXONE_DATABASE: str = Field(default="dify", description="Name of the Matrixone database to connect to")
|
||||
MATRIXONE_METRIC: str = Field(
|
||||
default="l2", description="Distance metric type for vector similarity search (cosine or l2)"
|
||||
)
|
||||
@ -0,0 +1,233 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import Any, Optional
|
||||
|
||||
from mo_vector.client import MoVectorClient # type: ignore
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.embedding.embedding_base import Embeddings
|
||||
from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MatrixoneConfig(BaseModel):
|
||||
host: str = "localhost"
|
||||
port: int = 6001
|
||||
user: str = "dump"
|
||||
password: str = "111"
|
||||
database: str = "dify"
|
||||
metric: str = "l2"
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict) -> dict:
|
||||
if not values["host"]:
|
||||
raise ValueError("config host is required")
|
||||
if not values["port"]:
|
||||
raise ValueError("config port is required")
|
||||
if not values["user"]:
|
||||
raise ValueError("config user is required")
|
||||
if not values["password"]:
|
||||
raise ValueError("config password is required")
|
||||
if not values["database"]:
|
||||
raise ValueError("config database is required")
|
||||
return values
|
||||
|
||||
|
||||
def ensure_client(func):
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(None, False)
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class MatrixoneVector(BaseVector):
|
||||
"""
|
||||
Matrixone vector storage implementation.
|
||||
"""
|
||||
|
||||
def __init__(self, collection_name: str, config: MatrixoneConfig):
|
||||
super().__init__(collection_name)
|
||||
self.config = config
|
||||
self.collection_name = collection_name.lower()
|
||||
self.client = None
|
||||
|
||||
@property
|
||||
def collection_name(self):
|
||||
return self._collection_name
|
||||
|
||||
@collection_name.setter
|
||||
def collection_name(self, value):
|
||||
self._collection_name = value
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.MATRIXONE
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(len(embeddings[0]), True)
|
||||
return self.add_texts(texts, embeddings)
|
||||
|
||||
def _get_client(self, dimension: Optional[int] = None, create_table: bool = False) -> MoVectorClient:
|
||||
"""
|
||||
Create a new client for the collection.
|
||||
|
||||
The collection will be created if it doesn't exist.
|
||||
"""
|
||||
lock_name = f"vector_indexing_lock_{self._collection_name}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
client = MoVectorClient(
|
||||
connection_string=f"mysql+pymysql://{self.config.user}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}",
|
||||
table_name=self.collection_name,
|
||||
vector_dimension=dimension,
|
||||
create_table=create_table,
|
||||
)
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
return client
|
||||
try:
|
||||
client.create_full_text_index()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to create full text index")
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
return client
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(len(embeddings[0]), True)
|
||||
assert self.client is not None
|
||||
ids = []
|
||||
for _, doc in enumerate(documents):
|
||||
if doc.metadata is not None:
|
||||
doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))
|
||||
ids.append(doc_id)
|
||||
self.client.insert(
|
||||
texts=[doc.page_content for doc in documents],
|
||||
embeddings=embeddings,
|
||||
metadatas=[doc.metadata for doc in documents],
|
||||
ids=ids,
|
||||
)
|
||||
return ids
|
||||
|
||||
@ensure_client
|
||||
def text_exists(self, id: str) -> bool:
|
||||
assert self.client is not None
|
||||
result = self.client.get(ids=[id])
|
||||
return len(result) > 0
|
||||
|
||||
@ensure_client
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
assert self.client is not None
|
||||
if not ids:
|
||||
return
|
||||
self.client.delete(ids=ids)
|
||||
|
||||
@ensure_client
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
assert self.client is not None
|
||||
results = self.client.query_by_metadata(filter={key: value})
|
||||
return [result.id for result in results]
|
||||
|
||||
@ensure_client
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
assert self.client is not None
|
||||
self.client.delete(filter={key: value})
|
||||
|
||||
@ensure_client
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
assert self.client is not None
|
||||
top_k = kwargs.get("top_k", 5)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
filter = None
|
||||
if document_ids_filter:
|
||||
filter = {"document_id": {"$in": document_ids_filter}}
|
||||
|
||||
results = self.client.query(
|
||||
query_vector=query_vector,
|
||||
k=top_k,
|
||||
filter=filter,
|
||||
)
|
||||
|
||||
docs = []
|
||||
# TODO: add the score threshold to the query
|
||||
for result in results:
|
||||
metadata = result.metadata
|
||||
docs.append(
|
||||
Document(
|
||||
page_content=result.document,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return docs
|
||||
|
||||
@ensure_client
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
assert self.client is not None
|
||||
top_k = kwargs.get("top_k", 5)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
filter = None
|
||||
if document_ids_filter:
|
||||
filter = {"document_id": {"$in": document_ids_filter}}
|
||||
score_threshold = float(kwargs.get("score_threshold", 0.0))
|
||||
|
||||
results = self.client.full_text_query(
|
||||
keywords=[query],
|
||||
k=top_k,
|
||||
filter=filter,
|
||||
)
|
||||
|
||||
docs = []
|
||||
for result in results:
|
||||
metadata = result.metadata
|
||||
if isinstance(metadata, str):
|
||||
import json
|
||||
|
||||
metadata = json.loads(metadata)
|
||||
score = 1 - result.distance
|
||||
if score >= score_threshold:
|
||||
metadata["score"] = score
|
||||
docs.append(
|
||||
Document(
|
||||
page_content=result.document,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return docs
|
||||
|
||||
@ensure_client
|
||||
def delete(self) -> None:
|
||||
assert self.client is not None
|
||||
self.client.delete()
|
||||
|
||||
|
||||
class MatrixoneVectorFactory(AbstractVectorFactory):
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> MatrixoneVector:
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.MATRIXONE, collection_name))
|
||||
|
||||
config = MatrixoneConfig(
|
||||
host=dify_config.MATRIXONE_HOST or "localhost",
|
||||
port=dify_config.MATRIXONE_PORT or 6001,
|
||||
user=dify_config.MATRIXONE_USER or "dump",
|
||||
password=dify_config.MATRIXONE_PASSWORD or "111",
|
||||
database=dify_config.MATRIXONE_DATABASE or "dify",
|
||||
metric=dify_config.MATRIXONE_METRIC or "l2",
|
||||
)
|
||||
return MatrixoneVector(collection_name=collection_name, config=config)
|
||||
@ -0,0 +1,65 @@
|
||||
import contextvars
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import TypeVar
|
||||
|
||||
from flask import Flask, g, has_request_context
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def preserve_flask_contexts(
|
||||
flask_app: Flask,
|
||||
context_vars: contextvars.Context,
|
||||
) -> Iterator[None]:
|
||||
"""
|
||||
A context manager that handles:
|
||||
1. flask-login's UserProxy copy
|
||||
2. ContextVars copy
|
||||
3. flask_app.app_context()
|
||||
|
||||
This context manager ensures that the Flask application context is properly set up,
|
||||
the current user is preserved across context boundaries, and any provided context variables
|
||||
are set within the new context.
|
||||
|
||||
Note:
|
||||
This manager aims to allow use current_user cross thread and app context,
|
||||
but it's not the recommend use, it's better to pass user directly in parameters.
|
||||
|
||||
Args:
|
||||
flask_app: The Flask application instance
|
||||
context_vars: contextvars.Context object containing context variables to be set in the new context
|
||||
|
||||
Yields:
|
||||
None
|
||||
|
||||
Example:
|
||||
```python
|
||||
with preserve_flask_contexts(flask_app, context_vars=context_vars):
|
||||
# Code that needs Flask app context and context variables
|
||||
# Current user will be preserved if available
|
||||
```
|
||||
"""
|
||||
# Set context variables if provided
|
||||
if context_vars:
|
||||
for var, val in context_vars.items():
|
||||
var.set(val)
|
||||
|
||||
# Save current user before entering new app context
|
||||
saved_user = None
|
||||
if has_request_context() and hasattr(g, "_login_user"):
|
||||
saved_user = g._login_user
|
||||
|
||||
# Enter Flask app context
|
||||
with flask_app.app_context():
|
||||
try:
|
||||
# Restore user in new app context if it was saved
|
||||
if saved_user is not None:
|
||||
g._login_user = saved_user
|
||||
|
||||
# Yield control back to the caller
|
||||
yield
|
||||
finally:
|
||||
# Any cleanup can be added here if needed
|
||||
pass
|
||||
@ -0,0 +1,42 @@
|
||||
import logging
|
||||
|
||||
import sendgrid # type: ignore
|
||||
from python_http_client.exceptions import ForbiddenError, UnauthorizedError
|
||||
from sendgrid.helpers.mail import Content, Email, Mail, To # type: ignore
|
||||
|
||||
|
||||
class SendGridClient:
|
||||
def __init__(self, sendgrid_api_key: str, _from: str):
|
||||
self.sendgrid_api_key = sendgrid_api_key
|
||||
self._from = _from
|
||||
|
||||
def send(self, mail: dict):
|
||||
logging.debug("Sending email with SendGrid")
|
||||
|
||||
try:
|
||||
_to = mail["to"]
|
||||
|
||||
if not _to:
|
||||
raise ValueError("SendGridClient: Cannot send email: recipient address is missing.")
|
||||
|
||||
sg = sendgrid.SendGridAPIClient(api_key=self.sendgrid_api_key)
|
||||
from_email = Email(self._from)
|
||||
to_email = To(_to)
|
||||
subject = mail["subject"]
|
||||
content = Content("text/html", mail["html"])
|
||||
mail = Mail(from_email, to_email, subject, content)
|
||||
mail_json = mail.get() # type: ignore
|
||||
response = sg.client.mail.send.post(request_body=mail_json)
|
||||
logging.debug(response.status_code)
|
||||
logging.debug(response.body)
|
||||
logging.debug(response.headers)
|
||||
|
||||
except TimeoutError as e:
|
||||
logging.exception("SendGridClient Timeout occurred while sending email")
|
||||
raise
|
||||
except (UnauthorizedError, ForbiddenError) as e:
|
||||
logging.exception("SendGridClient Authentication failed. Verify that your credentials and the 'from")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.exception(f"SendGridClient Unexpected error occurred while sending email to {_to}")
|
||||
raise
|
||||
@ -1,5 +1,5 @@
|
||||
from services.errors.base import BaseServiceError
|
||||
|
||||
|
||||
class CompletionStoppedError(BaseServiceError):
|
||||
class PluginInstallationForbiddenError(BaseServiceError):
|
||||
pass
|
||||
@ -0,0 +1,25 @@
|
||||
from core.rag.datasource.vdb.matrixone.matrixone_vector import MatrixoneConfig, MatrixoneVector
|
||||
from tests.integration_tests.vdb.test_vector_store import (
|
||||
AbstractVectorTest,
|
||||
get_example_text,
|
||||
setup_mock_redis,
|
||||
)
|
||||
|
||||
|
||||
class MatrixoneVectorTest(AbstractVectorTest):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.vector = MatrixoneVector(
|
||||
collection_name=self.collection_name,
|
||||
config=MatrixoneConfig(
|
||||
host="localhost", port=6001, user="dump", password="111", database="dify", metric="l2"
|
||||
),
|
||||
)
|
||||
|
||||
def get_ids_by_metadata_field(self):
|
||||
ids = self.vector.get_ids_by_metadata_field(key="document_id", value=self.example_doc_id)
|
||||
assert len(ids) == 1
|
||||
|
||||
|
||||
def test_matrixone_vector(setup_mock_redis):
|
||||
MatrixoneVectorTest().run_all_tests()
|
||||
@ -0,0 +1,124 @@
|
||||
import contextvars
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from flask_login import LoginManager, UserMixin, current_user, login_user
|
||||
|
||||
from libs.flask_utils import preserve_flask_contexts
|
||||
|
||||
|
||||
class User(UserMixin):
|
||||
"""Simple User class for testing."""
|
||||
|
||||
def __init__(self, id: str):
|
||||
self.id = id
|
||||
|
||||
def get_id(self) -> str:
|
||||
return self.id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def login_app(app: Flask) -> Flask:
|
||||
"""Set up a Flask app with flask-login."""
|
||||
# Set a secret key for the app
|
||||
app.config["SECRET_KEY"] = "test-secret-key"
|
||||
|
||||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id: str) -> Optional[User]:
|
||||
if user_id == "test_user":
|
||||
return User("test_user")
|
||||
return None
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_user() -> User:
|
||||
"""Create a test user."""
|
||||
return User("test_user")
|
||||
|
||||
|
||||
def test_current_user_not_accessible_across_threads(login_app: Flask, test_user: User):
|
||||
"""
|
||||
Test that current_user is not accessible in a different thread without preserve_flask_contexts.
|
||||
|
||||
This test demonstrates that without the preserve_flask_contexts, we cannot access
|
||||
current_user in a different thread, even with app_context.
|
||||
"""
|
||||
# Log in the user in the main thread
|
||||
with login_app.test_request_context():
|
||||
login_user(test_user)
|
||||
assert current_user.is_authenticated
|
||||
assert current_user.id == "test_user"
|
||||
|
||||
# Store the result of the thread execution
|
||||
result = {"user_accessible": True, "error": None}
|
||||
|
||||
# Define a function to run in a separate thread
|
||||
def check_user_in_thread():
|
||||
try:
|
||||
# Try to access current_user in a different thread with app_context
|
||||
with login_app.app_context():
|
||||
# This should fail because current_user is not accessible across threads
|
||||
# without preserve_flask_contexts
|
||||
result["user_accessible"] = current_user.is_authenticated
|
||||
except Exception as e:
|
||||
result["error"] = str(e) # type: ignore
|
||||
|
||||
# Run the function in a separate thread
|
||||
thread = threading.Thread(target=check_user_in_thread)
|
||||
thread.start()
|
||||
thread.join()
|
||||
|
||||
# Verify that we got an error or current_user is not authenticated
|
||||
assert result["error"] is not None or (result["user_accessible"] is not None and not result["user_accessible"])
|
||||
|
||||
|
||||
def test_current_user_accessible_with_preserve_flask_contexts(login_app: Flask, test_user: User):
|
||||
"""
|
||||
Test that current_user is accessible in a different thread with preserve_flask_contexts.
|
||||
|
||||
This test demonstrates that with the preserve_flask_contexts, we can access
|
||||
current_user in a different thread.
|
||||
"""
|
||||
# Log in the user in the main thread
|
||||
with login_app.test_request_context():
|
||||
login_user(test_user)
|
||||
assert current_user.is_authenticated
|
||||
assert current_user.id == "test_user"
|
||||
|
||||
# Save the context variables
|
||||
context_vars = contextvars.copy_context()
|
||||
|
||||
# Store the result of the thread execution
|
||||
result = {"user_accessible": False, "user_id": None, "error": None}
|
||||
|
||||
# Define a function to run in a separate thread
|
||||
def check_user_in_thread_with_manager():
|
||||
try:
|
||||
# Use preserve_flask_contexts to access current_user in a different thread
|
||||
with preserve_flask_contexts(login_app, context_vars):
|
||||
from flask_login import current_user
|
||||
|
||||
if current_user:
|
||||
result["user_accessible"] = True
|
||||
result["user_id"] = current_user.id
|
||||
else:
|
||||
result["user_accessible"] = False
|
||||
except Exception as e:
|
||||
result["error"] = str(e) # type: ignore
|
||||
|
||||
# Run the function in a separate thread
|
||||
thread = threading.Thread(target=check_user_in_thread_with_manager)
|
||||
thread.start()
|
||||
thread.join()
|
||||
|
||||
# Verify that current_user is accessible and has the correct ID
|
||||
assert result["error"] is None
|
||||
assert result["user_accessible"] is True
|
||||
assert result["user_id"] == "test_user"
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,72 @@
|
||||
import { useEffect, useState } from 'react'
|
||||
|
||||
type DSLDragDropHookProps = {
|
||||
onDSLFileDropped: (file: File) => void
|
||||
containerRef: React.RefObject<HTMLDivElement>
|
||||
enabled?: boolean
|
||||
}
|
||||
|
||||
export const useDSLDragDrop = ({ onDSLFileDropped, containerRef, enabled = true }: DSLDragDropHookProps) => {
|
||||
const [dragging, setDragging] = useState(false)
|
||||
|
||||
const handleDragEnter = (e: DragEvent) => {
|
||||
e.preventDefault()
|
||||
e.stopPropagation()
|
||||
if (e.dataTransfer?.types.includes('Files'))
|
||||
setDragging(true)
|
||||
}
|
||||
|
||||
const handleDragOver = (e: DragEvent) => {
|
||||
e.preventDefault()
|
||||
e.stopPropagation()
|
||||
}
|
||||
|
||||
const handleDragLeave = (e: DragEvent) => {
|
||||
e.preventDefault()
|
||||
e.stopPropagation()
|
||||
if (e.relatedTarget === null || !containerRef.current?.contains(e.relatedTarget as Node))
|
||||
setDragging(false)
|
||||
}
|
||||
|
||||
const handleDrop = (e: DragEvent) => {
|
||||
e.preventDefault()
|
||||
e.stopPropagation()
|
||||
setDragging(false)
|
||||
|
||||
if (!e.dataTransfer)
|
||||
return
|
||||
|
||||
const files = [...e.dataTransfer.files]
|
||||
if (files.length === 0)
|
||||
return
|
||||
|
||||
const file = files[0]
|
||||
if (file.name.toLowerCase().endsWith('.yaml') || file.name.toLowerCase().endsWith('.yml'))
|
||||
onDSLFileDropped(file)
|
||||
}
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled)
|
||||
return
|
||||
|
||||
const current = containerRef.current
|
||||
if (current) {
|
||||
current.addEventListener('dragenter', handleDragEnter)
|
||||
current.addEventListener('dragover', handleDragOver)
|
||||
current.addEventListener('dragleave', handleDragLeave)
|
||||
current.addEventListener('drop', handleDrop)
|
||||
}
|
||||
return () => {
|
||||
if (current) {
|
||||
current.removeEventListener('dragenter', handleDragEnter)
|
||||
current.removeEventListener('dragover', handleDragOver)
|
||||
current.removeEventListener('dragleave', handleDragLeave)
|
||||
current.removeEventListener('drop', handleDrop)
|
||||
}
|
||||
}
|
||||
}, [containerRef, enabled])
|
||||
|
||||
return {
|
||||
dragging: enabled ? dragging : false,
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue