Compare commits
56 Commits
main
...
dev/plugin
| Author | SHA1 | Date |
|---|---|---|
|
|
086aeea181 | 1 year ago |
|
|
1d7c4a87d0 | 1 year ago |
|
|
9042b368e9 | 1 year ago |
|
|
f1bcd26c69 | 1 year ago |
|
|
3dcd8b6330 | 1 year ago |
|
|
10c088029c | 1 year ago |
|
|
73b1adf862 | 1 year ago |
|
|
ae76dbd92c | 1 year ago |
|
|
782df0c383 | 1 year ago |
|
|
089207240e | 1 year ago |
|
|
53d30d537f | 1 year ago |
|
|
53512a4650 | 1 year ago |
|
|
1fb7dcda24 | 1 year ago |
|
|
3c3e0a35f4 | 1 year ago |
|
|
202a246e83 | 1 year ago |
|
|
08b968eca5 | 1 year ago |
|
|
b1ac71db3e | 1 year ago |
|
|
7710d8e83b | 1 year ago |
|
|
cf75fcdffc | 1 year ago |
|
|
6e8601b52c | 1 year ago |
|
|
96cf0ed5af | 1 year ago |
|
|
46a798bea8 | 1 year ago |
|
|
9e258c495d | 1 year ago |
|
|
c53786d229 | 1 year ago |
|
|
17f23f4798 | 1 year ago |
|
|
67f2c766bc | 1 year ago |
|
|
5f995fac32 | 1 year ago |
|
|
f88f9d6970 | 1 year ago |
|
|
d2cc502c71 | 1 year ago |
|
|
b88194d1c6 | 1 year ago |
|
|
2b95e54d54 | 1 year ago |
|
|
9bff9b5c9e | 1 year ago |
|
|
3dd2c170e7 | 1 year ago |
|
|
88f41f164f | 1 year ago |
|
|
cd932519b3 | 1 year ago |
|
|
2ff2b08739 | 1 year ago |
|
|
a4a45421cc | 1 year ago |
|
|
aafab1b59e | 1 year ago |
|
|
7f49f96c3f | 1 year ago |
|
|
5673f03db5 | 1 year ago |
|
|
278adbc10e | 1 year ago |
|
|
5d4e517397 | 1 year ago |
|
|
c2671c16a8 | 1 year ago |
|
|
10991cbc03 | 1 year ago |
|
|
3fcf7e88b0 | 1 year ago |
|
|
ffa5af1356 | 1 year ago |
|
|
066516b54d | 1 year ago |
|
|
49415e5e7f | 1 year ago |
|
|
a697bbdfa7 | 1 year ago |
|
|
d5c31f8728 | 1 year ago |
|
|
508005b741 | 1 year ago |
|
|
4f0ecdbb6e | 1 year ago |
|
|
ab2e69faef | 1 year ago |
|
|
e46a3343b8 | 1 year ago |
|
|
47637da734 | 1 year ago |
|
|
525bde28f6 | 1 year ago |
@ -0,0 +1,143 @@
|
||||
from flask_login import current_user # type: ignore # type: ignore
|
||||
from flask_restful import Resource, marshal_with, reqparse # type: ignore
|
||||
from werkzeug.exceptions import NotFound
|
||||
|
||||
from controllers.console import api
|
||||
from controllers.console.wraps import account_initialization_required, enterprise_license_required, setup_required
|
||||
from fields.dataset_fields import dataset_metadata_fields
|
||||
from libs.login import login_required
|
||||
from services.dataset_service import DatasetService
|
||||
from services.entities.knowledge_entities.knowledge_entities import (
|
||||
MetadataArgs,
|
||||
MetadataOperationData,
|
||||
)
|
||||
from services.metadata_service import MetadataService
|
||||
|
||||
|
||||
def _validate_name(name):
|
||||
if not name or len(name) < 1 or len(name) > 40:
|
||||
raise ValueError("Name must be between 1 to 40 characters.")
|
||||
return name
|
||||
|
||||
|
||||
def _validate_description_length(description):
|
||||
if len(description) > 400:
|
||||
raise ValueError("Description cannot exceed 400 characters.")
|
||||
return description
|
||||
|
||||
|
||||
class DatasetListApi(Resource):
    """Console endpoint that creates a metadata field on a dataset."""

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    @marshal_with(dataset_metadata_fields)
    def post(self, dataset_id):
        """Create a metadata field for ``dataset_id``.

        Expects a JSON body with ``type`` and ``name``. Responds with the
        created metadata (marshalled with ``dataset_metadata_fields``) and
        status 201.

        Raises:
            NotFound: If the dataset does not exist.
        """
        parser = reqparse.RequestParser()
        # ``type`` and ``name`` are stored in non-nullable columns of the
        # ``dataset_metadatas`` table, so an explicit JSON null is rejected
        # here instead of failing later in the service/database layer.
        parser.add_argument("type", type=str, required=True, nullable=False, location="json")
        parser.add_argument("name", type=str, required=True, nullable=False, location="json")
        args = parser.parse_args()
        metadata_args = MetadataArgs(**args)

        dataset_id_str = str(dataset_id)
        dataset = DatasetService.get_dataset(dataset_id_str)
        if dataset is None:
            raise NotFound("Dataset not found.")
        DatasetService.check_dataset_permission(dataset, current_user)

        metadata = MetadataService.create_metadata(dataset_id_str, metadata_args)
        return metadata, 201
