From 967b004037b1a14753f86e72780872dbdb0172f2 Mon Sep 17 00:00:00 2001 From: ZeroZ_JQ Date: Wed, 16 Apr 2025 16:40:00 +0800 Subject: [PATCH] refactor: consolidate document status update logic into batch_update_document_status method --- .../console/datasets/datasets_document.py | 78 +-------------- .../service_api/dataset/dataset.py | 88 ++--------------- api/services/dataset_service.py | 94 +++++++++++++++++++ 3 files changed, 105 insertions(+), 155 deletions(-) diff --git a/api/controllers/console/datasets/datasets_document.py b/api/controllers/console/datasets/datasets_document.py index 0b40312368..e65dbdc822 100644 --- a/api/controllers/console/datasets/datasets_document.py +++ b/api/controllers/console/datasets/datasets_document.py @@ -43,7 +43,6 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.plugin.manager.exc import PluginDaemonClientSideError from core.rag.extractor.entity.extract_setting import ExtractSetting from extensions.ext_database import db -from extensions.ext_redis import redis_client from fields.document_fields import ( dataset_and_document_fields, document_fields, @@ -54,8 +53,6 @@ from libs.login import login_required from models import Dataset, DatasetProcessRule, Document, DocumentSegment, UploadFile from services.dataset_service import DatasetService, DocumentService from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig -from tasks.add_document_to_index_task import add_document_to_index_task -from tasks.remove_document_from_index_task import remove_document_from_index_task class DocumentResource(Resource): @@ -822,77 +819,12 @@ class DocumentStatusApi(DocumentResource): DatasetService.check_dataset_permission(dataset, current_user) document_ids = request.args.getlist("document_id") - for document_id in document_ids: - document = self.get_document(dataset_id, document_id) - - indexing_cache_key = "document_{}_indexing".format(document.id) - cache_result = redis_client.get(indexing_cache_key) - if cache_result is not None: - raise InvalidActionError(f"Document:{document.name} is being indexed, please try again later") - - if action == "enable": - if document.enabled: - continue - document.enabled = True - document.disabled_at = None - document.disabled_by = None - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - add_document_to_index_task.delay(document_id) - - elif action == "disable": - if not document.completed_at or document.indexing_status != "completed": - raise InvalidActionError(f"Document: {document.name} is not completed.") - if not document.enabled: - continue - - document.enabled = False - document.disabled_at = datetime.now(UTC).replace(tzinfo=None) - document.disabled_by = current_user.id - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - remove_document_from_index_task.delay(document_id) - - elif action == "archive": - if document.archived: - continue - - document.archived = True - document.archived_at = datetime.now(UTC).replace(tzinfo=None) - document.archived_by = current_user.id - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - if document.enabled: - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - remove_document_from_index_task.delay(document_id) - - elif action == "un_archive": - if not document.archived: - continue - document.archived = False - document.archived_at = None - document.archived_by = None - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - add_document_to_index_task.delay(document_id) + + try: + DocumentService.batch_update_document_status(dataset, document_ids, action, current_user) + except services.errors.dataset.InvalidActionError as e: + raise InvalidActionError(str(e)) - else: - raise InvalidActionError() return {"result": "success"}, 200 diff --git a/api/controllers/service_api/dataset/dataset.py b/api/controllers/service_api/dataset/dataset.py index d93ca97f46..e449ef7620 100644 --- a/api/controllers/service_api/dataset/dataset.py +++ b/api/controllers/service_api/dataset/dataset.py @@ -1,4 +1,3 @@ -from datetime import UTC, datetime from flask import request from flask_restful import marshal, reqparse # type: ignore @@ -11,15 +10,11 @@ from controllers.service_api.wraps import DatasetApiResource from core.model_runtime.entities.model_entities import ModelType from core.plugin.entities.plugin import ModelProviderID from core.provider_manager import ProviderManager -from extensions.ext_database import db -from extensions.ext_redis import redis_client from fields.dataset_fields import dataset_detail_fields from libs.login import current_user from models.dataset import Dataset, DatasetPermissionEnum from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService from services.entities.knowledge_entities.knowledge_entities import RetrievalModel -from tasks.add_document_to_index_task import add_document_to_index_task -from tasks.remove_document_from_index_task import remove_document_from_index_task def _validate_name(name): @@ -366,83 +361,12 @@ class DocumentStatusApi(DatasetApiResource): data = request.get_json() document_ids = data.get("document_ids", []) - if not document_ids: - return {"result": "success"}, 200 - - for document_id in document_ids: - document = DocumentService.get_document(dataset_id_str, document_id) - - if not document: - continue - - indexing_cache_key = f"document_{document.id}_indexing" - cache_result = redis_client.get(indexing_cache_key) - if cache_result is not None: - raise InvalidActionError(f"Document:{document.name} is being indexed, please try again later") - - if action == "enable": - if document.enabled: - continue - document.enabled = True - document.disabled_at = None - document.disabled_by = None - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - add_document_to_index_task.delay(document_id) - - elif action == "disable": - if not document.completed_at or document.indexing_status != "completed": - raise InvalidActionError(f"Document: {document.name} is not completed.") - if not document.enabled: - continue - - document.enabled = False - document.disabled_at = datetime.now(UTC).replace(tzinfo=None) - document.disabled_by = current_user.id - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - remove_document_from_index_task.delay(document_id) - - elif action == "archive": - if document.archived: - continue - - document.archived = True - document.archived_at = datetime.now(UTC).replace(tzinfo=None) - document.archived_by = current_user.id - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - if document.enabled: - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - remove_document_from_index_task.delay(document_id) - - elif action == "un_archive": - if not document.archived: - continue - document.archived = False - document.archived_at = None - document.archived_by = None - document.updated_at = datetime.now(UTC).replace(tzinfo=None) - db.session.commit() - - # Set cache to prevent indexing the same document multiple times - redis_client.setex(indexing_cache_key, 600, 1) - - add_document_to_index_task.delay(document_id) - - else: - raise InvalidActionError() + try: + DocumentService.batch_update_document_status(dataset, document_ids, action, current_user) + except services.errors.document.DocumentIndexingError as e: + raise InvalidActionError(str(e)) + except ValueError as e: + raise InvalidActionError(str(e)) return {"result": "success"}, 200 diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index deb6be5a43..0fc0d4f5fd 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -72,6 +72,8 @@ from tasks.enable_segments_to_index_task import enable_segments_to_index_task from tasks.recover_document_indexing_task import recover_document_indexing_task from tasks.retry_document_indexing_task import retry_document_indexing_task from tasks.sync_website_document_indexing_task import sync_website_document_indexing_task +from tasks.add_document_to_index_task import add_document_to_index_task +from tasks.remove_document_from_index_task import remove_document_from_index_task class DatasetService: @@ -1573,6 +1575,98 @@ class DocumentService: if not isinstance(args["process_rule"]["rules"]["segmentation"]["max_tokens"], int): raise ValueError("Process rule segmentation max_tokens is invalid") + @staticmethod + def batch_update_document_status(dataset: Dataset, document_ids: list[str], action: str, user): + """ + Batch update document status. + + Args: + dataset (Dataset): The dataset object + document_ids (list[str]): List of document IDs to update + action (str): Action to perform (enable, disable, archive, un_archive) + user: Current user performing the action + + Raises: + DocumentIndexingError: If document is being indexed or not in correct state + """ + if not document_ids: + return + + for document_id in document_ids: + document = DocumentService.get_document(dataset.id, document_id) + + if not document: + continue + + indexing_cache_key = f"document_{document.id}_indexing" + cache_result = redis_client.get(indexing_cache_key) + if cache_result is not None: + raise DocumentIndexingError(f"Document:{document.name} is being indexed, please try again later") + + if action == "enable": + if document.enabled: + continue + document.enabled = True + document.disabled_at = None + document.disabled_by = None + document.updated_at = datetime.datetime.now() + db.session.commit() + + # Set cache to prevent indexing the same document multiple times + redis_client.setex(indexing_cache_key, 600, 1) + + add_document_to_index_task.delay(document_id) + + elif action == "disable": + if not document.completed_at or document.indexing_status != "completed": + raise DocumentIndexingError(f"Document: {document.name} is not completed.") + if not document.enabled: + continue + + document.enabled = False + document.disabled_at = datetime.datetime.now() + document.disabled_by = user.id + document.updated_at = datetime.datetime.now() + db.session.commit() + + # Set cache to prevent indexing the same document multiple times + redis_client.setex(indexing_cache_key, 600, 1) + + remove_document_from_index_task.delay(document_id) + + elif action == "archive": + if document.archived: + continue + + document.archived = True + document.archived_at = datetime.datetime.now() + document.archived_by = user.id + document.updated_at = datetime.datetime.now() + db.session.commit() + + if document.enabled: + # Set cache to prevent indexing the same document multiple times + redis_client.setex(indexing_cache_key, 600, 1) + + remove_document_from_index_task.delay(document_id) + + elif action == "un_archive": + if not document.archived: + continue + document.archived = False + document.archived_at = None + document.archived_by = None + document.updated_at = datetime.datetime.now() + db.session.commit() + + # Set cache to prevent indexing the same document multiple times + redis_client.setex(indexing_cache_key, 600, 1) + + add_document_to_index_task.delay(document_id) + + else: + raise ValueError(f"Invalid action: {action}") + class SegmentService: @classmethod