diff --git a/api/commands.py b/api/commands.py index ce1703380e..2b0f3813f3 100644 --- a/api/commands.py +++ b/api/commands.py @@ -1,7 +1,10 @@ import base64 import json import logging +import os import secrets +import time +from pathlib import Path from typing import Optional import click @@ -13,6 +16,7 @@ from core.rag.models.document import Document from events.app_event import app_was_created from extensions.ext_database import db from extensions.ext_redis import redis_client +from extensions.ext_storage import storage from flask import current_app from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password @@ -21,7 +25,7 @@ from models import Account, Tenant, TenantAccountJoin, TenantAccountJoinRole from models.dataset import Dataset, DatasetCollectionBinding from models.dataset import Document as DatasetDocument from models.dataset import DocumentSegment -from models.model import App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation +from models.model import App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile from models.provider import Provider, ProviderModel from services.account_service import RegisterService, TenantService from werkzeug.exceptions import NotFound @@ -506,7 +510,7 @@ def add_qdrant_doc_id_index(field: str): prefer_grpc=dify_config.QDRANT_GRPC_ENABLED, ) try: - client = qdrant_client.QdrantClient(**qdrant_config.to_qdrant_params()) + client = qdrant_client.QdrantClient(**qdrant_config.to_qdrant_params()) # type: ignore # create payload index client.create_payload_index(binding.collection_name, field, field_schema=PayloadSchemaType.KEYWORD) create_count += 1 @@ -983,3 +987,122 @@ def add_account_to_organization_cmd(org_id, account_id, role, department, title, except Exception as e: db.session.rollback() click.echo(f"Error adding account to organization: {str(e)}") + + +@click.command("upload-private-key-file-to-cloud-storage", help="upload private key file to cloud storage") +@click.option("--tenant_id", prompt=False, help="tenant_id") +def upload_private_key_file_cloud_storage(tenant_id: Optional[str] = None): + """ + upload private.pem to cloud storage + """ + click.echo( + click.style( + "Start upload private.pem to cloud storage", + fg="green", + ) + ) + + if not tenant_id: + click.echo( + click.style( + "Warning: did not provide an tenant_id, it will be auto queried in the database", + fg="yellow", + ) + ) + tenants_list: list[Tenant] = Tenant.query.all() + tenants = [item.id for item in tenants_list] + else: + tenants = [ + tenant_id, + ] + + for tenant_id in tenants: + click.echo( + click.style( + f"Current tenant_id is: {tenant_id}", + fg="green", + ) + ) + + file_key = f"privkeys/{tenant_id}/private.pem" + file_content = Path(f"{os.environ.get('STORAGE_LOCAL_PATH', 'storage')}/{file_key}").read_bytes() + storage.save(filename=file_key, data=file_content) + click.echo( + click.style( + f"Congratulations! file uploaded. file.key: {file_key}", + fg="green", + ) + ) + + +@click.command("upload-local-files-to-cloud-storage", help="upload local files to cloud storage") +def upload_local_files_to_cloud_storage(): + """ + upload local files to cloud storage + """ + click.echo( + click.style( + "Start upload local files to cloud storage", + fg="green", + ) + ) + + total_count = UploadFile.query.filter_by(storage_type="local").count() + click.echo(click.style(f"Total files to process: {total_count}", fg="green")) + + batch_size = 100 + processed_count = 0 + while processed_count < total_count: + files: list[UploadFile] = UploadFile.query.filter_by(storage_type="local").limit(batch_size).all() + + for file in files: + target_filepath = f"{os.environ.get('STORAGE_LOCAL_PATH', 'storage')}/{file.key}" + + # if the file exists + if not os.path.exists(target_filepath): + click.echo( + click.style( + f"Warning! file not exist. filepath: {target_filepath}, ignore this, continue", + fg="yellow", + ) + ) + processed_count += 1 + if processed_count % 10 == 0 or processed_count == total_count: + click.echo(click.style(f"Processed {processed_count}/{total_count} files\n", fg="blue")) + continue + + # Upload to cloud storage + file_content = Path(target_filepath).read_bytes() + storage.save(filename=file.key, data=file_content) + click.echo( + click.style( + f"File uploaded. file.key: {file.key}", + fg="green", + ) + ) + + # Update database record + try: + file.storage_type = os.environ["STORAGE_TYPE"] + db.session.commit() + click.echo( + click.style( + f"file.storage_type updated to database. file.key: {file.key}", + fg="green", + ) + ) + except Exception as e: + click.echo(click.style(f"An error occurred: {str(e)}", fg="red")) + db.session.rollback() + + processed_count += 1 + if processed_count % 10 == 0 or processed_count == total_count: + click.echo(click.style(f"Processed {processed_count}/{total_count} files\n", fg="blue")) + + time.sleep(3) + click.echo( + click.style( + "Congratulations! finish files uploaded.", + fg="green", + ) + ) diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 85ff9f8c91..842ef94840 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -17,6 +17,8 @@ def init_app(app: DifyApp): show_organization_cmd, update_organization_cmd, upgrade_db, + upload_local_files_to_cloud_storage, + upload_private_key_file_cloud_storage, vdb_migrate, ) @@ -36,6 +38,8 @@ def init_app(app: DifyApp): list_organizations_cmd, show_organization_cmd, update_organization_cmd, + upload_private_key_file_cloud_storage, + upload_local_files_to_cloud_storage, ] for cmd in cmds_to_register: app.cli.add_command(cmd)