|
||||
|
||||
|
||||
class DatasetMetadataApi(Resource):
    """Console endpoint for renaming and deleting a single dataset metadata field."""

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    # The service returns a model object; marshal it the same way the create
    # endpoint does so the response can be serialized.
    @marshal_with(dataset_metadata_fields)
    def patch(self, dataset_id, metadata_id):
        """Rename the metadata field ``metadata_id`` of dataset ``dataset_id``.

        Expects a JSON body with ``name``. Responds with the updated metadata
        and status 200.

        Raises:
            NotFound: If the dataset does not exist.
        """
        parser = reqparse.RequestParser()
        # A null name cannot be stored (the column is non-nullable), so refuse
        # it at parse time.
        parser.add_argument("name", type=str, required=True, nullable=False, location="json")
        args = parser.parse_args()

        dataset_id_str = str(dataset_id)
        metadata_id_str = str(metadata_id)
        dataset = DatasetService.get_dataset(dataset_id_str)
        if dataset is None:
            raise NotFound("Dataset not found.")
        DatasetService.check_dataset_permission(dataset, current_user)

        metadata = MetadataService.update_metadata_name(dataset_id_str, metadata_id_str, args.get("name"))
        return metadata, 200

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    def delete(self, dataset_id, metadata_id):
        """Delete the metadata field ``metadata_id`` of dataset ``dataset_id``.

        Raises:
            NotFound: If the dataset does not exist.
        """
        dataset_id_str = str(dataset_id)
        metadata_id_str = str(metadata_id)
        dataset = DatasetService.get_dataset(dataset_id_str)
        if dataset is None:
            raise NotFound("Dataset not found.")
        DatasetService.check_dataset_permission(dataset, current_user)

        MetadataService.delete_metadata(dataset_id_str, metadata_id_str)
        return 200
|
||||
|
||||
|
||||
class DatasetMetadataBuiltInFieldApi(Resource):
    """Console endpoint that lists the built-in metadata fields."""

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    def get(self):
        """Return the built-in field descriptors from ``MetadataService`` with status 200."""
        return MetadataService.get_built_in_fields(), 200
|
||||
|
||||
|
||||
class DatasetMetadataBuiltInFieldActionApi(Resource):
    """Console endpoint that enables or disables built-in metadata fields on a dataset."""

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    def post(self, dataset_id, action):
        """Apply ``action`` ("enable" or "disable") to the dataset's built-in fields.

        Any other ``action`` value is ignored and still answers 200.

        Raises:
            NotFound: If the dataset does not exist.
        """
        target_id = str(dataset_id)
        target = DatasetService.get_dataset(target_id)
        if target is None:
            raise NotFound("Dataset not found.")
        DatasetService.check_dataset_permission(target, current_user)

        if action == "enable":
            MetadataService.enable_built_in_field(target)
            return 200
        if action == "disable":
            MetadataService.disable_built_in_field(target)
        return 200
