From b5a3f1d5e05fff43b8e0f3e0dddc6ea742bb9ed8 Mon Sep 17 00:00:00 2001 From: yunqiqiliang <132561395+yunqiqiliang@users.noreply.github.com> Date: Fri, 18 Jul 2025 21:03:58 +0800 Subject: [PATCH] Fix remaining Python style and linting issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix line length violation in middleware config description - Fix RUF013 type annotation to use union syntax - Complete all Python style and linting fixes for CI checks - Resolve formatter and linter warnings 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .env.example | 1197 +++++++++++++++++ api/configs/middleware/__init__.py | 4 +- .../clickzetta_volume_storage_config.py | 22 +- .../middleware/vdb/clickzetta_config.py | 1 - api/extensions/ext_storage.py | 2 +- .../clickzetta_volume_storage.py | 224 ++- .../clickzetta_volume/file_lifecycle.py | 266 ++-- .../clickzetta_volume/volume_permissions.py | 112 +- .../storage/test_clickzetta_volume.py | 82 +- 9 files changed, 1555 insertions(+), 355 deletions(-) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 0000000000..3e95f2e982 --- /dev/null +++ b/.env.example @@ -0,0 +1,1197 @@ +# ------------------------------ +# Environment Variables for API service & worker +# ------------------------------ + +# ------------------------------ +# Common Variables +# ------------------------------ + +# The backend URL of the console API, +# used to concatenate the authorization callback. +# If empty, it is the same domain. +# Example: https://api.console.dify.ai +CONSOLE_API_URL= + +# The front-end URL of the console web, +# used to concatenate some front-end addresses and for CORS configuration use. +# If empty, it is the same domain. +# Example: https://console.dify.ai +CONSOLE_WEB_URL= + +# Service API Url, +# used to display Service API Base Url to the front-end. +# If empty, it is the same domain. +# Example: https://api.dify.ai +SERVICE_API_URL= + +# WebApp API backend Url, +# used to declare the back-end URL for the front-end API. +# If empty, it is the same domain. +# Example: https://api.app.dify.ai +APP_API_URL= + +# WebApp Url, +# used to display WebAPP API Base Url to the front-end. +# If empty, it is the same domain. +# Example: https://app.dify.ai +APP_WEB_URL= + +# File preview or download Url prefix. +# used to display File preview or download Url to the front-end or as Multi-model inputs; +# Url is signed and has expiration time. +# Setting FILES_URL is required for file processing plugins. +# - For https://example.com, use FILES_URL=https://example.com +# - For http://example.com, use FILES_URL=http://example.com +# Recommendation: use a dedicated domain (e.g., https://upload.example.com). +# Alternatively, use http://:5001 or http://api:5001, +# ensuring port 5001 is externally accessible (see docker-compose.yaml). +FILES_URL= + +# INTERNAL_FILES_URL is used for plugin daemon communication within Docker network. +# Set this to the internal Docker service URL for proper plugin file access. +# Example: INTERNAL_FILES_URL=http://api:5001 +INTERNAL_FILES_URL= + +# ------------------------------ +# Server Configuration +# ------------------------------ + +# The log level for the application. +# Supported values are `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` +LOG_LEVEL=INFO +# Log file path +LOG_FILE=/app/logs/server.log +# Log file max size, the unit is MB +LOG_FILE_MAX_SIZE=20 +# Log file max backup count +LOG_FILE_BACKUP_COUNT=5 +# Log dateformat +LOG_DATEFORMAT=%Y-%m-%d %H:%M:%S +# Log Timezone +LOG_TZ=UTC + +# Debug mode, default is false. +# It is recommended to turn on this configuration for local development +# to prevent some problems caused by monkey patch. +DEBUG=false + +# Flask debug mode, it can output trace information at the interface when turned on, +# which is convenient for debugging. +FLASK_DEBUG=false + +# Enable request logging, which will log the request and response information. +# And the log level is DEBUG +ENABLE_REQUEST_LOGGING=False + +# A secret key that is used for securely signing the session cookie +# and encrypting sensitive information on the database. +# You can generate a strong key using `openssl rand -base64 42`. +SECRET_KEY=sk-9f73s3ljTXVcMT3Blb3ljTqtsKiGHXVcMT3BlbkFJLK7U + +# Password for admin user initialization. +# If left unset, admin user will not be prompted for a password +# when creating the initial admin account. +# The length of the password cannot exceed 30 characters. +INIT_PASSWORD= + +# Deployment environment. +# Supported values are `PRODUCTION`, `TESTING`. Default is `PRODUCTION`. +# Testing environment. There will be a distinct color label on the front-end page, +# indicating that this environment is a testing environment. +DEPLOY_ENV=PRODUCTION + +# Whether to enable the version check policy. +# If set to empty, https://updates.dify.ai will be called for version check. +CHECK_UPDATE_URL=https://updates.dify.ai + +# Used to change the OpenAI base address, default is https://api.openai.com/v1. +# When OpenAI cannot be accessed in China, replace it with a domestic mirror address, +# or when a local model provides OpenAI compatible API, it can be replaced. +OPENAI_API_BASE=https://api.openai.com/v1 + +# When enabled, migrations will be executed prior to application startup +# and the application will start after the migrations have completed. +MIGRATION_ENABLED=true + +# File Access Time specifies a time interval in seconds for the file to be accessed. +# The default value is 300 seconds. +FILES_ACCESS_TIMEOUT=300 + +# Access token expiration time in minutes +ACCESS_TOKEN_EXPIRE_MINUTES=60 + +# Refresh token expiration time in days +REFRESH_TOKEN_EXPIRE_DAYS=30 + +# The maximum number of active requests for the application, where 0 means unlimited, should be a non-negative integer. +APP_MAX_ACTIVE_REQUESTS=0 +APP_MAX_EXECUTION_TIME=1200 + +# ------------------------------ +# Container Startup Related Configuration +# Only effective when starting with docker image or docker-compose. +# ------------------------------ + +# API service binding address, default: 0.0.0.0, i.e., all addresses can be accessed. +DIFY_BIND_ADDRESS=0.0.0.0 + +# API service binding port number, default 5001. +DIFY_PORT=5001 + +# The number of API server workers, i.e., the number of workers. +# Formula: number of cpu cores x 2 + 1 for sync, 1 for Gevent +# Reference: https://docs.gunicorn.org/en/stable/design.html#how-many-workers +SERVER_WORKER_AMOUNT=1 + +# Defaults to gevent. If using windows, it can be switched to sync or solo. +SERVER_WORKER_CLASS=gevent + +# Default number of worker connections, the default is 10. +SERVER_WORKER_CONNECTIONS=10 + +# Similar to SERVER_WORKER_CLASS. +# If using windows, it can be switched to sync or solo. +CELERY_WORKER_CLASS= + +# Request handling timeout. The default is 200, +# it is recommended to set it to 360 to support a longer sse connection time. +GUNICORN_TIMEOUT=360 + +# The number of Celery workers. The default is 1, and can be set as needed. +CELERY_WORKER_AMOUNT= + +# Flag indicating whether to enable autoscaling of Celery workers. +# +# Autoscaling is useful when tasks are CPU intensive and can be dynamically +# allocated and deallocated based on the workload. +# +# When autoscaling is enabled, the maximum and minimum number of workers can +# be specified. The autoscaling algorithm will dynamically adjust the number +# of workers within the specified range. +# +# Default is false (i.e., autoscaling is disabled). +# +# Example: +# CELERY_AUTO_SCALE=true +CELERY_AUTO_SCALE=false + +# The maximum number of Celery workers that can be autoscaled. +# This is optional and only used when autoscaling is enabled. +# Default is not set. +CELERY_MAX_WORKERS= + +# The minimum number of Celery workers that can be autoscaled. +# This is optional and only used when autoscaling is enabled. +# Default is not set. +CELERY_MIN_WORKERS= + +# API Tool configuration +API_TOOL_DEFAULT_CONNECT_TIMEOUT=10 +API_TOOL_DEFAULT_READ_TIMEOUT=60 + +# ------------------------------- +# Datasource Configuration +# -------------------------------- +ENABLE_WEBSITE_JINAREADER=true +ENABLE_WEBSITE_FIRECRAWL=true +ENABLE_WEBSITE_WATERCRAWL=true + +# ------------------------------ +# Database Configuration +# The database uses PostgreSQL. Please use the public schema. +# It is consistent with the configuration in the 'db' service below. +# ------------------------------ + +DB_USERNAME=postgres +DB_PASSWORD=difyai123456 +DB_HOST=db +DB_PORT=5432 +DB_DATABASE=dify +# The size of the database connection pool. +# The default is 30 connections, which can be appropriately increased. +SQLALCHEMY_POOL_SIZE=30 +# Database connection pool recycling time, the default is 3600 seconds. +SQLALCHEMY_POOL_RECYCLE=3600 +# Whether to print SQL, default is false. +SQLALCHEMY_ECHO=false +# If True, will test connections for liveness upon each checkout +SQLALCHEMY_POOL_PRE_PING=false +# Whether to enable the Last in first out option or use default FIFO queue if is false +SQLALCHEMY_POOL_USE_LIFO=false + +# Maximum number of connections to the database +# Default is 100 +# +# Reference: https://www.postgresql.org/docs/current/runtime-config-connection.html#GUC-MAX-CONNECTIONS +POSTGRES_MAX_CONNECTIONS=100 + +# Sets the amount of shared memory used for postgres's shared buffers. +# Default is 128MB +# Recommended value: 25% of available memory +# Reference: https://www.postgresql.org/docs/current/runtime-config-resource.html#GUC-SHARED-BUFFERS +POSTGRES_SHARED_BUFFERS=128MB + +# Sets the amount of memory used by each database worker for working space. +# Default is 4MB +# +# Reference: https://www.postgresql.org/docs/current/runtime-config-resource.html#GUC-WORK-MEM +POSTGRES_WORK_MEM=4MB + +# Sets the amount of memory reserved for maintenance activities. +# Default is 64MB +# +# Reference: https://www.postgresql.org/docs/current/runtime-config-resource.html#GUC-MAINTENANCE-WORK-MEM +POSTGRES_MAINTENANCE_WORK_MEM=64MB + +# Sets the planner's assumption about the effective cache size. +# Default is 4096MB +# +# Reference: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-EFFECTIVE-CACHE-SIZE +POSTGRES_EFFECTIVE_CACHE_SIZE=4096MB + +# ------------------------------ +# Redis Configuration +# This Redis configuration is used for caching and for pub/sub during conversation. +# ------------------------------ + +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_USERNAME= +REDIS_PASSWORD=difyai123456 +REDIS_USE_SSL=false +REDIS_DB=0 + +# Whether to use Redis Sentinel mode. +# If set to true, the application will automatically discover and connect to the master node through Sentinel. +REDIS_USE_SENTINEL=false + +# List of Redis Sentinel nodes. If Sentinel mode is enabled, provide at least one Sentinel IP and port. +# Format: `:,:,:` +REDIS_SENTINELS= +REDIS_SENTINEL_SERVICE_NAME= +REDIS_SENTINEL_USERNAME= +REDIS_SENTINEL_PASSWORD= +REDIS_SENTINEL_SOCKET_TIMEOUT=0.1 + +# List of Redis Cluster nodes. If Cluster mode is enabled, provide at least one Cluster IP and port. +# Format: `:,:,:` +REDIS_USE_CLUSTERS=false +REDIS_CLUSTERS= +REDIS_CLUSTERS_PASSWORD= + +# ------------------------------ +# Celery Configuration +# ------------------------------ + +# Use redis as the broker, and redis db 1 for celery broker. +# Format as follows: `redis://:@:/` +# Example: redis://:difyai123456@redis:6379/1 +# If use Redis Sentinel, format as follows: `sentinel://:@:/` +# Example: sentinel://localhost:26379/1;sentinel://localhost:26380/1;sentinel://localhost:26381/1 +CELERY_BROKER_URL=redis://:difyai123456@redis:6379/1 +BROKER_USE_SSL=false + +# If you are using Redis Sentinel for high availability, configure the following settings. +CELERY_USE_SENTINEL=false +CELERY_SENTINEL_MASTER_NAME= +CELERY_SENTINEL_PASSWORD= +CELERY_SENTINEL_SOCKET_TIMEOUT=0.1 + +# ------------------------------ +# CORS Configuration +# Used to set the front-end cross-domain access policy. +# ------------------------------ + +# Specifies the allowed origins for cross-origin requests to the Web API, +# e.g. https://dify.app or * for all origins. +WEB_API_CORS_ALLOW_ORIGINS=* + +# Specifies the allowed origins for cross-origin requests to the console API, +# e.g. https://cloud.dify.ai or * for all origins. +CONSOLE_CORS_ALLOW_ORIGINS=* + +# ------------------------------ +# File Storage Configuration +# ------------------------------ + +# The type of storage to use for storing user files. +STORAGE_TYPE=opendal + +# Apache OpenDAL Configuration +# The configuration for OpenDAL consists of the following format: OPENDAL__. +# You can find all the service configurations (CONFIG_NAME) in the repository at: https://github.com/apache/opendal/tree/main/core/src/services. +# Dify will scan configurations starting with OPENDAL_ and automatically apply them. +# The scheme name for the OpenDAL storage. +OPENDAL_SCHEME=fs +# Configurations for OpenDAL Local File System. +OPENDAL_FS_ROOT=storage + +# ClickZetta Volume Configuration (for storage backend) +# To use ClickZetta Volume as storage backend, set STORAGE_TYPE=clickzetta-volume +# Note: ClickZetta Volume will reuse the existing CLICKZETTA_* connection parameters + +# Volume type selection (three types available): +# - user: Personal/small team use, simple config, user-level permissions +# - table: Enterprise multi-tenant, smart routing, table-level + user-level permissions +# - external: Data lake integration, external storage connection, volume-level + storage-level permissions +CLICKZETTA_VOLUME_TYPE=user + +# External Volume name (required only when TYPE=external) +CLICKZETTA_VOLUME_NAME= + +# Table Volume table prefix (used only when TYPE=table) +CLICKZETTA_VOLUME_TABLE_PREFIX=dataset_ + +# Dify file directory prefix (isolates from other apps, recommended to keep default) +CLICKZETTA_VOLUME_DIFY_PREFIX=dify_km + +# S3 Configuration +# +S3_ENDPOINT= +S3_REGION=us-east-1 +S3_BUCKET_NAME=difyai +S3_ACCESS_KEY= +S3_SECRET_KEY= +# Whether to use AWS managed IAM roles for authenticating with the S3 service. +# If set to false, the access key and secret key must be provided. +S3_USE_AWS_MANAGED_IAM=false + +# Azure Blob Configuration +# +AZURE_BLOB_ACCOUNT_NAME=difyai +AZURE_BLOB_ACCOUNT_KEY=difyai +AZURE_BLOB_CONTAINER_NAME=difyai-container +AZURE_BLOB_ACCOUNT_URL=https://.blob.core.windows.net + +# Google Storage Configuration +# +GOOGLE_STORAGE_BUCKET_NAME=your-bucket-name +GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64= + +# The Alibaba Cloud OSS configurations, +# +ALIYUN_OSS_BUCKET_NAME=your-bucket-name +ALIYUN_OSS_ACCESS_KEY=your-access-key +ALIYUN_OSS_SECRET_KEY=your-secret-key +ALIYUN_OSS_ENDPOINT=https://oss-ap-southeast-1-internal.aliyuncs.com +ALIYUN_OSS_REGION=ap-southeast-1 +ALIYUN_OSS_AUTH_VERSION=v4 +# Don't start with '/'. OSS doesn't support leading slash in object names. +ALIYUN_OSS_PATH=your-path + +# Tencent COS Configuration +# +TENCENT_COS_BUCKET_NAME=your-bucket-name +TENCENT_COS_SECRET_KEY=your-secret-key +TENCENT_COS_SECRET_ID=your-secret-id +TENCENT_COS_REGION=your-region +TENCENT_COS_SCHEME=your-scheme + +# Oracle Storage Configuration +# +OCI_ENDPOINT=https://your-object-storage-namespace.compat.objectstorage.us-ashburn-1.oraclecloud.com +OCI_BUCKET_NAME=your-bucket-name +OCI_ACCESS_KEY=your-access-key +OCI_SECRET_KEY=your-secret-key +OCI_REGION=us-ashburn-1 + +# Huawei OBS Configuration +# +HUAWEI_OBS_BUCKET_NAME=your-bucket-name +HUAWEI_OBS_SECRET_KEY=your-secret-key +HUAWEI_OBS_ACCESS_KEY=your-access-key +HUAWEI_OBS_SERVER=your-server-url + +# Volcengine TOS Configuration +# +VOLCENGINE_TOS_BUCKET_NAME=your-bucket-name +VOLCENGINE_TOS_SECRET_KEY=your-secret-key +VOLCENGINE_TOS_ACCESS_KEY=your-access-key +VOLCENGINE_TOS_ENDPOINT=your-server-url +VOLCENGINE_TOS_REGION=your-region + +# Baidu OBS Storage Configuration +# +BAIDU_OBS_BUCKET_NAME=your-bucket-name +BAIDU_OBS_SECRET_KEY=your-secret-key +BAIDU_OBS_ACCESS_KEY=your-access-key +BAIDU_OBS_ENDPOINT=your-server-url + +# Supabase Storage Configuration +# +SUPABASE_BUCKET_NAME=your-bucket-name +SUPABASE_API_KEY=your-access-key +SUPABASE_URL=your-server-url + +# ------------------------------ +# Vector Database Configuration +# ------------------------------ + +# The type of vector store to use. +# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`. +VECTOR_STORE=weaviate + +# The Weaviate endpoint URL. Only available when VECTOR_STORE is `weaviate`. +WEAVIATE_ENDPOINT=http://weaviate:8080 +WEAVIATE_API_KEY=WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih + +# The Qdrant endpoint URL. Only available when VECTOR_STORE is `qdrant`. +QDRANT_URL=http://qdrant:6333 +QDRANT_API_KEY=difyai123456 +QDRANT_CLIENT_TIMEOUT=20 +QDRANT_GRPC_ENABLED=false +QDRANT_GRPC_PORT=6334 +QDRANT_REPLICATION_FACTOR=1 + +# Milvus configuration. Only available when VECTOR_STORE is `milvus`. +# The milvus uri. +MILVUS_URI=http://host.docker.internal:19530 +MILVUS_DATABASE= +MILVUS_TOKEN= +MILVUS_USER= +MILVUS_PASSWORD= +MILVUS_ENABLE_HYBRID_SEARCH=False +MILVUS_ANALYZER_PARAMS= + +# MyScale configuration, only available when VECTOR_STORE is `myscale` +# For multi-language support, please set MYSCALE_FTS_PARAMS with referring to: +# https://myscale.com/docs/en/text-search/#understanding-fts-index-parameters +MYSCALE_HOST=myscale +MYSCALE_PORT=8123 +MYSCALE_USER=default +MYSCALE_PASSWORD= +MYSCALE_DATABASE=dify +MYSCALE_FTS_PARAMS= + +# Couchbase configurations, only available when VECTOR_STORE is `couchbase` +# The connection string must include hostname defined in the docker-compose file (couchbase-server in this case) +COUCHBASE_CONNECTION_STRING=couchbase://couchbase-server +COUCHBASE_USER=Administrator +COUCHBASE_PASSWORD=password +COUCHBASE_BUCKET_NAME=Embeddings +COUCHBASE_SCOPE_NAME=_default + +# pgvector configurations, only available when VECTOR_STORE is `pgvector` +PGVECTOR_HOST=pgvector +PGVECTOR_PORT=5432 +PGVECTOR_USER=postgres +PGVECTOR_PASSWORD=difyai123456 +PGVECTOR_DATABASE=dify +PGVECTOR_MIN_CONNECTION=1 +PGVECTOR_MAX_CONNECTION=5 +PGVECTOR_PG_BIGM=false +PGVECTOR_PG_BIGM_VERSION=1.2-20240606 + +# vastbase configurations, only available when VECTOR_STORE is `vastbase` +VASTBASE_HOST=vastbase +VASTBASE_PORT=5432 +VASTBASE_USER=dify +VASTBASE_PASSWORD=Difyai123456 +VASTBASE_DATABASE=dify +VASTBASE_MIN_CONNECTION=1 +VASTBASE_MAX_CONNECTION=5 + +# pgvecto-rs configurations, only available when VECTOR_STORE is `pgvecto-rs` +PGVECTO_RS_HOST=pgvecto-rs +PGVECTO_RS_PORT=5432 +PGVECTO_RS_USER=postgres +PGVECTO_RS_PASSWORD=difyai123456 +PGVECTO_RS_DATABASE=dify + +# analyticdb configurations, only available when VECTOR_STORE is `analyticdb` +ANALYTICDB_KEY_ID=your-ak +ANALYTICDB_KEY_SECRET=your-sk +ANALYTICDB_REGION_ID=cn-hangzhou +ANALYTICDB_INSTANCE_ID=gp-ab123456 +ANALYTICDB_ACCOUNT=testaccount +ANALYTICDB_PASSWORD=testpassword +ANALYTICDB_NAMESPACE=dify +ANALYTICDB_NAMESPACE_PASSWORD=difypassword +ANALYTICDB_HOST=gp-test.aliyuncs.com +ANALYTICDB_PORT=5432 +ANALYTICDB_MIN_CONNECTION=1 +ANALYTICDB_MAX_CONNECTION=5 + +# TiDB vector configurations, only available when VECTOR_STORE is `tidb_vector` +TIDB_VECTOR_HOST=tidb +TIDB_VECTOR_PORT=4000 +TIDB_VECTOR_USER= +TIDB_VECTOR_PASSWORD= +TIDB_VECTOR_DATABASE=dify + +# Matrixone vector configurations. +MATRIXONE_HOST=matrixone +MATRIXONE_PORT=6001 +MATRIXONE_USER=dump +MATRIXONE_PASSWORD=111 +MATRIXONE_DATABASE=dify + +# Tidb on qdrant configuration, only available when VECTOR_STORE is `tidb_on_qdrant` +TIDB_ON_QDRANT_URL=http://127.0.0.1 +TIDB_ON_QDRANT_API_KEY=dify +TIDB_ON_QDRANT_CLIENT_TIMEOUT=20 +TIDB_ON_QDRANT_GRPC_ENABLED=false +TIDB_ON_QDRANT_GRPC_PORT=6334 +TIDB_PUBLIC_KEY=dify +TIDB_PRIVATE_KEY=dify +TIDB_API_URL=http://127.0.0.1 +TIDB_IAM_API_URL=http://127.0.0.1 +TIDB_REGION=regions/aws-us-east-1 +TIDB_PROJECT_ID=dify +TIDB_SPEND_LIMIT=100 + +# Chroma configuration, only available when VECTOR_STORE is `chroma` +CHROMA_HOST=127.0.0.1 +CHROMA_PORT=8000 +CHROMA_TENANT=default_tenant +CHROMA_DATABASE=default_database +CHROMA_AUTH_PROVIDER=chromadb.auth.token_authn.TokenAuthClientProvider +CHROMA_AUTH_CREDENTIALS= + +# Oracle configuration, only available when VECTOR_STORE is `oracle` +ORACLE_USER=dify +ORACLE_PASSWORD=dify +ORACLE_DSN=oracle:1521/FREEPDB1 +ORACLE_CONFIG_DIR=/app/api/storage/wallet +ORACLE_WALLET_LOCATION=/app/api/storage/wallet +ORACLE_WALLET_PASSWORD=dify +ORACLE_IS_AUTONOMOUS=false + +# relyt configurations, only available when VECTOR_STORE is `relyt` +RELYT_HOST=db +RELYT_PORT=5432 +RELYT_USER=postgres +RELYT_PASSWORD=difyai123456 +RELYT_DATABASE=postgres + +# open search configuration, only available when VECTOR_STORE is `opensearch` +OPENSEARCH_HOST=opensearch +OPENSEARCH_PORT=9200 +OPENSEARCH_SECURE=true +OPENSEARCH_VERIFY_CERTS=true +OPENSEARCH_AUTH_METHOD=basic +OPENSEARCH_USER=admin +OPENSEARCH_PASSWORD=admin +# If using AWS managed IAM, e.g. Managed Cluster or OpenSearch Serverless +OPENSEARCH_AWS_REGION=ap-southeast-1 +OPENSEARCH_AWS_SERVICE=aoss + +# tencent vector configurations, only available when VECTOR_STORE is `tencent` +TENCENT_VECTOR_DB_URL=http://127.0.0.1 +TENCENT_VECTOR_DB_API_KEY=dify +TENCENT_VECTOR_DB_TIMEOUT=30 +TENCENT_VECTOR_DB_USERNAME=dify +TENCENT_VECTOR_DB_DATABASE=dify +TENCENT_VECTOR_DB_SHARD=1 +TENCENT_VECTOR_DB_REPLICAS=2 +TENCENT_VECTOR_DB_ENABLE_HYBRID_SEARCH=false + +# ElasticSearch configuration, only available when VECTOR_STORE is `elasticsearch` +ELASTICSEARCH_HOST=0.0.0.0 +ELASTICSEARCH_PORT=9200 +ELASTICSEARCH_USERNAME=elastic +ELASTICSEARCH_PASSWORD=elastic +KIBANA_PORT=5601 + +# baidu vector configurations, only available when VECTOR_STORE is `baidu` +BAIDU_VECTOR_DB_ENDPOINT=http://127.0.0.1:5287 +BAIDU_VECTOR_DB_CONNECTION_TIMEOUT_MS=30000 +BAIDU_VECTOR_DB_ACCOUNT=root +BAIDU_VECTOR_DB_API_KEY=dify +BAIDU_VECTOR_DB_DATABASE=dify +BAIDU_VECTOR_DB_SHARD=1 +BAIDU_VECTOR_DB_REPLICAS=3 + +# VikingDB configurations, only available when VECTOR_STORE is `vikingdb` +VIKINGDB_ACCESS_KEY=your-ak +VIKINGDB_SECRET_KEY=your-sk +VIKINGDB_REGION=cn-shanghai +VIKINGDB_HOST=api-vikingdb.xxx.volces.com +VIKINGDB_SCHEMA=http +VIKINGDB_CONNECTION_TIMEOUT=30 +VIKINGDB_SOCKET_TIMEOUT=30 + +# Lindorm configuration, only available when VECTOR_STORE is `lindorm` +LINDORM_URL=http://lindorm:30070 +LINDORM_USERNAME=lindorm +LINDORM_PASSWORD=lindorm +LINDORM_QUERY_TIMEOUT=1 + +# OceanBase Vector configuration, only available when VECTOR_STORE is `oceanbase` +OCEANBASE_VECTOR_HOST=oceanbase +OCEANBASE_VECTOR_PORT=2881 +OCEANBASE_VECTOR_USER=root@test +OCEANBASE_VECTOR_PASSWORD=difyai123456 +OCEANBASE_VECTOR_DATABASE=test +OCEANBASE_CLUSTER_NAME=difyai +OCEANBASE_MEMORY_LIMIT=6G +OCEANBASE_ENABLE_HYBRID_SEARCH=false + +# opengauss configurations, only available when VECTOR_STORE is `opengauss` +OPENGAUSS_HOST=opengauss +OPENGAUSS_PORT=6600 +OPENGAUSS_USER=postgres +OPENGAUSS_PASSWORD=Dify@123 +OPENGAUSS_DATABASE=dify +OPENGAUSS_MIN_CONNECTION=1 +OPENGAUSS_MAX_CONNECTION=5 +OPENGAUSS_ENABLE_PQ=false + +# huawei cloud search service vector configurations, only available when VECTOR_STORE is `huawei_cloud` +HUAWEI_CLOUD_HOSTS=https://127.0.0.1:9200 +HUAWEI_CLOUD_USER=admin +HUAWEI_CLOUD_PASSWORD=admin + +# Upstash Vector configuration, only available when VECTOR_STORE is `upstash` +UPSTASH_VECTOR_URL=https://xxx-vector.upstash.io +UPSTASH_VECTOR_TOKEN=dify + +# TableStore Vector configuration +# (only used when VECTOR_STORE is tablestore) +TABLESTORE_ENDPOINT=https://instance-name.cn-hangzhou.ots.aliyuncs.com +TABLESTORE_INSTANCE_NAME=instance-name +TABLESTORE_ACCESS_KEY_ID=xxx +TABLESTORE_ACCESS_KEY_SECRET=xxx + +# Clickzetta configuration, only available when VECTOR_STORE is `clickzetta` +CLICKZETTA_USERNAME= +CLICKZETTA_PASSWORD= +CLICKZETTA_INSTANCE= +CLICKZETTA_SERVICE=api.clickzetta.com +CLICKZETTA_WORKSPACE=quick_start +CLICKZETTA_VCLUSTER=default_ap +CLICKZETTA_SCHEMA=dify +CLICKZETTA_BATCH_SIZE=100 +CLICKZETTA_ENABLE_INVERTED_INDEX=true +CLICKZETTA_ANALYZER_TYPE=chinese +CLICKZETTA_ANALYZER_MODE=smart +CLICKZETTA_VECTOR_DISTANCE_FUNCTION=cosine_distance + +# ------------------------------ +# Knowledge Configuration +# ------------------------------ + +# Upload file size limit, default 15M. +UPLOAD_FILE_SIZE_LIMIT=15 + +# The maximum number of files that can be uploaded at a time, default 5. +UPLOAD_FILE_BATCH_LIMIT=5 + +# ETL type, support: `dify`, `Unstructured` +# `dify` Dify's proprietary file extraction scheme +# `Unstructured` Unstructured.io file extraction scheme +ETL_TYPE=dify + +# Unstructured API path and API key, needs to be configured when ETL_TYPE is Unstructured +# Or using Unstructured for document extractor node for pptx. +# For example: http://unstructured:8000/general/v0/general +UNSTRUCTURED_API_URL= +UNSTRUCTURED_API_KEY= +SCARF_NO_ANALYTICS=true + +# ------------------------------ +# Model Configuration +# ------------------------------ + +# The maximum number of tokens allowed for prompt generation. +# This setting controls the upper limit of tokens that can be used by the LLM +# when generating a prompt in the prompt generation tool. +# Default: 512 tokens. +PROMPT_GENERATION_MAX_TOKENS=512 + +# The maximum number of tokens allowed for code generation. +# This setting controls the upper limit of tokens that can be used by the LLM +# when generating code in the code generation tool. +# Default: 1024 tokens. +CODE_GENERATION_MAX_TOKENS=1024 + +# Enable or disable plugin based token counting. If disabled, token counting will return 0. +# This can improve performance by skipping token counting operations. +# Default: false (disabled). +PLUGIN_BASED_TOKEN_COUNTING_ENABLED=false + +# ------------------------------ +# Multi-modal Configuration +# ------------------------------ + +# The format of the image/video/audio/document sent when the multi-modal model is input, +# the default is base64, optional url. +# The delay of the call in url mode will be lower than that in base64 mode. +# It is generally recommended to use the more compatible base64 mode. +# If configured as url, you need to configure FILES_URL as an externally accessible address so that the multi-modal model can access the image/video/audio/document. +MULTIMODAL_SEND_FORMAT=base64 +# Upload image file size limit, default 10M. +UPLOAD_IMAGE_FILE_SIZE_LIMIT=10 +# Upload video file size limit, default 100M. +UPLOAD_VIDEO_FILE_SIZE_LIMIT=100 +# Upload audio file size limit, default 50M. +UPLOAD_AUDIO_FILE_SIZE_LIMIT=50 + +# ------------------------------ +# Sentry Configuration +# Used for application monitoring and error log tracking. +# ------------------------------ +SENTRY_DSN= + +# API Service Sentry DSN address, default is empty, when empty, +# all monitoring information is not reported to Sentry. +# If not set, Sentry error reporting will be disabled. +API_SENTRY_DSN= +# API Service The reporting ratio of Sentry events, if it is 0.01, it is 1%. +API_SENTRY_TRACES_SAMPLE_RATE=1.0 +# API Service The reporting ratio of Sentry profiles, if it is 0.01, it is 1%. +API_SENTRY_PROFILES_SAMPLE_RATE=1.0 + +# Web Service Sentry DSN address, default is empty, when empty, +# all monitoring information is not reported to Sentry. +# If not set, Sentry error reporting will be disabled. +WEB_SENTRY_DSN= + +# ------------------------------ +# Notion Integration Configuration +# Variables can be obtained by applying for Notion integration: https://www.notion.so/my-integrations +# ------------------------------ + +# Configure as "public" or "internal". +# Since Notion's OAuth redirect URL only supports HTTPS, +# if deploying locally, please use Notion's internal integration. +NOTION_INTEGRATION_TYPE=public +# Notion OAuth client secret (used for public integration type) +NOTION_CLIENT_SECRET= +# Notion OAuth client id (used for public integration type) +NOTION_CLIENT_ID= +# Notion internal integration secret. +# If the value of NOTION_INTEGRATION_TYPE is "internal", +# you need to configure this variable. +NOTION_INTERNAL_SECRET= + +# ------------------------------ +# Mail related configuration +# ------------------------------ + +# Mail type, support: resend, smtp, sendgrid +MAIL_TYPE=resend + +# Default send from email address, if not specified +# If using SendGrid, use the 'from' field for authentication if necessary. +MAIL_DEFAULT_SEND_FROM= + +# API-Key for the Resend email provider, used when MAIL_TYPE is `resend`. +RESEND_API_URL=https://api.resend.com +RESEND_API_KEY=your-resend-api-key + + +# SMTP server configuration, used when MAIL_TYPE is `smtp` +SMTP_SERVER= +SMTP_PORT=465 +SMTP_USERNAME= +SMTP_PASSWORD= +SMTP_USE_TLS=true +SMTP_OPPORTUNISTIC_TLS=false + +# Sendgid configuration +SENDGRID_API_KEY= + +# ------------------------------ +# Others Configuration +# ------------------------------ + +# Maximum length of segmentation tokens for indexing +INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 + +# Member invitation link valid time (hours), +# Default: 72. +INVITE_EXPIRY_HOURS=72 + +# Reset password token valid time (minutes), +RESET_PASSWORD_TOKEN_EXPIRY_MINUTES=5 + +# The sandbox service endpoint. +CODE_EXECUTION_ENDPOINT=http://sandbox:8194 +CODE_EXECUTION_API_KEY=dify-sandbox +CODE_MAX_NUMBER=9223372036854775807 +CODE_MIN_NUMBER=-9223372036854775808 +CODE_MAX_DEPTH=5 +CODE_MAX_PRECISION=20 +CODE_MAX_STRING_LENGTH=80000 +CODE_MAX_STRING_ARRAY_LENGTH=30 +CODE_MAX_OBJECT_ARRAY_LENGTH=30 +CODE_MAX_NUMBER_ARRAY_LENGTH=1000 +CODE_EXECUTION_CONNECT_TIMEOUT=10 +CODE_EXECUTION_READ_TIMEOUT=60 +CODE_EXECUTION_WRITE_TIMEOUT=10 +TEMPLATE_TRANSFORM_MAX_LENGTH=80000 + +# Workflow runtime configuration +WORKFLOW_MAX_EXECUTION_STEPS=500 +WORKFLOW_MAX_EXECUTION_TIME=1200 +WORKFLOW_CALL_MAX_DEPTH=5 +MAX_VARIABLE_SIZE=204800 +WORKFLOW_PARALLEL_DEPTH_LIMIT=3 +WORKFLOW_FILE_UPLOAD_LIMIT=10 + +# Workflow storage configuration +# Options: rdbms, hybrid +# rdbms: Use only the relational database (default) +# hybrid: Save new data to object storage, read from both object storage and RDBMS +WORKFLOW_NODE_EXECUTION_STORAGE=rdbms + +# Repository configuration +# Core workflow execution repository implementation +CORE_WORKFLOW_EXECUTION_REPOSITORY=core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository + +# Core workflow node execution repository implementation +CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY=core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository + +# API workflow node execution repository implementation +API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository + +# API workflow run repository implementation +API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository + +# HTTP request node in workflow configuration +HTTP_REQUEST_NODE_MAX_BINARY_SIZE=10485760 +HTTP_REQUEST_NODE_MAX_TEXT_SIZE=1048576 +HTTP_REQUEST_NODE_SSL_VERIFY=True + +# Respect X-* headers to redirect clients +RESPECT_XFORWARD_HEADERS_ENABLED=false + +# SSRF Proxy server HTTP URL +SSRF_PROXY_HTTP_URL=http://ssrf_proxy:3128 +# SSRF Proxy server HTTPS URL +SSRF_PROXY_HTTPS_URL=http://ssrf_proxy:3128 + +# Maximum loop count in the workflow +LOOP_NODE_MAX_COUNT=100 + +# The maximum number of tools that can be used in the agent. +MAX_TOOLS_NUM=10 + +# Maximum number of Parallelism branches in the workflow +MAX_PARALLEL_LIMIT=10 + +# The maximum number of iterations for agent setting +MAX_ITERATIONS_NUM=99 + +# ------------------------------ +# Environment Variables for web Service +# ------------------------------ + +# The timeout for the text generation in millisecond +TEXT_GENERATION_TIMEOUT_MS=60000 + +# Allow rendering unsafe URLs which have "data:" scheme. +ALLOW_UNSAFE_DATA_SCHEME=false + +# ------------------------------ +# Environment Variables for db Service +# ------------------------------ + +# The name of the default postgres user. +POSTGRES_USER=${DB_USERNAME} +# The password for the default postgres user. +POSTGRES_PASSWORD=${DB_PASSWORD} +# The name of the default postgres database. +POSTGRES_DB=${DB_DATABASE} +# postgres data directory +PGDATA=/var/lib/postgresql/data/pgdata + +# ------------------------------ +# Environment Variables for sandbox Service +# ------------------------------ + +# The API key for the sandbox service +SANDBOX_API_KEY=dify-sandbox +# The mode in which the Gin framework runs +SANDBOX_GIN_MODE=release +# The timeout for the worker in seconds +SANDBOX_WORKER_TIMEOUT=15 +# Enable network for the sandbox service +SANDBOX_ENABLE_NETWORK=true +# HTTP proxy URL for SSRF protection +SANDBOX_HTTP_PROXY=http://ssrf_proxy:3128 +# HTTPS proxy URL for SSRF protection +SANDBOX_HTTPS_PROXY=http://ssrf_proxy:3128 +# The port on which the sandbox service runs +SANDBOX_PORT=8194 + +# ------------------------------ +# Environment Variables for weaviate Service +# (only used when VECTOR_STORE is weaviate) +# ------------------------------ +WEAVIATE_PERSISTENCE_DATA_PATH=/var/lib/weaviate +WEAVIATE_QUERY_DEFAULTS_LIMIT=25 +WEAVIATE_AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED=true +WEAVIATE_DEFAULT_VECTORIZER_MODULE=none +WEAVIATE_CLUSTER_HOSTNAME=node1 +WEAVIATE_AUTHENTICATION_APIKEY_ENABLED=true +WEAVIATE_AUTHENTICATION_APIKEY_ALLOWED_KEYS=WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih +WEAVIATE_AUTHENTICATION_APIKEY_USERS=hello@dify.ai +WEAVIATE_AUTHORIZATION_ADMINLIST_ENABLED=true +WEAVIATE_AUTHORIZATION_ADMINLIST_USERS=hello@dify.ai + +# ------------------------------ +# Environment Variables for Chroma +# (only used when VECTOR_STORE is chroma) +# ------------------------------ + +# Authentication credentials for Chroma server +CHROMA_SERVER_AUTHN_CREDENTIALS=difyai123456 +# Authentication provider for Chroma server +CHROMA_SERVER_AUTHN_PROVIDER=chromadb.auth.token_authn.TokenAuthenticationServerProvider +# Persistence setting for Chroma server +CHROMA_IS_PERSISTENT=TRUE + +# ------------------------------ +# Environment Variables for Oracle Service +# (only used when VECTOR_STORE is oracle) +# ------------------------------ +ORACLE_PWD=Dify123456 +ORACLE_CHARACTERSET=AL32UTF8 + +# ------------------------------ +# Environment Variables for milvus Service +# (only used when VECTOR_STORE is milvus) +# ------------------------------ +# ETCD configuration for auto compaction mode +ETCD_AUTO_COMPACTION_MODE=revision +# ETCD configuration for auto compaction retention in terms of number of revisions +ETCD_AUTO_COMPACTION_RETENTION=1000 +# ETCD configuration for backend quota in bytes +ETCD_QUOTA_BACKEND_BYTES=4294967296 +# ETCD configuration for the number of changes before triggering a snapshot +ETCD_SNAPSHOT_COUNT=50000 +# MinIO access key for authentication +MINIO_ACCESS_KEY=minioadmin +# MinIO secret key for authentication +MINIO_SECRET_KEY=minioadmin +# ETCD service endpoints +ETCD_ENDPOINTS=etcd:2379 +# MinIO service address +MINIO_ADDRESS=minio:9000 +# Enable or disable security authorization +MILVUS_AUTHORIZATION_ENABLED=true + +# ------------------------------ +# Environment Variables for pgvector / pgvector-rs Service +# (only used when VECTOR_STORE is pgvector / pgvector-rs) +# ------------------------------ +PGVECTOR_PGUSER=postgres +# The password for the default postgres user. +PGVECTOR_POSTGRES_PASSWORD=difyai123456 +# The name of the default postgres database. +PGVECTOR_POSTGRES_DB=dify +# postgres data directory +PGVECTOR_PGDATA=/var/lib/postgresql/data/pgdata + +# ------------------------------ +# Environment Variables for opensearch +# (only used when VECTOR_STORE is opensearch) +# ------------------------------ +OPENSEARCH_DISCOVERY_TYPE=single-node +OPENSEARCH_BOOTSTRAP_MEMORY_LOCK=true +OPENSEARCH_JAVA_OPTS_MIN=512m +OPENSEARCH_JAVA_OPTS_MAX=1024m +OPENSEARCH_INITIAL_ADMIN_PASSWORD=Qazwsxedc!@#123 +OPENSEARCH_MEMLOCK_SOFT=-1 +OPENSEARCH_MEMLOCK_HARD=-1 +OPENSEARCH_NOFILE_SOFT=65536 +OPENSEARCH_NOFILE_HARD=65536 + +# ------------------------------ +# Environment Variables for Nginx reverse proxy +# ------------------------------ +NGINX_SERVER_NAME=_ +NGINX_HTTPS_ENABLED=false +# HTTP port +NGINX_PORT=80 +# SSL settings are only applied when HTTPS_ENABLED is true +NGINX_SSL_PORT=443 +# if HTTPS_ENABLED is true, you're required to add your own SSL certificates/keys to the `./nginx/ssl` directory +# and modify the env vars below accordingly. +NGINX_SSL_CERT_FILENAME=dify.crt +NGINX_SSL_CERT_KEY_FILENAME=dify.key +NGINX_SSL_PROTOCOLS=TLSv1.1 TLSv1.2 TLSv1.3 + +# Nginx performance tuning +NGINX_WORKER_PROCESSES=auto +NGINX_CLIENT_MAX_BODY_SIZE=100M +NGINX_KEEPALIVE_TIMEOUT=65 + +# Proxy settings +NGINX_PROXY_READ_TIMEOUT=3600s +NGINX_PROXY_SEND_TIMEOUT=3600s + +# Set true to accept requests for /.well-known/acme-challenge/ +NGINX_ENABLE_CERTBOT_CHALLENGE=false + +# ------------------------------ +# Certbot Configuration +# ------------------------------ + +# Email address (required to get certificates from Let's Encrypt) +CERTBOT_EMAIL=your_email@example.com + +# Domain name +CERTBOT_DOMAIN=your_domain.com + +# certbot command options +# i.e: --force-renewal --dry-run --test-cert --debug +CERTBOT_OPTIONS= + +# ------------------------------ +# Environment Variables for SSRF Proxy +# ------------------------------ +SSRF_HTTP_PORT=3128 +SSRF_COREDUMP_DIR=/var/spool/squid +SSRF_REVERSE_PROXY_PORT=8194 +SSRF_SANDBOX_HOST=sandbox +SSRF_DEFAULT_TIME_OUT=5 +SSRF_DEFAULT_CONNECT_TIME_OUT=5 +SSRF_DEFAULT_READ_TIME_OUT=5 +SSRF_DEFAULT_WRITE_TIME_OUT=5 + +# ------------------------------ +# docker env var for specifying vector db type at startup +# (based on the vector db type, the corresponding docker +# compose profile will be used) +# if you want to use unstructured, add ',unstructured' to the end +# ------------------------------ +COMPOSE_PROFILES=${VECTOR_STORE:-weaviate} + +# ------------------------------ +# Docker Compose Service Expose Host Port Configurations +# ------------------------------ +EXPOSE_NGINX_PORT=80 +EXPOSE_NGINX_SSL_PORT=443 + +# ---------------------------------------------------------------------------- +# ModelProvider & Tool Position Configuration +# Used to specify the model providers and tools that can be used in the app. +# ---------------------------------------------------------------------------- + +# Pin, include, and exclude tools +# Use comma-separated values with no spaces between items. +# Example: POSITION_TOOL_PINS=bing,google +POSITION_TOOL_PINS= +POSITION_TOOL_INCLUDES= +POSITION_TOOL_EXCLUDES= + +# Pin, include, and exclude model providers +# Use comma-separated values with no spaces between items. +# Example: POSITION_PROVIDER_PINS=openai,openllm +POSITION_PROVIDER_PINS= +POSITION_PROVIDER_INCLUDES= +POSITION_PROVIDER_EXCLUDES= + +# CSP https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP +CSP_WHITELIST= + +# Enable or disable create tidb service job +CREATE_TIDB_SERVICE_JOB_ENABLED=false + +# Maximum number of submitted thread count in a ThreadPool for parallel node execution +MAX_SUBMIT_COUNT=100 + +# The maximum number of top-k value for RAG. +TOP_K_MAX_VALUE=10 + +# ------------------------------ +# Plugin Daemon Configuration +# ------------------------------ + +DB_PLUGIN_DATABASE=dify_plugin +EXPOSE_PLUGIN_DAEMON_PORT=5002 +PLUGIN_DAEMON_PORT=5002 +PLUGIN_DAEMON_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi +PLUGIN_DAEMON_URL=http://plugin_daemon:5002 +PLUGIN_MAX_PACKAGE_SIZE=52428800 +PLUGIN_PPROF_ENABLED=false + +PLUGIN_DEBUGGING_HOST=0.0.0.0 +PLUGIN_DEBUGGING_PORT=5003 +EXPOSE_PLUGIN_DEBUGGING_HOST=localhost +EXPOSE_PLUGIN_DEBUGGING_PORT=5003 + +# If this key is changed, DIFY_INNER_API_KEY in plugin_daemon service must also be updated or agent node will fail. +PLUGIN_DIFY_INNER_API_KEY=QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1 +PLUGIN_DIFY_INNER_API_URL=http://api:5001 + +ENDPOINT_URL_TEMPLATE=http://localhost/e/{hook_id} + +MARKETPLACE_ENABLED=true +MARKETPLACE_API_URL=https://marketplace.dify.ai + +FORCE_VERIFYING_SIGNATURE=true + +PLUGIN_PYTHON_ENV_INIT_TIMEOUT=120 +PLUGIN_MAX_EXECUTION_TIMEOUT=600 +# PIP_MIRROR_URL=https://pypi.tuna.tsinghua.edu.cn/simple +PIP_MIRROR_URL= + +# https://github.com/langgenius/dify-plugin-daemon/blob/main/.env.example +# Plugin storage type, local aws_s3 tencent_cos azure_blob aliyun_oss volcengine_tos +PLUGIN_STORAGE_TYPE=local +PLUGIN_STORAGE_LOCAL_ROOT=/app/storage +PLUGIN_WORKING_PATH=/app/storage/cwd +PLUGIN_INSTALLED_PATH=plugin +PLUGIN_PACKAGE_CACHE_PATH=plugin_packages +PLUGIN_MEDIA_CACHE_PATH=assets +# Plugin oss bucket +PLUGIN_STORAGE_OSS_BUCKET= +# Plugin oss s3 credentials +PLUGIN_S3_USE_AWS=false +PLUGIN_S3_USE_AWS_MANAGED_IAM=false +PLUGIN_S3_ENDPOINT= +PLUGIN_S3_USE_PATH_STYLE=false +PLUGIN_AWS_ACCESS_KEY= +PLUGIN_AWS_SECRET_KEY= +PLUGIN_AWS_REGION= +# Plugin oss azure blob +PLUGIN_AZURE_BLOB_STORAGE_CONTAINER_NAME= +PLUGIN_AZURE_BLOB_STORAGE_CONNECTION_STRING= +# Plugin oss tencent cos +PLUGIN_TENCENT_COS_SECRET_KEY= +PLUGIN_TENCENT_COS_SECRET_ID= +PLUGIN_TENCENT_COS_REGION= +# Plugin oss aliyun oss +PLUGIN_ALIYUN_OSS_REGION= +PLUGIN_ALIYUN_OSS_ENDPOINT= +PLUGIN_ALIYUN_OSS_ACCESS_KEY_ID= +PLUGIN_ALIYUN_OSS_ACCESS_KEY_SECRET= +PLUGIN_ALIYUN_OSS_AUTH_VERSION=v4 +PLUGIN_ALIYUN_OSS_PATH= +# Plugin oss volcengine tos +PLUGIN_VOLCENGINE_TOS_ENDPOINT= +PLUGIN_VOLCENGINE_TOS_ACCESS_KEY= +PLUGIN_VOLCENGINE_TOS_SECRET_KEY= +PLUGIN_VOLCENGINE_TOS_REGION= + +# ------------------------------ +# OTLP Collector Configuration +# ------------------------------ +ENABLE_OTEL=false +OTLP_TRACE_ENDPOINT= +OTLP_METRIC_ENDPOINT= +OTLP_BASE_ENDPOINT=http://localhost:4318 +OTLP_API_KEY= +OTEL_EXPORTER_OTLP_PROTOCOL= +OTEL_EXPORTER_TYPE=otlp +OTEL_SAMPLING_RATE=0.1 +OTEL_BATCH_EXPORT_SCHEDULE_DELAY=5000 +OTEL_MAX_QUEUE_SIZE=2048 +OTEL_MAX_EXPORT_BATCH_SIZE=512 +OTEL_METRIC_EXPORT_INTERVAL=60000 +OTEL_BATCH_EXPORT_TIMEOUT=10000 +OTEL_METRIC_EXPORT_TIMEOUT=30000 + +# Prevent Clickjacking +ALLOW_EMBED=false + +# Dataset queue monitor configuration +QUEUE_MONITOR_THRESHOLD=200 +# You can configure multiple ones, separated by commas. eg: test1@dify.ai,test2@dify.ai +QUEUE_MONITOR_ALERT_EMAILS= +# Monitor interval in minutes, default is 30 minutes +QUEUE_MONITOR_INTERVAL=30 diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py index fe2c673fc4..0fe8a2da15 100644 --- a/api/configs/middleware/__init__.py +++ b/api/configs/middleware/__init__.py @@ -64,8 +64,8 @@ class StorageConfig(BaseSettings): "local", ] = Field( description="Type of storage to use." - " Options: 'opendal', '(deprecated) local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', 'clickzetta-volume', 'google-storage', " - "'huawei-obs', 'oci-storage', 'tencent-cos', 'volcengine-tos', 'supabase'. Default is 'opendal'.", + " Options: 'opendal', '(deprecated) local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', 'clickzetta-volume', " + "'google-storage', 'huawei-obs', 'oci-storage', 'tencent-cos', 'volcengine-tos', 'supabase'. Default is 'opendal'.", default="opendal", ) diff --git a/api/configs/middleware/storage/clickzetta_volume_storage_config.py b/api/configs/middleware/storage/clickzetta_volume_storage_config.py index 96eb6d3dd7..56e1b6a957 100644 --- a/api/configs/middleware/storage/clickzetta_volume_storage_config.py +++ b/api/configs/middleware/storage/clickzetta_volume_storage_config.py @@ -8,57 +8,57 @@ from pydantic_settings import BaseSettings class ClickZettaVolumeStorageConfig(BaseSettings): """Configuration for ClickZetta Volume storage.""" - + CLICKZETTA_VOLUME_USERNAME: Optional[str] = Field( description="Username for ClickZetta Volume authentication", default=None, ) - + CLICKZETTA_VOLUME_PASSWORD: Optional[str] = Field( description="Password for ClickZetta Volume authentication", default=None, ) - + CLICKZETTA_VOLUME_INSTANCE: Optional[str] = Field( description="ClickZetta instance identifier", default=None, ) - + CLICKZETTA_VOLUME_SERVICE: str = Field( description="ClickZetta service endpoint", default="api.clickzetta.com", ) - + CLICKZETTA_VOLUME_WORKSPACE: str = Field( description="ClickZetta workspace name", default="quick_start", ) - + CLICKZETTA_VOLUME_VCLUSTER: str = Field( description="ClickZetta virtual cluster name", default="default_ap", ) - + CLICKZETTA_VOLUME_SCHEMA: str = Field( description="ClickZetta schema name", default="dify", ) - + CLICKZETTA_VOLUME_TYPE: str = Field( description="ClickZetta volume type (table|user|external)", default="user", ) - + CLICKZETTA_VOLUME_NAME: Optional[str] = Field( description="ClickZetta volume name for external volumes", default=None, ) - + CLICKZETTA_VOLUME_TABLE_PREFIX: str = Field( description="Prefix for ClickZetta volume table names", default="dataset_", ) - + CLICKZETTA_VOLUME_DIFY_PREFIX: str = Field( description="Directory prefix for User Volume to organize Dify files", default="dify_km", diff --git a/api/configs/middleware/vdb/clickzetta_config.py b/api/configs/middleware/vdb/clickzetta_config.py index b08df7a5b5..04f81e25fc 100644 --- a/api/configs/middleware/vdb/clickzetta_config.py +++ b/api/configs/middleware/vdb/clickzetta_config.py @@ -67,4 +67,3 @@ class ClickzettaConfig(BaseModel): description="Distance function for vector similarity: l2_distance or cosine_distance", default="cosine_distance", ) - diff --git a/api/extensions/ext_storage.py b/api/extensions/ext_storage.py index d51ee2bdbe..d13393dd14 100644 --- a/api/extensions/ext_storage.py +++ b/api/extensions/ext_storage.py @@ -80,7 +80,7 @@ class Storage: # and fallback to CLICKZETTA_* config if CLICKZETTA_VOLUME_* is not set volume_config = ClickZettaVolumeConfig() return ClickZettaVolumeStorage(volume_config) - + return create_clickzetta_volume_storage case _: raise ValueError(f"unsupported storage type {storage_type}") diff --git a/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py b/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py index 150412a899..b83ddce800 100644 --- a/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py +++ b/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py @@ -16,6 +16,7 @@ import clickzetta # type: ignore[import] from pydantic import BaseModel, model_validator from extensions.storage.base_storage import BaseStorage + from .volume_permissions import VolumePermissionManager, check_volume_permission logger = logging.getLogger(__name__) @@ -23,7 +24,7 @@ logger = logging.getLogger(__name__) class ClickZettaVolumeConfig(BaseModel): """Configuration for ClickZetta Volume storage.""" - + username: str password: str instance: str @@ -36,52 +37,51 @@ class ClickZettaVolumeConfig(BaseModel): table_prefix: str = "dataset_" # Prefix for table volume names dify_prefix: str = "dify_km" # Directory prefix for User Volume permission_check: bool = True # Enable/disable permission checking - + @model_validator(mode="before") @classmethod def validate_config(cls, values: dict) -> dict: """Validate the configuration values. - + This method will first try to use CLICKZETTA_VOLUME_* environment variables, then fall back to CLICKZETTA_* environment variables (for vector DB config). """ import os - + # Helper function to get environment variable with fallback - def get_env_with_fallback(volume_key: str, fallback_key: str, default: str = None) -> str: + def get_env_with_fallback(volume_key: str, fallback_key: str, default: str | None = None) -> str: # First try CLICKZETTA_VOLUME_* specific config - volume_value = values.get(volume_key.lower().replace('clickzetta_volume_', '')) + volume_value = values.get(volume_key.lower().replace("clickzetta_volume_", "")) if volume_value: return volume_value - + # Then try environment variables volume_env = os.getenv(volume_key) if volume_env: return volume_env - + # Fall back to existing CLICKZETTA_* config fallback_env = os.getenv(fallback_key) if fallback_env: return fallback_env - + return default - + # Apply environment variables with fallback to existing CLICKZETTA_* config - values.setdefault("username", get_env_with_fallback( - "CLICKZETTA_VOLUME_USERNAME", "CLICKZETTA_USERNAME")) - values.setdefault("password", get_env_with_fallback( - "CLICKZETTA_VOLUME_PASSWORD", "CLICKZETTA_PASSWORD")) - values.setdefault("instance", get_env_with_fallback( - "CLICKZETTA_VOLUME_INSTANCE", "CLICKZETTA_INSTANCE")) - values.setdefault("service", get_env_with_fallback( - "CLICKZETTA_VOLUME_SERVICE", "CLICKZETTA_SERVICE", "api.clickzetta.com")) - values.setdefault("workspace", get_env_with_fallback( - "CLICKZETTA_VOLUME_WORKSPACE", "CLICKZETTA_WORKSPACE", "quick_start")) - values.setdefault("vcluster", get_env_with_fallback( - "CLICKZETTA_VOLUME_VCLUSTER", "CLICKZETTA_VCLUSTER", "default_ap")) - values.setdefault("schema_name", get_env_with_fallback( - "CLICKZETTA_VOLUME_SCHEMA", "CLICKZETTA_SCHEMA", "dify")) - + values.setdefault("username", get_env_with_fallback("CLICKZETTA_VOLUME_USERNAME", "CLICKZETTA_USERNAME")) + values.setdefault("password", get_env_with_fallback("CLICKZETTA_VOLUME_PASSWORD", "CLICKZETTA_PASSWORD")) + values.setdefault("instance", get_env_with_fallback("CLICKZETTA_VOLUME_INSTANCE", "CLICKZETTA_INSTANCE")) + values.setdefault( + "service", get_env_with_fallback("CLICKZETTA_VOLUME_SERVICE", "CLICKZETTA_SERVICE", "api.clickzetta.com") + ) + values.setdefault( + "workspace", get_env_with_fallback("CLICKZETTA_VOLUME_WORKSPACE", "CLICKZETTA_WORKSPACE", "quick_start") + ) + values.setdefault( + "vcluster", get_env_with_fallback("CLICKZETTA_VOLUME_VCLUSTER", "CLICKZETTA_VCLUSTER", "default_ap") + ) + values.setdefault("schema_name", get_env_with_fallback("CLICKZETTA_VOLUME_SCHEMA", "CLICKZETTA_SCHEMA", "dify")) + # Volume-specific configurations (no fallback to vector DB config) values.setdefault("volume_type", os.getenv("CLICKZETTA_VOLUME_TYPE", "table")) values.setdefault("volume_name", os.getenv("CLICKZETTA_VOLUME_NAME")) @@ -89,7 +89,7 @@ class ClickZettaVolumeConfig(BaseModel): values.setdefault("dify_prefix", os.getenv("CLICKZETTA_VOLUME_DIFY_PREFIX", "dify_km")) # 暂时禁用权限检查功能,直接设置为false values.setdefault("permission_check", False) - + # Validate required fields if not values.get("username"): raise ValueError("CLICKZETTA_VOLUME_USERNAME or CLICKZETTA_USERNAME is required") @@ -97,24 +97,24 @@ class ClickZettaVolumeConfig(BaseModel): raise ValueError("CLICKZETTA_VOLUME_PASSWORD or CLICKZETTA_PASSWORD is required") if not values.get("instance"): raise ValueError("CLICKZETTA_VOLUME_INSTANCE or CLICKZETTA_INSTANCE is required") - + # Validate volume type volume_type = values["volume_type"] if volume_type not in ["table", "user", "external"]: raise ValueError("CLICKZETTA_VOLUME_TYPE must be one of: table, user, external") - + if volume_type == "external" and not values.get("volume_name"): raise ValueError("CLICKZETTA_VOLUME_NAME is required for external volume type") - + return values class ClickZettaVolumeStorage(BaseStorage): """ClickZetta Volume storage implementation.""" - + def __init__(self, config: ClickZettaVolumeConfig): """Initialize ClickZetta Volume storage. - + Args: config: ClickZetta Volume configuration """ @@ -123,9 +123,9 @@ class ClickZettaVolumeStorage(BaseStorage): self._permission_manager = None self._init_connection() self._init_permission_manager() - + logger.info(f"ClickZetta Volume storage initialized with type: {config.volume_type}") - + def _init_connection(self): """Initialize ClickZetta connection.""" try: @@ -136,26 +136,24 @@ class ClickZettaVolumeStorage(BaseStorage): service=self._config.service, workspace=self._config.workspace, vcluster=self._config.vcluster, - schema=self._config.schema_name + schema=self._config.schema_name, ) logger.debug("ClickZetta connection established") except Exception as e: logger.error(f"Failed to connect to ClickZetta: {e}") raise - + def _init_permission_manager(self): """Initialize permission manager.""" try: self._permission_manager = VolumePermissionManager( - self._connection, - self._config.volume_type, - self._config.volume_name + self._connection, self._config.volume_type, self._config.volume_name ) logger.debug("Permission manager initialized") except Exception as e: logger.error(f"Failed to initialize permission manager: {e}") raise - + def _get_volume_path(self, filename: str, dataset_id: Optional[str] = None) -> str: """Get the appropriate volume path based on volume type.""" if self._config.volume_type == "user": @@ -166,7 +164,7 @@ class ClickZettaVolumeStorage(BaseStorage): if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files"]: # Use User Volume with dify prefix for special directories return f"{self._config.dify_prefix}/{filename}" - + if dataset_id: return f"{self._config.table_prefix}{dataset_id}/{filename}" else: @@ -180,7 +178,7 @@ class ClickZettaVolumeStorage(BaseStorage): return filename else: raise ValueError(f"Unsupported volume type: {self._config.volume_type}") - + def _get_volume_sql_prefix(self, dataset_id: Optional[str] = None) -> str: """Get SQL prefix for volume operations.""" if self._config.volume_type == "user": @@ -191,7 +189,7 @@ class ClickZettaVolumeStorage(BaseStorage): # These should use USER VOLUME for better compatibility if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files"]: return "USER VOLUME" - + # Only use TABLE VOLUME for actual dataset-specific paths # like "dataset_12345/file.pdf" or paths with dataset_ prefix if dataset_id: @@ -204,7 +202,7 @@ class ClickZettaVolumeStorage(BaseStorage): return f"VOLUME {self._config.volume_name}" else: raise ValueError(f"Unsupported volume type: {self._config.volume_type}") - + def _execute_sql(self, sql: str, fetch: bool = False): """Execute SQL command.""" try: @@ -216,23 +214,23 @@ class ClickZettaVolumeStorage(BaseStorage): except Exception as e: logger.error(f"SQL execution failed: {sql}, Error: {e}") raise - + def _ensure_table_volume_exists(self, dataset_id: str) -> None: """Ensure table volume exists for the given dataset_id.""" if self._config.volume_type != "table" or not dataset_id: return - + # Skip for upload_files and other special directories that use USER VOLUME if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files"]: return - + table_name = f"{self._config.table_prefix}{dataset_id}" - + try: # Check if table exists check_sql = f"SHOW TABLES LIKE '{table_name}'" result = self._execute_sql(check_sql, fetch=True) - + if not result: # Create table with volume create_sql = f""" @@ -246,15 +244,15 @@ class ClickZettaVolumeStorage(BaseStorage): """ self._execute_sql(create_sql) logger.info(f"Created table volume: {table_name}") - + except Exception as e: logger.warning(f"Failed to create table volume {table_name}: {e}") # Don't raise exception, let the operation continue # The table might exist but not be visible due to permissions - + def save(self, filename: str, data: bytes) -> None: """Save data to ClickZetta Volume. - + Args: filename: File path in volume data: File content as bytes @@ -264,53 +262,53 @@ class ClickZettaVolumeStorage(BaseStorage): if "/" in filename and self._config.volume_type == "table": parts = filename.split("/", 1) if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix):] + dataset_id = parts[0][len(self._config.table_prefix) :] filename = parts[1] else: dataset_id = parts[0] filename = parts[1] - + # Ensure table volume exists (for table volumes) if dataset_id: self._ensure_table_volume_exists(dataset_id) - + # Check permissions (if enabled) if self._config.permission_check: # Skip permission check for special directories that use USER VOLUME if dataset_id not in ["upload_files", "temp", "cache", "tools", "website_files"]: check_volume_permission(self._permission_manager, "save", dataset_id) - + # Write data to temporary file with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file.write(data) temp_file_path = temp_file.name - + try: # Upload to volume volume_prefix = self._get_volume_sql_prefix(dataset_id) - + # Get the actual volume path (may include dify_km prefix) volume_path = self._get_volume_path(filename, dataset_id) - actual_filename = volume_path.split('/')[-1] if '/' in volume_path else volume_path - + actual_filename = volume_path.split("/")[-1] if "/" in volume_path else volume_path + # For User Volume, use the full path with dify_km prefix if volume_prefix == "USER VOLUME": sql = f"PUT '{temp_file_path}' TO {volume_prefix} FILE '{volume_path}'" else: sql = f"PUT '{temp_file_path}' TO {volume_prefix} FILE '{filename}'" - + self._execute_sql(sql) logger.debug(f"File {filename} saved to ClickZetta Volume at path {volume_path}") finally: # Clean up temporary file Path(temp_file_path).unlink(missing_ok=True) - + def load_once(self, filename: str) -> bytes: """Load file content from ClickZetta Volume. - + Args: filename: File path in volume - + Returns: File content as bytes """ @@ -319,33 +317,33 @@ class ClickZettaVolumeStorage(BaseStorage): if "/" in filename and self._config.volume_type == "table": parts = filename.split("/", 1) if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix):] + dataset_id = parts[0][len(self._config.table_prefix) :] filename = parts[1] else: dataset_id = parts[0] filename = parts[1] - + # Check permissions (if enabled) if self._config.permission_check: # Skip permission check for special directories that use USER VOLUME if dataset_id not in ["upload_files", "temp", "cache", "tools", "website_files"]: check_volume_permission(self._permission_manager, "load_once", dataset_id) - + # Download to temporary directory with tempfile.TemporaryDirectory() as temp_dir: volume_prefix = self._get_volume_sql_prefix(dataset_id) - + # Get the actual volume path (may include dify_km prefix) volume_path = self._get_volume_path(filename, dataset_id) - + # For User Volume, use the full path with dify_km prefix if volume_prefix == "USER VOLUME": sql = f"GET {volume_prefix} FILE '{volume_path}' TO '{temp_dir}'" else: sql = f"GET {volume_prefix} FILE '{filename}' TO '{temp_dir}'" - + self._execute_sql(sql) - + # Find the downloaded file (may be in subdirectories) downloaded_file = None for root, dirs, files in os.walk(temp_dir): @@ -355,52 +353,52 @@ class ClickZettaVolumeStorage(BaseStorage): break if downloaded_file: break - + if not downloaded_file or not downloaded_file.exists(): raise FileNotFoundError(f"Downloaded file not found: {filename}") - + content = downloaded_file.read_bytes() logger.debug(f"File {filename} loaded from ClickZetta Volume") return content - + def load_stream(self, filename: str) -> Generator: """Load file as stream from ClickZetta Volume. - + Args: filename: File path in volume - + Yields: File content chunks """ content = self.load_once(filename) batch_size = 4096 stream = BytesIO(content) - + while chunk := stream.read(batch_size): yield chunk - + logger.debug(f"File {filename} loaded as stream from ClickZetta Volume") - + def download(self, filename: str, target_filepath: str): """Download file from ClickZetta Volume to local path. - + Args: filename: File path in volume target_filepath: Local target file path """ content = self.load_once(filename) - + with Path(target_filepath).open("wb") as f: f.write(content) - + logger.debug(f"File {filename} downloaded from ClickZetta Volume to {target_filepath}") - + def exists(self, filename: str) -> bool: """Check if file exists in ClickZetta Volume. - + Args: filename: File path in volume - + Returns: True if file exists, False otherwise """ @@ -410,76 +408,76 @@ class ClickZettaVolumeStorage(BaseStorage): if "/" in filename and self._config.volume_type == "table": parts = filename.split("/", 1) if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix):] + dataset_id = parts[0][len(self._config.table_prefix) :] filename = parts[1] else: dataset_id = parts[0] filename = parts[1] - + volume_prefix = self._get_volume_sql_prefix(dataset_id) - + # Get the actual volume path (may include dify_km prefix) volume_path = self._get_volume_path(filename, dataset_id) - + # For User Volume, use the full path with dify_km prefix if volume_prefix == "USER VOLUME": sql = f"LIST {volume_prefix} REGEXP = '^{volume_path}$'" else: sql = f"LIST {volume_prefix} REGEXP = '^{filename}$'" - + rows = self._execute_sql(sql, fetch=True) - + exists = len(rows) > 0 logger.debug(f"File {filename} exists check: {exists}") return exists except Exception as e: logger.warning(f"Error checking file existence for {filename}: {e}") return False - + def delete(self, filename: str): """Delete file from ClickZetta Volume. - + Args: filename: File path in volume """ if not self.exists(filename): logger.debug(f"File {filename} not found, skip delete") return - + # Extract dataset_id from filename if present dataset_id = None if "/" in filename and self._config.volume_type == "table": parts = filename.split("/", 1) if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix):] + dataset_id = parts[0][len(self._config.table_prefix) :] filename = parts[1] else: dataset_id = parts[0] filename = parts[1] - + volume_prefix = self._get_volume_sql_prefix(dataset_id) - + # Get the actual volume path (may include dify_km prefix) volume_path = self._get_volume_path(filename, dataset_id) - + # For User Volume, use the full path with dify_km prefix if volume_prefix == "USER VOLUME": sql = f"REMOVE {volume_prefix} FILE '{volume_path}'" else: sql = f"REMOVE {volume_prefix} FILE '{filename}'" - + self._execute_sql(sql) - + logger.debug(f"File {filename} deleted from ClickZetta Volume") - + def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]: """Scan files and directories in ClickZetta Volume. - + Args: path: Path to scan (dataset_id for table volumes) files: Include files in results directories: Include directories in results - + Returns: List of file/directory paths """ @@ -489,9 +487,9 @@ class ClickZettaVolumeStorage(BaseStorage): if self._config.volume_type == "table": dataset_id = path path = "" # Root of the table volume - + volume_prefix = self._get_volume_sql_prefix(dataset_id) - + # For User Volume, add dify prefix to path if volume_prefix == "USER VOLUME": if path: @@ -504,26 +502,24 @@ class ClickZettaVolumeStorage(BaseStorage): sql = f"LIST {volume_prefix} SUBDIRECTORY '{path}'" else: sql = f"LIST {volume_prefix}" - + rows = self._execute_sql(sql, fetch=True) - + result = [] for row in rows: file_path = row[0] # relative_path column - + # For User Volume, remove dify prefix from results dify_prefix_with_slash = f"{self._config.dify_prefix}/" if volume_prefix == "USER VOLUME" and file_path.startswith(dify_prefix_with_slash): - file_path = file_path[len(dify_prefix_with_slash):] # Remove prefix - - if files and not file_path.endswith("/"): - result.append(file_path) - elif directories and file_path.endswith("/"): + file_path = file_path[len(dify_prefix_with_slash) :] # Remove prefix + + if files and not file_path.endswith("/") or directories and file_path.endswith("/"): result.append(file_path) - + logger.debug(f"Scanned {len(result)} items in path {path}") return result - + except Exception as e: logger.error(f"Error scanning path {path}: {e}") return [] diff --git a/api/extensions/storage/clickzetta_volume/file_lifecycle.py b/api/extensions/storage/clickzetta_volume/file_lifecycle.py index 9e36e97328..5fca1d56cf 100644 --- a/api/extensions/storage/clickzetta_volume/file_lifecycle.py +++ b/api/extensions/storage/clickzetta_volume/file_lifecycle.py @@ -6,26 +6,27 @@ import json import logging +from dataclasses import asdict, dataclass from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, List, Optional -from dataclasses import dataclass, asdict from enum import Enum +from typing import Optional logger = logging.getLogger(__name__) class FileStatus(Enum): """文件状态枚举""" - ACTIVE = "active" # 活跃状态 + + ACTIVE = "active" # 活跃状态 ARCHIVED = "archived" # 已归档 - DELETED = "deleted" # 已删除(软删除) - BACKUP = "backup" # 备份文件 + DELETED = "deleted" # 已删除(软删除) + BACKUP = "backup" # 备份文件 @dataclass class FileMetadata: """文件元数据""" + filename: str size: int created_at: datetime @@ -33,33 +34,33 @@ class FileMetadata: version: int status: FileStatus checksum: Optional[str] = None - tags: Optional[Dict[str, str]] = None + tags: Optional[dict[str, str]] = None parent_version: Optional[int] = None - - def to_dict(self) -> Dict: + + def to_dict(self) -> dict: """转换为字典格式""" data = asdict(self) - data['created_at'] = self.created_at.isoformat() - data['modified_at'] = self.modified_at.isoformat() - data['status'] = self.status.value + data["created_at"] = self.created_at.isoformat() + data["modified_at"] = self.modified_at.isoformat() + data["status"] = self.status.value return data - + @classmethod - def from_dict(cls, data: Dict) -> 'FileMetadata': + def from_dict(cls, data: dict) -> "FileMetadata": """从字典创建实例""" data = data.copy() - data['created_at'] = datetime.fromisoformat(data['created_at']) - data['modified_at'] = datetime.fromisoformat(data['modified_at']) - data['status'] = FileStatus(data['status']) + data["created_at"] = datetime.fromisoformat(data["created_at"]) + data["modified_at"] = datetime.fromisoformat(data["modified_at"]) + data["status"] = FileStatus(data["status"]) return cls(**data) class FileLifecycleManager: """文件生命周期管理器""" - + def __init__(self, storage, dataset_id: Optional[str] = None): """初始化生命周期管理器 - + Args: storage: ClickZetta Volume存储实例 dataset_id: 数据集ID(用于Table Volume) @@ -70,61 +71,61 @@ class FileLifecycleManager: self._version_prefix = ".versions/" self._backup_prefix = ".backups/" self._deleted_prefix = ".deleted/" - + # 获取权限管理器(如果存在) - self._permission_manager = getattr(storage, '_permission_manager', None) - - def save_with_lifecycle(self, filename: str, data: bytes, - tags: Optional[Dict[str, str]] = None) -> FileMetadata: + self._permission_manager = getattr(storage, "_permission_manager", None) + + def save_with_lifecycle(self, filename: str, data: bytes, tags: Optional[dict[str, str]] = None) -> FileMetadata: """保存文件并管理生命周期 - + Args: filename: 文件名 data: 文件内容 tags: 文件标签 - + Returns: 文件元数据 """ # 权限检查 if not self._check_permission(filename, "save"): from .volume_permissions import VolumePermissionError + raise VolumePermissionError( f"Permission denied for lifecycle save operation on file: {filename}", operation="save", - volume_type=getattr(self._storage, '_config', {}).get('volume_type', 'unknown'), - dataset_id=self._dataset_id + volume_type=getattr(self._storage, "_config", {}).get("volume_type", "unknown"), + dataset_id=self._dataset_id, ) - + try: # 1. 检查是否存在旧版本 metadata_dict = self._load_metadata() current_metadata = metadata_dict.get(filename) - + # 2. 如果存在旧版本,创建版本备份 if current_metadata: self._create_version_backup(filename, current_metadata) - + # 3. 计算文件信息 now = datetime.now() checksum = self._calculate_checksum(data) - new_version = (current_metadata['version'] + 1) if current_metadata else 1 - + new_version = (current_metadata["version"] + 1) if current_metadata else 1 + # 4. 保存新文件 self._storage.save(filename, data) - + # 5. 创建元数据 created_at = now parent_version = None - + if current_metadata: # 如果created_at是字符串,转换为datetime - if isinstance(current_metadata['created_at'], str): - created_at = datetime.fromisoformat(current_metadata['created_at']) + if isinstance(current_metadata["created_at"], str): + created_at = datetime.fromisoformat(current_metadata["created_at"]) else: - created_at = current_metadata['created_at'] - parent_version = current_metadata['version'] - + created_at = current_metadata["created_at"] + parent_version = current_metadata["version"] + file_metadata = FileMetadata( filename=filename, size=len(data), @@ -134,26 +135,26 @@ class FileLifecycleManager: status=FileStatus.ACTIVE, checksum=checksum, tags=tags or {}, - parent_version=parent_version + parent_version=parent_version, ) - + # 6. 更新元数据 metadata_dict[filename] = file_metadata.to_dict() self._save_metadata(metadata_dict) - + logger.info(f"File {filename} saved with lifecycle management, version {new_version}") return file_metadata - + except Exception as e: logger.error(f"Failed to save file with lifecycle: {e}") raise - + def get_file_metadata(self, filename: str) -> Optional[FileMetadata]: """获取文件元数据 - + Args: filename: 文件名 - + Returns: 文件元数据,如果不存在返回None """ @@ -165,24 +166,24 @@ class FileLifecycleManager: except Exception as e: logger.error(f"Failed to get file metadata for {filename}: {e}") return None - - def list_file_versions(self, filename: str) -> List[FileMetadata]: + + def list_file_versions(self, filename: str) -> list[FileMetadata]: """列出文件的所有版本 - + Args: filename: 文件名 - + Returns: 文件版本列表,按版本号排序 """ try: versions = [] - + # 获取当前版本 current_metadata = self.get_file_metadata(filename) if current_metadata: versions.append(current_metadata) - + # 获取历史版本 version_pattern = f"{self._version_prefix}{filename}.v*" try: @@ -200,52 +201,52 @@ class FileLifecycleManager: except: # 如果无法扫描版本文件,只返回当前版本 pass - + return sorted(versions, key=lambda x: x.version, reverse=True) - + except Exception as e: logger.error(f"Failed to list file versions for {filename}: {e}") return [] - + def restore_version(self, filename: str, version: int) -> bool: """恢复文件到指定版本 - + Args: filename: 文件名 version: 要恢复的版本号 - + Returns: 恢复是否成功 """ try: version_filename = f"{self._version_prefix}{filename}.v{version}" - + # 检查版本文件是否存在 if not self._storage.exists(version_filename): logger.warning(f"Version {version} of {filename} not found") return False - + # 读取版本文件内容 version_data = self._storage.load_once(version_filename) - + # 保存当前版本为备份 current_metadata = self.get_file_metadata(filename) if current_metadata: self._create_version_backup(filename, current_metadata.to_dict()) - + # 恢复文件 return self.save_with_lifecycle(filename, version_data, {"restored_from": str(version)}) - + except Exception as e: logger.error(f"Failed to restore {filename} to version {version}: {e}") return False - + def archive_file(self, filename: str) -> bool: """归档文件 - + Args: filename: 文件名 - + Returns: 归档是否成功 """ @@ -253,32 +254,32 @@ class FileLifecycleManager: if not self._check_permission(filename, "archive"): logger.warning(f"Permission denied for archive operation on file: {filename}") return False - + try: # 更新文件状态为归档 metadata_dict = self._load_metadata() if filename not in metadata_dict: logger.warning(f"File {filename} not found in metadata") return False - - metadata_dict[filename]['status'] = FileStatus.ARCHIVED.value - metadata_dict[filename]['modified_at'] = datetime.now().isoformat() - + + metadata_dict[filename]["status"] = FileStatus.ARCHIVED.value + metadata_dict[filename]["modified_at"] = datetime.now().isoformat() + self._save_metadata(metadata_dict) - + logger.info(f"File {filename} archived successfully") return True - + except Exception as e: logger.error(f"Failed to archive file {filename}: {e}") return False - + def soft_delete_file(self, filename: str) -> bool: """软删除文件(移动到删除目录) - + Args: filename: 文件名 - + Returns: 删除是否成功 """ @@ -286,61 +287,61 @@ class FileLifecycleManager: if not self._check_permission(filename, "delete"): logger.warning(f"Permission denied for soft delete operation on file: {filename}") return False - + try: # 检查文件是否存在 if not self._storage.exists(filename): logger.warning(f"File {filename} not found") return False - + # 读取文件内容 file_data = self._storage.load_once(filename) - + # 移动到删除目录 deleted_filename = f"{self._deleted_prefix}{filename}.{datetime.now().strftime('%Y%m%d_%H%M%S')}" self._storage.save(deleted_filename, file_data) - + # 删除原文件 self._storage.delete(filename) - + # 更新元数据 metadata_dict = self._load_metadata() if filename in metadata_dict: - metadata_dict[filename]['status'] = FileStatus.DELETED.value - metadata_dict[filename]['modified_at'] = datetime.now().isoformat() + metadata_dict[filename]["status"] = FileStatus.DELETED.value + metadata_dict[filename]["modified_at"] = datetime.now().isoformat() self._save_metadata(metadata_dict) - + logger.info(f"File {filename} soft deleted successfully") return True - + except Exception as e: logger.error(f"Failed to soft delete file {filename}: {e}") return False - + def cleanup_old_versions(self, max_versions: int = 5, max_age_days: int = 30) -> int: """清理旧版本文件 - + Args: max_versions: 保留的最大版本数 max_age_days: 版本文件的最大保留天数 - + Returns: 清理的文件数量 """ try: cleaned_count = 0 cutoff_date = datetime.now() - timedelta(days=max_age_days) - + # 获取所有版本文件 try: all_files = self._storage.scan(self._dataset_id or "", files=True) version_files = [f for f in all_files if f.startswith(self._version_prefix)] - + # 按文件分组 file_versions = {} for version_file in version_files: # 解析文件名和版本 - parts = version_file[len(self._version_prefix):].split(".v") + parts = version_file[len(self._version_prefix) :].split(".v") if len(parts) >= 2: base_filename = parts[0] version_part = parts[1].split(".")[0] @@ -351,12 +352,12 @@ class FileLifecycleManager: file_versions[base_filename].append((version_num, version_file)) except ValueError: continue - + # 清理每个文件的旧版本 for base_filename, versions in file_versions.items(): # 按版本号排序 versions.sort(key=lambda x: x[0], reverse=True) - + # 保留最新的max_versions个版本,删除其余的 if len(versions) > max_versions: to_delete = versions[max_versions:] @@ -364,27 +365,27 @@ class FileLifecycleManager: self._storage.delete(version_file) cleaned_count += 1 logger.debug(f"Cleaned old version: {version_file}") - + logger.info(f"Cleaned {cleaned_count} old version files") - + except Exception as e: logger.warning(f"Could not scan for version files: {e}") - + return cleaned_count - + except Exception as e: logger.error(f"Failed to cleanup old versions: {e}") return 0 - - def get_storage_statistics(self) -> Dict[str, any]: + + def get_storage_statistics(self) -> dict[str, any]: """获取存储统计信息 - + Returns: 存储统计字典 """ try: metadata_dict = self._load_metadata() - + stats = { "total_files": len(metadata_dict), "active_files": 0, @@ -393,15 +394,15 @@ class FileLifecycleManager: "total_size": 0, "versions_count": 0, "oldest_file": None, - "newest_file": None + "newest_file": None, } - + oldest_date = None newest_date = None - + for filename, metadata in metadata_dict.items(): file_meta = FileMetadata.from_dict(metadata) - + # 统计文件状态 if file_meta.status == FileStatus.ACTIVE: stats["active_files"] += 1 @@ -409,84 +410,85 @@ class FileLifecycleManager: stats["archived_files"] += 1 elif file_meta.status == FileStatus.DELETED: stats["deleted_files"] += 1 - + # 统计大小 stats["total_size"] += file_meta.size - + # 统计版本 stats["versions_count"] += file_meta.version - + # 找出最新和最旧的文件 if oldest_date is None or file_meta.created_at < oldest_date: oldest_date = file_meta.created_at stats["oldest_file"] = filename - + if newest_date is None or file_meta.modified_at > newest_date: newest_date = file_meta.modified_at stats["newest_file"] = filename - + return stats - + except Exception as e: logger.error(f"Failed to get storage statistics: {e}") return {} - - def _create_version_backup(self, filename: str, metadata: Dict): + + def _create_version_backup(self, filename: str, metadata: dict): """创建版本备份""" try: # 读取当前文件内容 current_data = self._storage.load_once(filename) - + # 保存为版本文件 version_filename = f"{self._version_prefix}{filename}.v{metadata['version']}" self._storage.save(version_filename, current_data) - + logger.debug(f"Created version backup: {version_filename}") - + except Exception as e: logger.warning(f"Failed to create version backup for {filename}: {e}") - - def _load_metadata(self) -> Dict: + + def _load_metadata(self) -> dict: """加载元数据文件""" try: if self._storage.exists(self._metadata_file): metadata_content = self._storage.load_once(self._metadata_file) - return json.loads(metadata_content.decode('utf-8')) + return json.loads(metadata_content.decode("utf-8")) else: return {} except Exception as e: logger.warning(f"Failed to load metadata: {e}") return {} - - def _save_metadata(self, metadata_dict: Dict): + + def _save_metadata(self, metadata_dict: dict): """保存元数据文件""" try: metadata_content = json.dumps(metadata_dict, indent=2, ensure_ascii=False) - self._storage.save(self._metadata_file, metadata_content.encode('utf-8')) + self._storage.save(self._metadata_file, metadata_content.encode("utf-8")) logger.debug("Metadata saved successfully") except Exception as e: logger.error(f"Failed to save metadata: {e}") raise - + def _calculate_checksum(self, data: bytes) -> str: """计算文件校验和""" import hashlib + return hashlib.md5(data).hexdigest() - + def _check_permission(self, filename: str, operation: str) -> bool: """检查文件操作权限 - + Args: filename: 文件名 operation: 操作类型 - + Returns: True if permission granted, False otherwise """ # 如果没有权限管理器,默认允许 if not self._permission_manager: return True - + try: # 根据操作类型映射到权限 operation_mapping = { @@ -494,17 +496,17 @@ class FileLifecycleManager: "load": "load_once", "delete": "delete", "archive": "delete", # 归档需要删除权限 - "restore": "save", # 恢复需要写权限 + "restore": "save", # 恢复需要写权限 "cleanup": "delete", # 清理需要删除权限 "read": "load_once", - "write": "save" + "write": "save", } - + mapped_operation = operation_mapping.get(operation, operation) - + # 检查权限 return self._permission_manager.validate_operation(mapped_operation, self._dataset_id) - + except Exception as e: logger.error(f"Permission check failed for {filename} operation {operation}: {e}") # 安全默认:权限检查失败时拒绝访问 diff --git a/api/extensions/storage/clickzetta_volume/volume_permissions.py b/api/extensions/storage/clickzetta_volume/volume_permissions.py index 9d52b80b46..99838bcdf6 100644 --- a/api/extensions/storage/clickzetta_volume/volume_permissions.py +++ b/api/extensions/storage/clickzetta_volume/volume_permissions.py @@ -6,13 +6,14 @@ import logging from enum import Enum -from typing import Dict, Optional, Set +from typing import Optional logger = logging.getLogger(__name__) class VolumePermission(Enum): """Volume权限类型枚举""" + READ = "SELECT" # 对应ClickZetta的SELECT权限 WRITE = "INSERT,UPDATE,DELETE" # 对应ClickZetta的写权限 LIST = "SELECT" # 列出文件需要SELECT权限 @@ -35,18 +36,19 @@ class VolumePermissionManager: if isinstance(connection_or_config, dict): # 从配置字典创建连接 import clickzetta + config = connection_or_config self._connection = clickzetta.connect( - username=config.get('username'), - password=config.get('password'), - instance=config.get('instance'), - service=config.get('service'), - workspace=config.get('workspace'), - vcluster=config.get('vcluster'), - schema=config.get('schema') or config.get('database') + username=config.get("username"), + password=config.get("password"), + instance=config.get("instance"), + service=config.get("service"), + workspace=config.get("workspace"), + vcluster=config.get("vcluster"), + schema=config.get("schema") or config.get("database"), ) - self._volume_type = config.get('volume_type', volume_type) - self._volume_name = config.get('volume_name', volume_name) + self._volume_type = config.get("volume_type", volume_type) + self._volume_name = config.get("volume_name", volume_name) else: # 直接使用连接对象 self._connection = connection_or_config @@ -58,7 +60,7 @@ class VolumePermissionManager: if not self._volume_type: raise ValueError("volume_type is required") - self._permission_cache: Dict[str, Set[str]] = {} + self._permission_cache: dict[str, set[str]] = {} self._current_username = None # 将从连接中获取当前用户名 def check_permission(self, operation: VolumePermission, dataset_id: Optional[str] = None) -> bool: @@ -119,7 +121,7 @@ class VolumePermissionManager: except Exception as e: logger.error(f"User Volume permission check failed: {e}") # 对于User Volume,如果权限检查失败,可能是配置问题,给出更友好的错误提示 - logger.info(f"User Volume permission check failed, but permission checking is disabled in this version") + logger.info("User Volume permission check failed, but permission checking is disabled in this version") return False def _check_table_volume_permission(self, operation: VolumePermission, dataset_id: Optional[str]) -> bool: @@ -144,8 +146,10 @@ class VolumePermissionManager: # 检查是否有所需的所有权限 has_permission = required_permissions.issubset(permissions) - logger.debug(f"Table Volume permission check for {table_name}, operation {operation.name}: " - f"required={required_permissions}, has={permissions}, granted={has_permission}") + logger.debug( + f"Table Volume permission check for {table_name}, operation {operation.name}: " + f"required={required_permissions}, has={permissions}, granted={has_permission}" + ) return has_permission @@ -180,8 +184,10 @@ class VolumePermissionManager: # 检查是否有所需的所有权限 has_permission = required_permissions.issubset(permissions) - logger.debug(f"External Volume permission check for {self._volume_name}, operation {operation.name}: " - f"required={required_permissions}, has={permissions}, granted={has_permission}") + logger.debug( + f"External Volume permission check for {self._volume_name}, operation {operation.name}: " + f"required={required_permissions}, has={permissions}, granted={has_permission}" + ) # 如果权限检查失败,尝试备选验证 if not has_permission: @@ -203,10 +209,10 @@ class VolumePermissionManager: except Exception as e: logger.error(f"External volume permission check failed for {self._volume_name}: {e}") - logger.info(f"External Volume permission check failed, but permission checking is disabled in this version") + logger.info("External Volume permission check failed, but permission checking is disabled in this version") return False - def _get_table_permissions(self, table_name: str) -> Set[str]: + def _get_table_permissions(self, table_name: str) -> set[str]: """获取用户对指定表的权限 Args: @@ -236,14 +242,12 @@ class VolumePermissionManager: object_name = grant[2] if len(grant) > 2 else "" # 检查是否是对该表的权限 - if object_type == "TABLE" and object_name == table_name: - if privilege in ["SELECT", "INSERT", "UPDATE", "DELETE", "ALL"]: - if privilege == "ALL": - permissions.update(["SELECT", "INSERT", "UPDATE", "DELETE"]) - else: - permissions.add(privilege) - # 检查是否是对整个schema的权限 - elif object_type == "SCHEMA" and object_name in table_name: + if ( + object_type == "TABLE" + and object_name == table_name + or object_type == "SCHEMA" + and object_name in table_name + ): if privilege in ["SELECT", "INSERT", "UPDATE", "DELETE", "ALL"]: if privilege == "ALL": permissions.update(["SELECT", "INSERT", "UPDATE", "DELETE"]) @@ -284,7 +288,7 @@ class VolumePermissionManager: return "unknown" - def _get_user_permissions(self, username: str) -> Set[str]: + def _get_user_permissions(self, username: str) -> set[str]: """获取用户的基本权限集合""" cache_key = f"user_permissions:{username}" @@ -321,7 +325,7 @@ class VolumePermissionManager: self._permission_cache[cache_key] = permissions return permissions - def _get_external_volume_permissions(self, volume_name: str) -> Set[str]: + def _get_external_volume_permissions(self, volume_name: str) -> set[str]: """获取用户对指定External Volume的权限 Args: @@ -363,10 +367,9 @@ class VolumePermissionManager: ) # 检查是否是对该Volume的权限或者是层级权限 - if ((granted_type == "PRIVILEGE" and granted_on == "VOLUME" and - object_name.endswith(volume_name)) or - (granted_type == "OBJECT_HIERARCHY" and granted_on == "VOLUME")): - + if ( + granted_type == "PRIVILEGE" and granted_on == "VOLUME" and object_name.endswith(volume_name) + ) or (granted_type == "OBJECT_HIERARCHY" and granted_on == "VOLUME"): logger.info(f"Matching grant found for {volume_name}") if "READ" in privilege: @@ -424,7 +427,7 @@ class VolumePermissionManager: self._permission_cache.clear() logger.debug("Permission cache cleared") - def get_permission_summary(self, dataset_id: Optional[str] = None) -> Dict[str, bool]: + def get_permission_summary(self, dataset_id: Optional[str] = None) -> dict[str, bool]: """获取权限摘要 Args: @@ -514,10 +517,16 @@ class VolumePermissionManager: """检查路径是否包含路径遍历攻击""" # 检查常见的路径遍历模式 traversal_patterns = [ - "../", "..\\", - "..%2f", "..%2F", "..%5c", "..%5C", - "%2e%2e%2f", "%2e%2e%5c", - "....//", "....\\\\", + "../", + "..\\", + "..%2f", + "..%2F", + "..%5c", + "..%5C", + "%2e%2e%2f", + "%2e%2e%5c", + "....//", + "....\\\\", ] file_path_lower = file_path.lower() @@ -539,9 +548,21 @@ class VolumePermissionManager: def _is_sensitive_path(self, file_path: str) -> bool: """检查路径是否为敏感路径""" sensitive_patterns = [ - "passwd", "shadow", "hosts", "config", "secrets", - "private", "key", "certificate", "cert", "ssl", - "database", "backup", "dump", "log", "tmp" + "passwd", + "shadow", + "hosts", + "config", + "secrets", + "private", + "key", + "certificate", + "cert", + "ssl", + "database", + "backup", + "dump", + "log", + "tmp", ] file_path_lower = file_path.lower() @@ -591,9 +612,9 @@ class VolumePermissionError(Exception): super().__init__(message) -def check_volume_permission(permission_manager: VolumePermissionManager, - operation: str, - dataset_id: Optional[str] = None) -> None: +def check_volume_permission( + permission_manager: VolumePermissionManager, operation: str, dataset_id: Optional[str] = None +) -> None: """权限检查装饰器函数 Args: @@ -610,8 +631,5 @@ def check_volume_permission(permission_manager: VolumePermissionManager, error_message += f" (dataset: {dataset_id})" raise VolumePermissionError( - error_message, - operation=operation, - volume_type=permission_manager._volume_type, - dataset_id=dataset_id + error_message, operation=operation, volume_type=permission_manager._volume_type, dataset_id=dataset_id ) diff --git a/api/tests/integration_tests/storage/test_clickzetta_volume.py b/api/tests/integration_tests/storage/test_clickzetta_volume.py index 2ae8b27210..3f42a61bb5 100644 --- a/api/tests/integration_tests/storage/test_clickzetta_volume.py +++ b/api/tests/integration_tests/storage/test_clickzetta_volume.py @@ -3,7 +3,6 @@ import os import tempfile import unittest -from unittest.mock import patch import pytest @@ -15,7 +14,7 @@ from extensions.storage.clickzetta_volume.clickzetta_volume_storage import ( class TestClickZettaVolumeStorage(unittest.TestCase): """Test cases for ClickZetta Volume Storage.""" - + def setUp(self): """Set up test environment.""" self.config = ClickZettaVolumeConfig( @@ -27,89 +26,83 @@ class TestClickZettaVolumeStorage(unittest.TestCase): vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"), schema_name=os.getenv("CLICKZETTA_SCHEMA", "dify"), volume_type="table", - table_prefix="test_dataset_" + table_prefix="test_dataset_", ) - - @pytest.mark.skipif( - not os.getenv("CLICKZETTA_USERNAME"), - reason="ClickZetta credentials not provided" - ) + + @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided") def test_user_volume_operations(self): """Test basic operations with User Volume.""" config = self.config config.volume_type = "user" - + storage = ClickZettaVolumeStorage(config) - + # Test file operations test_filename = "test_file.txt" test_content = b"Hello, ClickZetta Volume!" - + # Save file storage.save(test_filename, test_content) - + # Check if file exists self.assertTrue(storage.exists(test_filename)) - + # Load file loaded_content = storage.load_once(test_filename) self.assertEqual(loaded_content, test_content) - + # Test streaming stream_content = b"" for chunk in storage.load_stream(test_filename): stream_content += chunk self.assertEqual(stream_content, test_content) - + # Test download with tempfile.NamedTemporaryFile() as temp_file: storage.download(test_filename, temp_file.name) with open(temp_file.name, "rb") as f: downloaded_content = f.read() self.assertEqual(downloaded_content, test_content) - + # Test scan files = storage.scan("", files=True, directories=False) self.assertIn(test_filename, files) - + # Delete file storage.delete(test_filename) self.assertFalse(storage.exists(test_filename)) - - @pytest.mark.skipif( - not os.getenv("CLICKZETTA_USERNAME"), - reason="ClickZetta credentials not provided" - ) + + @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided") def test_table_volume_operations(self): """Test basic operations with Table Volume.""" config = self.config config.volume_type = "table" - + storage = ClickZettaVolumeStorage(config) - + # Test file operations with dataset_id dataset_id = "12345" test_filename = f"{dataset_id}/test_file.txt" test_content = b"Hello, Table Volume!" - + # Save file storage.save(test_filename, test_content) - + # Check if file exists self.assertTrue(storage.exists(test_filename)) - + # Load file loaded_content = storage.load_once(test_filename) self.assertEqual(loaded_content, test_content) - + # Test scan for dataset files = storage.scan(dataset_id, files=True, directories=False) self.assertIn("test_file.txt", files) - + # Delete file storage.delete(test_filename) self.assertFalse(storage.exists(test_filename)) - + def test_config_validation(self): """Test configuration validation.""" # Test missing required fields @@ -119,56 +112,51 @@ class TestClickZettaVolumeStorage(unittest.TestCase): password="pass", instance="instance", ) - + # Test invalid volume type with self.assertRaises(ValueError): - ClickZettaVolumeConfig( - username="user", - password="pass", - instance="instance", - volume_type="invalid_type" - ) - + ClickZettaVolumeConfig(username="user", password="pass", instance="instance", volume_type="invalid_type") + # Test external volume without volume_name with self.assertRaises(ValueError): ClickZettaVolumeConfig( username="user", password="pass", instance="instance", - volume_type="external" + volume_type="external", # Missing volume_name ) - + def test_volume_path_generation(self): """Test volume path generation for different types.""" storage = ClickZettaVolumeStorage(self.config) - + # Test table volume path path = storage._get_volume_path("test.txt", "12345") self.assertEqual(path, "test_dataset_12345/test.txt") - + # Test path with existing dataset_id prefix path = storage._get_volume_path("12345/test.txt") self.assertEqual(path, "12345/test.txt") - + # Test user volume storage._config.volume_type = "user" path = storage._get_volume_path("test.txt") self.assertEqual(path, "test.txt") - + def test_sql_prefix_generation(self): """Test SQL prefix generation for different volume types.""" storage = ClickZettaVolumeStorage(self.config) - + # Test table volume SQL prefix prefix = storage._get_volume_sql_prefix("12345") self.assertEqual(prefix, "TABLE VOLUME test_dataset_12345") - + # Test user volume SQL prefix storage._config.volume_type = "user" prefix = storage._get_volume_sql_prefix() self.assertEqual(prefix, "USER VOLUME") - + # Test external volume SQL prefix storage._config.volume_type = "external" storage._config.volume_name = "my_external_volume"