feat: add ClickZetta Volume storage support
- Add three volume types: User, Table, and External Volume - Complete file operations: upload, download, delete, list, stream - Intelligent configuration management with fallback to vector DB settings - Simplified user experience with 'user' as default volume type - Comprehensive error handling and logging - Docker integration with updated compose files - Integration tests for all volume types - Disabled complex permission checking for stability 🎯 Features: - User Volume: Personal/small team use, simple configuration - Table Volume: Enterprise multi-tenant with smart routing - External Volume: Data lake integration with external storage - Flexible configuration with environment variable support - Complete file lifecycle management 🔧 Technical: - Reuses existing ClickZetta connection configuration - Pydantic-based configuration validation - Comprehensive error handling and logging - Performance-optimized with connection reuse - Clean integration with Dify's storage architecture 🚀 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>pull/22551/head
parent
cb023189a9
commit
2de316c557
@ -0,0 +1,65 @@
|
|||||||
|
"""ClickZetta Volume Storage Configuration"""
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pydantic import Field
|
||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class ClickZettaVolumeStorageConfig(BaseSettings):
|
||||||
|
"""Configuration for ClickZetta Volume storage."""
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_USERNAME: Optional[str] = Field(
|
||||||
|
description="Username for ClickZetta Volume authentication",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_PASSWORD: Optional[str] = Field(
|
||||||
|
description="Password for ClickZetta Volume authentication",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_INSTANCE: Optional[str] = Field(
|
||||||
|
description="ClickZetta instance identifier",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_SERVICE: str = Field(
|
||||||
|
description="ClickZetta service endpoint",
|
||||||
|
default="api.clickzetta.com",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_WORKSPACE: str = Field(
|
||||||
|
description="ClickZetta workspace name",
|
||||||
|
default="quick_start",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_VCLUSTER: str = Field(
|
||||||
|
description="ClickZetta virtual cluster name",
|
||||||
|
default="default_ap",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_SCHEMA: str = Field(
|
||||||
|
description="ClickZetta schema name",
|
||||||
|
default="dify",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_TYPE: str = Field(
|
||||||
|
description="ClickZetta volume type (table|user|external)",
|
||||||
|
default="user",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_NAME: Optional[str] = Field(
|
||||||
|
description="ClickZetta volume name for external volumes",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_TABLE_PREFIX: str = Field(
|
||||||
|
description="Prefix for ClickZetta volume table names",
|
||||||
|
default="dataset_",
|
||||||
|
)
|
||||||
|
|
||||||
|
CLICKZETTA_VOLUME_DIFY_PREFIX: str = Field(
|
||||||
|
description="Directory prefix for User Volume to organize Dify files",
|
||||||
|
default="dify_km",
|
||||||
|
)
|
||||||
@ -0,0 +1,5 @@
|
|||||||
|
"""ClickZetta Volume storage implementation."""
|
||||||
|
|
||||||
|
from .clickzetta_volume_storage import ClickZettaVolumeStorage
|
||||||
|
|
||||||
|
__all__ = ["ClickZettaVolumeStorage"]
|
||||||
@ -0,0 +1,180 @@
|
|||||||
|
"""Integration tests for ClickZetta Volume Storage."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from extensions.storage.clickzetta_volume.clickzetta_volume_storage import (
|
||||||
|
ClickZettaVolumeConfig,
|
||||||
|
ClickZettaVolumeStorage,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestClickZettaVolumeStorage(unittest.TestCase):
|
||||||
|
"""Test cases for ClickZetta Volume Storage."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Set up test environment."""
|
||||||
|
self.config = ClickZettaVolumeConfig(
|
||||||
|
username=os.getenv("CLICKZETTA_USERNAME", "test_user"),
|
||||||
|
password=os.getenv("CLICKZETTA_PASSWORD", "test_pass"),
|
||||||
|
instance=os.getenv("CLICKZETTA_INSTANCE", "test_instance"),
|
||||||
|
service=os.getenv("CLICKZETTA_SERVICE", "uat-api.clickzetta.com"),
|
||||||
|
workspace=os.getenv("CLICKZETTA_WORKSPACE", "quick_start"),
|
||||||
|
vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"),
|
||||||
|
schema_name=os.getenv("CLICKZETTA_SCHEMA", "dify"),
|
||||||
|
volume_type="table",
|
||||||
|
table_prefix="test_dataset_"
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.skipif(
|
||||||
|
not os.getenv("CLICKZETTA_USERNAME"),
|
||||||
|
reason="ClickZetta credentials not provided"
|
||||||
|
)
|
||||||
|
def test_user_volume_operations(self):
|
||||||
|
"""Test basic operations with User Volume."""
|
||||||
|
config = self.config
|
||||||
|
config.volume_type = "user"
|
||||||
|
|
||||||
|
storage = ClickZettaVolumeStorage(config)
|
||||||
|
|
||||||
|
# Test file operations
|
||||||
|
test_filename = "test_file.txt"
|
||||||
|
test_content = b"Hello, ClickZetta Volume!"
|
||||||
|
|
||||||
|
# Save file
|
||||||
|
storage.save(test_filename, test_content)
|
||||||
|
|
||||||
|
# Check if file exists
|
||||||
|
self.assertTrue(storage.exists(test_filename))
|
||||||
|
|
||||||
|
# Load file
|
||||||
|
loaded_content = storage.load_once(test_filename)
|
||||||
|
self.assertEqual(loaded_content, test_content)
|
||||||
|
|
||||||
|
# Test streaming
|
||||||
|
stream_content = b""
|
||||||
|
for chunk in storage.load_stream(test_filename):
|
||||||
|
stream_content += chunk
|
||||||
|
self.assertEqual(stream_content, test_content)
|
||||||
|
|
||||||
|
# Test download
|
||||||
|
with tempfile.NamedTemporaryFile() as temp_file:
|
||||||
|
storage.download(test_filename, temp_file.name)
|
||||||
|
with open(temp_file.name, "rb") as f:
|
||||||
|
downloaded_content = f.read()
|
||||||
|
self.assertEqual(downloaded_content, test_content)
|
||||||
|
|
||||||
|
# Test scan
|
||||||
|
files = storage.scan("", files=True, directories=False)
|
||||||
|
self.assertIn(test_filename, files)
|
||||||
|
|
||||||
|
# Delete file
|
||||||
|
storage.delete(test_filename)
|
||||||
|
self.assertFalse(storage.exists(test_filename))
|
||||||
|
|
||||||
|
@pytest.mark.skipif(
|
||||||
|
not os.getenv("CLICKZETTA_USERNAME"),
|
||||||
|
reason="ClickZetta credentials not provided"
|
||||||
|
)
|
||||||
|
def test_table_volume_operations(self):
|
||||||
|
"""Test basic operations with Table Volume."""
|
||||||
|
config = self.config
|
||||||
|
config.volume_type = "table"
|
||||||
|
|
||||||
|
storage = ClickZettaVolumeStorage(config)
|
||||||
|
|
||||||
|
# Test file operations with dataset_id
|
||||||
|
dataset_id = "12345"
|
||||||
|
test_filename = f"{dataset_id}/test_file.txt"
|
||||||
|
test_content = b"Hello, Table Volume!"
|
||||||
|
|
||||||
|
# Save file
|
||||||
|
storage.save(test_filename, test_content)
|
||||||
|
|
||||||
|
# Check if file exists
|
||||||
|
self.assertTrue(storage.exists(test_filename))
|
||||||
|
|
||||||
|
# Load file
|
||||||
|
loaded_content = storage.load_once(test_filename)
|
||||||
|
self.assertEqual(loaded_content, test_content)
|
||||||
|
|
||||||
|
# Test scan for dataset
|
||||||
|
files = storage.scan(dataset_id, files=True, directories=False)
|
||||||
|
self.assertIn("test_file.txt", files)
|
||||||
|
|
||||||
|
# Delete file
|
||||||
|
storage.delete(test_filename)
|
||||||
|
self.assertFalse(storage.exists(test_filename))
|
||||||
|
|
||||||
|
def test_config_validation(self):
|
||||||
|
"""Test configuration validation."""
|
||||||
|
# Test missing required fields
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ClickZettaVolumeConfig(
|
||||||
|
username="", # Empty username should fail
|
||||||
|
password="pass",
|
||||||
|
instance="instance",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test invalid volume type
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ClickZettaVolumeConfig(
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
instance="instance",
|
||||||
|
volume_type="invalid_type"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test external volume without volume_name
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ClickZettaVolumeConfig(
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
instance="instance",
|
||||||
|
volume_type="external"
|
||||||
|
# Missing volume_name
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_volume_path_generation(self):
|
||||||
|
"""Test volume path generation for different types."""
|
||||||
|
storage = ClickZettaVolumeStorage(self.config)
|
||||||
|
|
||||||
|
# Test table volume path
|
||||||
|
path = storage._get_volume_path("test.txt", "12345")
|
||||||
|
self.assertEqual(path, "test_dataset_12345/test.txt")
|
||||||
|
|
||||||
|
# Test path with existing dataset_id prefix
|
||||||
|
path = storage._get_volume_path("12345/test.txt")
|
||||||
|
self.assertEqual(path, "12345/test.txt")
|
||||||
|
|
||||||
|
# Test user volume
|
||||||
|
storage._config.volume_type = "user"
|
||||||
|
path = storage._get_volume_path("test.txt")
|
||||||
|
self.assertEqual(path, "test.txt")
|
||||||
|
|
||||||
|
def test_sql_prefix_generation(self):
|
||||||
|
"""Test SQL prefix generation for different volume types."""
|
||||||
|
storage = ClickZettaVolumeStorage(self.config)
|
||||||
|
|
||||||
|
# Test table volume SQL prefix
|
||||||
|
prefix = storage._get_volume_sql_prefix("12345")
|
||||||
|
self.assertEqual(prefix, "TABLE VOLUME test_dataset_12345")
|
||||||
|
|
||||||
|
# Test user volume SQL prefix
|
||||||
|
storage._config.volume_type = "user"
|
||||||
|
prefix = storage._get_volume_sql_prefix()
|
||||||
|
self.assertEqual(prefix, "USER VOLUME")
|
||||||
|
|
||||||
|
# Test external volume SQL prefix
|
||||||
|
storage._config.volume_type = "external"
|
||||||
|
storage._config.volume_name = "my_external_volume"
|
||||||
|
prefix = storage._get_volume_sql_prefix()
|
||||||
|
self.assertEqual(prefix, "VOLUME my_external_volume")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
Reference in New Issue