|
||||
|
||||
|
||||
class DocumentMetadataApi(Resource):
    """Console endpoint that applies metadata values to documents of a dataset."""

    @setup_required
    @login_required
    @account_initialization_required
    @enterprise_license_required
    def post(self, dataset_id):
        """Update document metadata according to the ``operation_data`` list in the JSON body.

        Raises:
            NotFound: If the dataset does not exist.
        """
        dataset_id_str = str(dataset_id)
        dataset = DatasetService.get_dataset(dataset_id_str)
        if dataset is None:
            raise NotFound("Dataset not found.")
        DatasetService.check_dataset_permission(dataset, current_user)

        parser = reqparse.RequestParser()
        # The service iterates over ``operation_data``; a JSON null would only
        # fail later, so reject it while parsing the request.
        parser.add_argument("operation_data", type=list, required=True, nullable=False, location="json")
        args = parser.parse_args()
        metadata_args = MetadataOperationData(**args)

        MetadataService.update_documents_metadata(dataset, metadata_args)

        return 200
|
||||
|
||||
|
||||
# Route table for the dataset metadata console endpoints.
api.add_resource(DatasetListApi, "/datasets/<uuid:dataset_id>/metadata")
api.add_resource(DatasetMetadataApi, "/datasets/<uuid:dataset_id>/metadata/<uuid:metadata_id>")
api.add_resource(DatasetMetadataBuiltInFieldApi, "/datasets/metadata/built-in")
# DatasetMetadataBuiltInFieldActionApi.post() takes ``dataset_id`` as well as
# ``action``; the rule must capture both or every request fails with a
# missing-argument TypeError.
api.add_resource(
    DatasetMetadataBuiltInFieldActionApi,
    "/datasets/<uuid:dataset_id>/metadata/built-in/<string:action>",
)
api.add_resource(DocumentMetadataApi, "/datasets/<uuid:dataset_id>/documents/metadata")
|
||||
@ -0,0 +1,9 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class BuiltInField(str, Enum):
    """Names of the built-in document metadata fields.

    Members are also ``str`` instances, so each compares equal to its value.
    """

    # Each member's value is identical to its name.
    document_name = "document_name"
    uploader = "uploader"
    upload_date = "upload_date"
    last_update_date = "last_update_date"
    source = "source"
|
||||
@ -0,0 +1,90 @@
|
||||
"""add_metadata_function
|
||||
|
||||
Revision ID: d20049ed0af6
|
||||
Revises: 08ec4f75af5e
|
||||
Create Date: 2025-02-27 09:17:48.903213
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
# ``revision`` names this migration; ``down_revision`` names the migration it
# is applied on top of.
revision = 'd20049ed0af6'
down_revision = '08ec4f75af5e'
branch_labels = None
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
    """Create the metadata tables and extend ``datasets`` / ``documents``.

    Adds ``dataset_metadata_bindings`` and ``dataset_metadatas`` with their
    indexes, adds ``datasets.built_in_field_enabled`` and converts
    ``documents.doc_metadata`` from JSON to JSONB with a GIN index.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # One row per (metadata, document) association.
    op.create_table('dataset_metadata_bindings',
    sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
    sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
    sa.Column('dataset_id', models.types.StringUUID(), nullable=False),
    sa.Column('metadata_id', models.types.StringUUID(), nullable=False),
    sa.Column('document_id', models.types.StringUUID(), nullable=False),
    sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
    sa.Column('created_by', models.types.StringUUID(), nullable=False),
    sa.PrimaryKeyConstraint('id', name='dataset_metadata_binding_pkey')
    )
    with op.batch_alter_table('dataset_metadata_bindings', schema=None) as batch_op:
        batch_op.create_index('dataset_metadata_binding_dataset_idx', ['dataset_id'], unique=False)
        batch_op.create_index('dataset_metadata_binding_document_idx', ['document_id'], unique=False)
        batch_op.create_index('dataset_metadata_binding_metadata_idx', ['metadata_id'], unique=False)
        batch_op.create_index('dataset_metadata_binding_tenant_idx', ['tenant_id'], unique=False)

    # Metadata field definitions; ``tenant_id``, ``dataset_id``, ``type``,
    # ``name`` and ``created_by`` are all required (NOT NULL, no default).
    op.create_table('dataset_metadatas',
    sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
    sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
    sa.Column('dataset_id', models.types.StringUUID(), nullable=False),
    sa.Column('type', sa.String(length=255), nullable=False),
    sa.Column('name', sa.String(length=255), nullable=False),
    sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False),
    sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False),
    sa.Column('created_by', models.types.StringUUID(), nullable=False),
    sa.Column('updated_by', models.types.StringUUID(), nullable=True),
    sa.PrimaryKeyConstraint('id', name='dataset_metadata_pkey')
    )
    with op.batch_alter_table('dataset_metadatas', schema=None) as batch_op:
        batch_op.create_index('dataset_metadata_dataset_idx', ['dataset_id'], unique=False)
        batch_op.create_index('dataset_metadata_tenant_idx', ['tenant_id'], unique=False)

    # Existing datasets get the flag with the server default ``false``.
    with op.batch_alter_table('datasets', schema=None) as batch_op:
        batch_op.add_column(sa.Column('built_in_field_enabled', sa.Boolean(), server_default=sa.text('false'), nullable=False))

    # The column type is changed to JSONB before the GIN index is created on it.
    with op.batch_alter_table('documents', schema=None) as batch_op:
        batch_op.alter_column('doc_metadata',
               existing_type=postgresql.JSON(astext_type=sa.Text()),
               type_=postgresql.JSONB(astext_type=sa.Text()),
               existing_nullable=True)
        batch_op.create_index('document_metadata_idx', ['doc_metadata'], unique=False, postgresql_using='gin')
    # ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
    """Undo ``upgrade`` in reverse order.

    Drops the GIN index and converts ``documents.doc_metadata`` back to JSON,
    removes ``datasets.built_in_field_enabled``, then drops both metadata
    tables together with their indexes.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # The index is dropped before the column type is changed back to JSON.
    with op.batch_alter_table('documents', schema=None) as batch_op:
        batch_op.drop_index('document_metadata_idx', postgresql_using='gin')
        batch_op.alter_column('doc_metadata',
               existing_type=postgresql.JSONB(astext_type=sa.Text()),
               type_=postgresql.JSON(astext_type=sa.Text()),
               existing_nullable=True)

    with op.batch_alter_table('datasets', schema=None) as batch_op:
        batch_op.drop_column('built_in_field_enabled')

    with op.batch_alter_table('dataset_metadatas', schema=None) as batch_op:
        batch_op.drop_index('dataset_metadata_tenant_idx')
        batch_op.drop_index('dataset_metadata_dataset_idx')

    op.drop_table('dataset_metadatas')
    with op.batch_alter_table('dataset_metadata_bindings', schema=None) as batch_op:
        batch_op.drop_index('dataset_metadata_binding_tenant_idx')
        batch_op.drop_index('dataset_metadata_binding_metadata_idx')
        batch_op.drop_index('dataset_metadata_binding_document_idx')
        batch_op.drop_index('dataset_metadata_binding_dataset_idx')

    op.drop_table('dataset_metadata_bindings')
    # ### end Alembic commands ###
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,182 @@
|
||||
import datetime
|
||||
from typing import Optional
|
||||
|
||||
from flask_login import current_user # type: ignore
|
||||
|
||||
from core.rag.index_processor.constant.built_in_field import BuiltInField
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding
|
||||
from services.dataset_service import DocumentService
|
||||
from services.entities.knowledge_entities.knowledge_entities import (
|
||||
MetadataArgs,
|
||||
MetadataOperationData,
|
||||
)
|
||||
from tasks.update_documents_metadata_task import update_documents_metadata_task
|
||||
|
||||
|
||||
class MetadataService:
    """Create, rename, delete and apply metadata fields for datasets and documents.

    Operations that rewrite document metadata first take a Redis lock (see
    ``knowledge_base_metadata_lock_check``) and hand its key to
    ``update_documents_metadata_task``, which deletes the key when it finishes.
    When no task is dispatched - nothing to re-index, or an error occurred -
    the lock is released here so it does not linger until its one-hour expiry.
    """

    @staticmethod
    def _built_in_values(document) -> dict:
        """Return the built-in field values of ``document`` keyed by field name."""
        return {
            BuiltInField.document_name.value: document.name,
            BuiltInField.uploader.value: document.uploader,
            BuiltInField.upload_date.value: document.upload_date.strftime("%Y-%m-%d %H:%M:%S"),
            BuiltInField.last_update_date.value: document.last_update_date.strftime("%Y-%m-%d %H:%M:%S"),
            BuiltInField.source.value: document.data_source_type,
        }

    @staticmethod
    def _bound_document_ids(metadata_id: str) -> list:
        """Return the ids of all documents bound to ``metadata_id`` (possibly empty)."""
        bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()
        return [binding.document_id for binding in bindings]

    @staticmethod
    def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata:
        """Persist a new metadata field for ``dataset_id`` and return it."""
        metadata = DatasetMetadata(
            # ``dataset_metadatas.tenant_id`` is NOT NULL without a default in
            # the migration, so it has to be supplied on insert.
            # NOTE(review): uses the same ``current_user.tenant_id`` attribute
            # as the binding creation in update_documents_metadata - confirm
            # it is the right attribute on the account model.
            tenant_id=current_user.tenant_id,
            dataset_id=dataset_id,
            type=metadata_args.type,
            name=metadata_args.name,
            created_by=current_user.id,
        )
        db.session.add(metadata)
        db.session.commit()
        return metadata

    @staticmethod
    def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:
        """Rename a metadata field and the matching key in every bound document.

        Raises:
            ValueError: If another metadata operation holds the dataset lock or
                the metadata does not exist.
        """
        lock_key = f"dataset_metadata_lock_{dataset_id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
        task_dispatched = False
        try:
            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
            if metadata is None:
                raise ValueError("Metadata not found.")
            old_name = metadata.name
            metadata.name = name
            metadata.updated_by = current_user.id
            metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

            # update related documents
            document_ids = MetadataService._bound_document_ids(metadata_id)
            if document_ids:
                for document in DocumentService.get_document_by_ids(document_ids):
                    # Work on a copy and reassign: in-place edits of a JSON
                    # column value are not reliably detected as a change.
                    doc_metadata = dict(document.doc_metadata or {})
                    if old_name in doc_metadata:
                        doc_metadata[name] = doc_metadata.pop(old_name)
                    document.doc_metadata = doc_metadata
                    db.session.add(document)
            db.session.commit()
            if document_ids:
                update_documents_metadata_task.delay(dataset_id, document_ids, lock_key)
                task_dispatched = True
            return metadata
        finally:
            if not task_dispatched:
                redis_client.delete(lock_key)

    @staticmethod
    def delete_metadata(dataset_id: str, metadata_id: str):
        """Delete a metadata field and remove its key from every bound document.

        Raises:
            ValueError: If another metadata operation holds the dataset lock or
                the metadata does not exist.
        """
        lock_key = f"dataset_metadata_lock_{dataset_id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
        task_dispatched = False
        try:
            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
            if metadata is None:
                raise ValueError("Metadata not found.")
            metadata_name = metadata.name
            db.session.delete(metadata)

            # delete the field from related documents
            document_ids = MetadataService._bound_document_ids(metadata_id)
            if document_ids:
                for document in DocumentService.get_document_by_ids(document_ids):
                    doc_metadata = dict(document.doc_metadata or {})
                    doc_metadata.pop(metadata_name, None)
                    document.doc_metadata = doc_metadata
                    db.session.add(document)
            db.session.commit()
            if document_ids:
                update_documents_metadata_task.delay(dataset_id, document_ids, lock_key)
                task_dispatched = True
        finally:
            if not task_dispatched:
                redis_client.delete(lock_key)

    @staticmethod
    def get_built_in_fields():
        """Return the name/type descriptors of all built-in metadata fields."""
        return [
            {"name": BuiltInField.document_name, "type": "string"},
            {"name": BuiltInField.uploader, "type": "string"},
            {"name": BuiltInField.upload_date, "type": "date"},
            {"name": BuiltInField.last_update_date, "type": "date"},
            {"name": BuiltInField.source, "type": "string"},
        ]

    @staticmethod
    def enable_built_in_field(dataset: Dataset):
        """Turn on built-in fields for ``dataset`` and stamp them on its working documents.

        Does nothing when they are already enabled.
        """
        # ``built_in_field_enabled`` is the column added by the migration and
        # the attribute read by update_documents_metadata_task.
        if dataset.built_in_field_enabled:
            return
        lock_key = f"dataset_metadata_lock_{dataset.id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
        task_dispatched = False
        try:
            dataset.built_in_field_enabled = True
            db.session.add(dataset)
            document_ids = []
            for document in DocumentService.get_working_documents_by_dataset_id(dataset.id) or []:
                doc_metadata = dict(document.doc_metadata or {})
                doc_metadata.update(MetadataService._built_in_values(document))
                document.doc_metadata = doc_metadata
                db.session.add(document)
                document_ids.append(document.id)
            db.session.commit()
            if document_ids:
                update_documents_metadata_task.delay(dataset.id, document_ids, lock_key)
                task_dispatched = True
        finally:
            if not task_dispatched:
                redis_client.delete(lock_key)

    @staticmethod
    def disable_built_in_field(dataset: Dataset):
        """Turn off built-in fields for ``dataset`` and strip them from its working documents.

        Does nothing when they are already disabled.
        """
        if not dataset.built_in_field_enabled:
            return
        lock_key = f"dataset_metadata_lock_{dataset.id}"
        MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
        task_dispatched = False
        try:
            dataset.built_in_field_enabled = False
            db.session.add(dataset)
            document_ids = []
            for document in DocumentService.get_working_documents_by_dataset_id(dataset.id) or []:
                doc_metadata = dict(document.doc_metadata or {})
                for field in BuiltInField:
                    # A document may lack a built-in key; that is not an error.
                    doc_metadata.pop(field.value, None)
                document.doc_metadata = doc_metadata
                db.session.add(document)
                document_ids.append(document.id)
            db.session.commit()
            if document_ids:
                update_documents_metadata_task.delay(dataset.id, document_ids, lock_key)
                task_dispatched = True
        finally:
            if not task_dispatched:
                redis_client.delete(lock_key)

    @staticmethod
    def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
        """Replace the metadata of each document listed in ``metadata_args.operation_data``.

        For every operation the document's metadata and bindings are rebuilt
        from ``operation.metadata_list`` and a re-index task is queued.

        Raises:
            ValueError: If a document is locked by another operation or cannot
                be found.
        """
        for operation in metadata_args.operation_data:
            lock_key = f"document_metadata_lock_{operation.document_id}"
            MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)
            task_dispatched = False
            try:
                # NOTE(review): get_document is called with the document id
                # only - confirm this matches its signature.
                document = DocumentService.get_document(operation.document_id)
                if document is None:
                    raise ValueError("Document not found.")
                # The values belong to this operation; the payload object only
                # carries ``operation_data``.
                # NOTE(review): assumes each metadata_list item exposes
                # ``name`` and ``value`` next to the ``id`` used below.
                doc_metadata = {item.name: item.value for item in operation.metadata_list}
                if dataset.built_in_field_enabled:
                    doc_metadata.update(MetadataService._built_in_values(document))
                document.doc_metadata = doc_metadata

                # rebuild the metadata bindings of this document
                DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete()
                for item in operation.metadata_list:
                    dataset_metadata_binding = DatasetMetadataBinding(
                        tenant_id=current_user.tenant_id,
                        dataset_id=dataset.id,
                        document_id=operation.document_id,
                        metadata_id=item.id,
                        created_by=current_user.id,
                    )
                    db.session.add(dataset_metadata_binding)
                db.session.add(document)
                db.session.commit()

                update_documents_metadata_task.delay(dataset.id, [document.id], lock_key)
                task_dispatched = True
            finally:
                if not task_dispatched:
                    redis_client.delete(lock_key)

    @staticmethod
    def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]):
        """Acquire the dataset and/or document metadata lock for one hour.

        Raises:
            ValueError: If the requested lock is already held.
        """
        if dataset_id:
            lock_key = f"dataset_metadata_lock_{dataset_id}"
            # SET ... NX is a single atomic step, so two concurrent callers
            # cannot both pass a separate GET check and then both SET.
            if not redis_client.set(lock_key, 1, ex=3600, nx=True):
                raise ValueError("Another knowledge base metadata operation is running, please wait a moment.")
        if document_id:
            lock_key = f"document_metadata_lock_{document_id}"
            if not redis_client.set(lock_key, 1, ex=3600, nx=True):
                raise ValueError("Another document metadata operation is running, please wait a moment.")
|
||||
@ -0,0 +1,121 @@
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
from celery import shared_task  # type: ignore
|
||||
|
||||
from core.rag.index_processor.constant.built_in_field import BuiltInField
|
||||
from core.rag.index_processor.constant.index_type import IndexType
|
||||
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
|
||||
from core.rag.models.document import ChildDocument, Document
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import (
|
||||
Document as DatasetDocument,
|
||||
)
|
||||
from models.dataset import (
|
||||
DocumentSegment,
|
||||
)
|
||||
from services.dataset_service import DatasetService
|
||||
|
||||
|
||||
@shared_task(queue="dataset")
def update_documents_metadata_task(
    dataset_id: str,
    document_ids: list[str],
    lock_key: Optional[str] = None,
):
    """
    Update documents metadata.

    For every enabled, completed, non-archived document in ``document_ids`` the
    enabled segments are removed from the index and loaded again; for
    parent-child documents each child chunk carries the document metadata.
    Failures are logged, not raised.

    :param dataset_id: dataset id
    :param document_ids: document ids
    :param lock_key: optional Redis key that is deleted when the task ends,
        whether it succeeded or failed

    Usage: update_documents_metadata_task.delay(dataset_id, document_ids)
    """
    logging.info(click.style("Start update documents metadata: {}".format(dataset_id), fg="green"))
    start_at = time.perf_counter()

    try:
        dataset = DatasetService.get_dataset(dataset_id)
        if dataset is None:
            raise ValueError("Dataset not found.")
        dataset_documents = (
            db.session.query(DatasetDocument)
            .filter(
                DatasetDocument.dataset_id == dataset_id,
                DatasetDocument.id.in_(document_ids),
                DatasetDocument.enabled == True,
                DatasetDocument.indexing_status == "completed",
                DatasetDocument.archived == False,
            )
            .all()
        )
        if not dataset_documents:
            raise ValueError("Documents not found.")
        for dataset_document in dataset_documents:
            index_processor = IndexProcessorFactory(dataset_document.doc_form).init_index_processor()

            segments = (
                db.session.query(DocumentSegment)
                .filter(
                    DocumentSegment.dataset_id == dataset_id,
                    DocumentSegment.document_id == dataset_document.id,
                    DocumentSegment.enabled == True,
                )
                .all()
            )
            if not segments:
                continue
            # delete all documents in vector index
            index_node_ids = [segment.index_node_id for segment in segments]
            index_processor.clean(dataset, index_node_ids, with_keywords=False, delete_child_chunks=True)

            # Metadata shared by every child chunk of this document; built once
            # instead of once per chunk.
            child_metadata = {}
            if dataset.built_in_field_enabled:
                child_metadata[BuiltInField.uploader] = dataset_document.created_by
                child_metadata[BuiltInField.upload_date] = dataset_document.created_at
                child_metadata[BuiltInField.last_update_date] = dataset_document.updated_at
                child_metadata[BuiltInField.source] = dataset_document.data_source_type
                # BuiltInField has no ``original_filename`` member; the
                # document name belongs under ``document_name``.
                child_metadata[BuiltInField.document_name] = dataset_document.name
            if dataset_document.doc_metadata:
                child_metadata.update(dataset_document.doc_metadata)

            # update documents metadata
            # NOTE(review): only child chunks receive the metadata; parent
            # segment documents are reloaded without it - confirm intended.
            index_documents = []
            for segment in segments:
                document = Document(
                    page_content=segment.content,
                    metadata={
                        "doc_id": segment.index_node_id,
                        "doc_hash": segment.index_node_hash,
                        "document_id": dataset_document.id,
                        "dataset_id": dataset_id,
                    },
                )

                if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
                    child_chunks = segment.child_chunks
                    if child_chunks:
                        child_documents = []
                        for child_chunk in child_chunks:
                            child_document = ChildDocument(
                                page_content=child_chunk.content,
                                metadata={
                                    "doc_id": child_chunk.index_node_id,
                                    "doc_hash": child_chunk.index_node_hash,
                                    "document_id": dataset_document.id,
                                    "dataset_id": dataset_id,
                                },
                            )
                            child_document.metadata.update(child_metadata)
                            child_documents.append(child_document)
                        document.children = child_documents
                index_documents.append(document)
            # save vector index
            index_processor.load(dataset, index_documents)
        end_at = time.perf_counter()
        logging.info(
            click.style("Updated documents metadata: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")
        )
    except Exception:
        logging.exception("Updated documents metadata failed")
    finally:
        if lock_key:
            redis_client.delete(lock_key)
|
||||
@ -0,0 +1,24 @@
|
||||
'use client'
|
||||
import { RiCloseLine } from '@remixicon/react'
|
||||
import { useBoolean } from 'ahooks'
|
||||
import type { PropsWithChildren } from 'react'
|
||||
import { useEffect } from 'react'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
|
||||
/**
 * Renders a dismissible notice banner above `children`.
 *
 * The banner is shown initially and hides itself after 60 seconds or when the
 * close icon is clicked; `children` are always rendered.
 */
export default function OfflineNotice({ children }: PropsWithChildren) {
  const { t } = useTranslation()
  const [showOfflineNotice, { setFalse }] = useBoolean(true)

  // Auto-dismiss after 60 s; the timer is cleared on unmount.
  useEffect(() => {
    const timer = setTimeout(setFalse, 60000)
    return () => clearTimeout(timer)
  }, [setFalse])
  return <>
    {showOfflineNotice && <div className='px-4 py-2 flex items-center justify-start gap-x-2 bg-[#FFFAEB] border-b-[0.5px] border-b-[#FEF0C7]'>
      {/* `leading-[18px]` needs the dash; `leading[18px]` is not a valid utility class. */}
      <div className='rounded-[12px] flex items-center justify-center px-2 py-0.5 h-[22px] bg-[#f79009] text-white text-[11px] not-italic font-medium leading-[18px]'>{t('common.offlineNoticeTitle')}</div>
      <div className='grow font-medium leading-[18px] text-[12px] not-italic text-[#344054]'>{t('common.offlineNotice')}</div>
      <RiCloseLine className='size-4 text-[#667085] cursor-pointer' onClick={setFalse} />
    </div>}
    {children}
  </>
}
|
||||
@ -1,16 +0,0 @@
|
||||
import { buildProviderQuery } from './_tools_util'
|
||||
|
||||
describe('makeProviderQuery', () => {
  // [test title, collection name, expected query string]
  const cases: Array<[string, string, string]> = [
    ['collectionName without special chars', 'ABC', 'provider=ABC'],
    ['should escape &', 'ABC&DEF', 'provider=ABC%26DEF'],
    ['should escape /', 'ABC/DEF', 'provider=ABC%2FDEF'],
    ['should escape ?', 'ABC?DEF', 'provider=ABC%3FDEF'],
  ]

  test.each(cases)('%s', (_title, collectionName, expected) => {
    expect(buildProviderQuery(collectionName)).toBe(expected)
  })
})
|
||||
@ -1,5 +0,0 @@
|
||||
/**
 * Builds the `provider=<name>` query string for a collection, percent-encoding
 * the name through `URLSearchParams`.
 */
export const buildProviderQuery = (collectionName: string): string =>
  new URLSearchParams({ provider: collectionName }).toString()
|
||||
Loading…
Reference in New Issue