@ -16,6 +16,7 @@ import clickzetta # type: ignore[import]
from pydantic import BaseModel , model_validator
from pydantic import BaseModel , model_validator
from extensions . storage . base_storage import BaseStorage
from extensions . storage . base_storage import BaseStorage
from . volume_permissions import VolumePermissionManager , check_volume_permission
from . volume_permissions import VolumePermissionManager , check_volume_permission
logger = logging . getLogger ( __name__ )
logger = logging . getLogger ( __name__ )
@ -23,7 +24,7 @@ logger = logging.getLogger(__name__)
class ClickZettaVolumeConfig ( BaseModel ) :
class ClickZettaVolumeConfig ( BaseModel ) :
""" Configuration for ClickZetta Volume storage. """
""" Configuration for ClickZetta Volume storage. """
username : str
username : str
password : str
password : str
instance : str
instance : str
@ -36,52 +37,51 @@ class ClickZettaVolumeConfig(BaseModel):
table_prefix : str = " dataset_ " # Prefix for table volume names
table_prefix : str = " dataset_ " # Prefix for table volume names
dify_prefix : str = " dify_km " # Directory prefix for User Volume
dify_prefix : str = " dify_km " # Directory prefix for User Volume
permission_check : bool = True # Enable/disable permission checking
permission_check : bool = True # Enable/disable permission checking
@model_validator ( mode = " before " )
@model_validator ( mode = " before " )
@classmethod
@classmethod
def validate_config ( cls , values : dict ) - > dict :
def validate_config ( cls , values : dict ) - > dict :
""" Validate the configuration values.
""" Validate the configuration values.
This method will first try to use CLICKZETTA_VOLUME_ * environment variables ,
This method will first try to use CLICKZETTA_VOLUME_ * environment variables ,
then fall back to CLICKZETTA_ * environment variables ( for vector DB config ) .
then fall back to CLICKZETTA_ * environment variables ( for vector DB config ) .
"""
"""
import os
import os
# Helper function to get environment variable with fallback
# Helper function to get environment variable with fallback
def get_env_with_fallback ( volume_key : str , fallback_key : str , default : str = None ) - > str :
def get_env_with_fallback ( volume_key : str , fallback_key : str , default : str | None = None ) - > str :
# First try CLICKZETTA_VOLUME_* specific config
# First try CLICKZETTA_VOLUME_* specific config
volume_value = values . get ( volume_key . lower ( ) . replace ( ' clickzetta_volume_ ' , ' ' ) )
volume_value = values . get ( volume_key . lower ( ) . replace ( " clickzetta_volume_ " , " " ) )
if volume_value :
if volume_value :
return volume_value
return volume_value
# Then try environment variables
# Then try environment variables
volume_env = os . getenv ( volume_key )
volume_env = os . getenv ( volume_key )
if volume_env :
if volume_env :
return volume_env
return volume_env
# Fall back to existing CLICKZETTA_* config
# Fall back to existing CLICKZETTA_* config
fallback_env = os . getenv ( fallback_key )
fallback_env = os . getenv ( fallback_key )
if fallback_env :
if fallback_env :
return fallback_env
return fallback_env
return default
return default
# Apply environment variables with fallback to existing CLICKZETTA_* config
# Apply environment variables with fallback to existing CLICKZETTA_* config
values . setdefault ( " username " , get_env_with_fallback (
values . setdefault ( " username " , get_env_with_fallback ( " CLICKZETTA_VOLUME_USERNAME " , " CLICKZETTA_USERNAME " ) )
" CLICKZETTA_VOLUME_USERNAME " , " CLICKZETTA_USERNAME " ) )
values . setdefault ( " password " , get_env_with_fallback ( " CLICKZETTA_VOLUME_PASSWORD " , " CLICKZETTA_PASSWORD " ) )
values . setdefault ( " password " , get_env_with_fallback (
values . setdefault ( " instance " , get_env_with_fallback ( " CLICKZETTA_VOLUME_INSTANCE " , " CLICKZETTA_INSTANCE " ) )
" CLICKZETTA_VOLUME_PASSWORD " , " CLICKZETTA_PASSWORD " ) )
values . setdefault (
values . setdefault ( " instance " , get_env_with_fallback (
" service " , get_env_with_fallback ( " CLICKZETTA_VOLUME_SERVICE " , " CLICKZETTA_SERVICE " , " api.clickzetta.com " )
" CLICKZETTA_VOLUME_INSTANCE " , " CLICKZETTA_INSTANCE " ) )
)
values . setdefault ( " service " , get_env_with_fallback (
values . setdefault (
" CLICKZETTA_VOLUME_SERVICE " , " CLICKZETTA_SERVICE " , " api.clickzetta.com " ) )
" workspace " , get_env_with_fallback ( " CLICKZETTA_VOLUME_WORKSPACE " , " CLICKZETTA_WORKSPACE " , " quick_start " )
values . setdefault ( " workspace " , get_env_with_fallback (
)
" CLICKZETTA_VOLUME_WORKSPACE " , " CLICKZETTA_WORKSPACE " , " quick_start " ) )
values . setdefault (
values . setdefault ( " vcluster " , get_env_with_fallback (
" vcluster " , get_env_with_fallback ( " CLICKZETTA_VOLUME_VCLUSTER " , " CLICKZETTA_VCLUSTER " , " default_ap " )
" CLICKZETTA_VOLUME_VCLUSTER " , " CLICKZETTA_VCLUSTER " , " default_ap " ) )
)
values . setdefault ( " schema_name " , get_env_with_fallback (
values . setdefault ( " schema_name " , get_env_with_fallback ( " CLICKZETTA_VOLUME_SCHEMA " , " CLICKZETTA_SCHEMA " , " dify " ) )
" CLICKZETTA_VOLUME_SCHEMA " , " CLICKZETTA_SCHEMA " , " dify " ) )
# Volume-specific configurations (no fallback to vector DB config)
# Volume-specific configurations (no fallback to vector DB config)
values . setdefault ( " volume_type " , os . getenv ( " CLICKZETTA_VOLUME_TYPE " , " table " ) )
values . setdefault ( " volume_type " , os . getenv ( " CLICKZETTA_VOLUME_TYPE " , " table " ) )
values . setdefault ( " volume_name " , os . getenv ( " CLICKZETTA_VOLUME_NAME " ) )
values . setdefault ( " volume_name " , os . getenv ( " CLICKZETTA_VOLUME_NAME " ) )
@ -89,7 +89,7 @@ class ClickZettaVolumeConfig(BaseModel):
values . setdefault ( " dify_prefix " , os . getenv ( " CLICKZETTA_VOLUME_DIFY_PREFIX " , " dify_km " ) )
values . setdefault ( " dify_prefix " , os . getenv ( " CLICKZETTA_VOLUME_DIFY_PREFIX " , " dify_km " ) )
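# Permission checking is temporarily disabled: default it to False (this takes precedence over the field default of True)
# Permission checking is temporarily disabled: default it to False (this takes precedence over the field default of True)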
values . setdefault ( " permission_check " , False )
values . setdefault ( " permission_check " , False )
# Validate required fields
# Validate required fields
if not values . get ( " username " ) :
if not values . get ( " username " ) :
raise ValueError ( " CLICKZETTA_VOLUME_USERNAME or CLICKZETTA_USERNAME is required " )
raise ValueError ( " CLICKZETTA_VOLUME_USERNAME or CLICKZETTA_USERNAME is required " )
@ -97,24 +97,24 @@ class ClickZettaVolumeConfig(BaseModel):
raise ValueError ( " CLICKZETTA_VOLUME_PASSWORD or CLICKZETTA_PASSWORD is required " )
raise ValueError ( " CLICKZETTA_VOLUME_PASSWORD or CLICKZETTA_PASSWORD is required " )
if not values . get ( " instance " ) :
if not values . get ( " instance " ) :
raise ValueError ( " CLICKZETTA_VOLUME_INSTANCE or CLICKZETTA_INSTANCE is required " )
raise ValueError ( " CLICKZETTA_VOLUME_INSTANCE or CLICKZETTA_INSTANCE is required " )
# Validate volume type
# Validate volume type
volume_type = values [ " volume_type " ]
volume_type = values [ " volume_type " ]
if volume_type not in [ " table " , " user " , " external " ] :
if volume_type not in [ " table " , " user " , " external " ] :
raise ValueError ( " CLICKZETTA_VOLUME_TYPE must be one of: table, user, external " )
raise ValueError ( " CLICKZETTA_VOLUME_TYPE must be one of: table, user, external " )
if volume_type == " external " and not values . get ( " volume_name " ) :
if volume_type == " external " and not values . get ( " volume_name " ) :
raise ValueError ( " CLICKZETTA_VOLUME_NAME is required for external volume type " )
raise ValueError ( " CLICKZETTA_VOLUME_NAME is required for external volume type " )
return values
return values
class ClickZettaVolumeStorage ( BaseStorage ) :
class ClickZettaVolumeStorage ( BaseStorage ) :
""" ClickZetta Volume storage implementation. """
""" ClickZetta Volume storage implementation. """
def __init__ ( self , config : ClickZettaVolumeConfig ) :
def __init__ ( self , config : ClickZettaVolumeConfig ) :
""" Initialize ClickZetta Volume storage.
""" Initialize ClickZetta Volume storage.
Args :
Args :
config : ClickZetta Volume configuration
config : ClickZetta Volume configuration
"""
"""
@ -123,9 +123,9 @@ class ClickZettaVolumeStorage(BaseStorage):
self . _permission_manager = None
self . _permission_manager = None
self . _init_connection ( )
self . _init_connection ( )
self . _init_permission_manager ( )
self . _init_permission_manager ( )
logger . info ( f " ClickZetta Volume storage initialized with type: { config . volume_type } " )
logger . info ( f " ClickZetta Volume storage initialized with type: { config . volume_type } " )
def _init_connection ( self ) :
def _init_connection ( self ) :
""" Initialize ClickZetta connection. """
""" Initialize ClickZetta connection. """
try :
try :
@ -136,26 +136,24 @@ class ClickZettaVolumeStorage(BaseStorage):
service = self . _config . service ,
service = self . _config . service ,
workspace = self . _config . workspace ,
workspace = self . _config . workspace ,
vcluster = self . _config . vcluster ,
vcluster = self . _config . vcluster ,
schema = self . _config . schema_name
schema = self . _config . schema_name ,
)
)
logger . debug ( " ClickZetta connection established " )
logger . debug ( " ClickZetta connection established " )
except Exception as e :
except Exception as e :
logger . error ( f " Failed to connect to ClickZetta: { e } " )
logger . error ( f " Failed to connect to ClickZetta: { e } " )
raise
raise
def _init_permission_manager(self):
    """Build the permission manager for this storage instance.

    Constructs a ``VolumePermissionManager`` from the current connection and
    the configured volume type and volume name, and stores it on
    ``self._permission_manager``.

    Raises:
        Exception: Any error raised while building the manager. It is logged
            at error level and then re-raised unchanged.
    """
    try:
        connection = self._connection
        settings = self._config
        manager = VolumePermissionManager(connection, settings.volume_type, settings.volume_name)
        self._permission_manager = manager
        logger.debug("Permission manager initialized")
    except Exception as exc:
        # Log for diagnostics, but let the caller decide how to handle it.
        logger.error(f"Failed to initialize permission manager: {exc}")
        raise
def _get_volume_path ( self , filename : str , dataset_id : Optional [ str ] = None ) - > str :
def _get_volume_path ( self , filename : str , dataset_id : Optional [ str ] = None ) - > str :
""" Get the appropriate volume path based on volume type. """
""" Get the appropriate volume path based on volume type. """
if self . _config . volume_type == " user " :
if self . _config . volume_type == " user " :
@ -166,7 +164,7 @@ class ClickZettaVolumeStorage(BaseStorage):
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
# Use User Volume with dify prefix for special directories
# Use User Volume with dify prefix for special directories
return f " { self . _config . dify_prefix } / { filename } "
return f " { self . _config . dify_prefix } / { filename } "
if dataset_id :
if dataset_id :
return f " { self . _config . table_prefix } { dataset_id } / { filename } "
return f " { self . _config . table_prefix } { dataset_id } / { filename } "
else :
else :
@ -180,7 +178,7 @@ class ClickZettaVolumeStorage(BaseStorage):
return filename
return filename
else :
else :
raise ValueError ( f " Unsupported volume type: { self . _config . volume_type } " )
raise ValueError ( f " Unsupported volume type: { self . _config . volume_type } " )
def _get_volume_sql_prefix ( self , dataset_id : Optional [ str ] = None ) - > str :
def _get_volume_sql_prefix ( self , dataset_id : Optional [ str ] = None ) - > str :
""" Get SQL prefix for volume operations. """
""" Get SQL prefix for volume operations. """
if self . _config . volume_type == " user " :
if self . _config . volume_type == " user " :
@ -191,7 +189,7 @@ class ClickZettaVolumeStorage(BaseStorage):
# These should use USER VOLUME for better compatibility
# These should use USER VOLUME for better compatibility
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
return " USER VOLUME "
return " USER VOLUME "
# Only use TABLE VOLUME for actual dataset-specific paths
# Only use TABLE VOLUME for actual dataset-specific paths
# like "dataset_12345/file.pdf" or paths with dataset_ prefix
# like "dataset_12345/file.pdf" or paths with dataset_ prefix
if dataset_id :
if dataset_id :
@ -204,7 +202,7 @@ class ClickZettaVolumeStorage(BaseStorage):
return f " VOLUME { self . _config . volume_name } "
return f " VOLUME { self . _config . volume_name } "
else :
else :
raise ValueError ( f " Unsupported volume type: { self . _config . volume_type } " )
raise ValueError ( f " Unsupported volume type: { self . _config . volume_type } " )
def _execute_sql ( self , sql : str , fetch : bool = False ) :
def _execute_sql ( self , sql : str , fetch : bool = False ) :
""" Execute SQL command. """
""" Execute SQL command. """
try :
try :
@ -216,23 +214,23 @@ class ClickZettaVolumeStorage(BaseStorage):
except Exception as e :
except Exception as e :
logger . error ( f " SQL execution failed: { sql } , Error: { e } " )
logger . error ( f " SQL execution failed: { sql } , Error: { e } " )
raise
raise
def _ensure_table_volume_exists ( self , dataset_id : str ) - > None :
def _ensure_table_volume_exists ( self , dataset_id : str ) - > None :
""" Ensure table volume exists for the given dataset_id. """
""" Ensure table volume exists for the given dataset_id. """
if self . _config . volume_type != " table " or not dataset_id :
if self . _config . volume_type != " table " or not dataset_id :
return
return
# Skip for upload_files and other special directories that use USER VOLUME
# Skip for upload_files and other special directories that use USER VOLUME
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
if dataset_id in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
return
return
table_name = f " { self . _config . table_prefix } { dataset_id } "
table_name = f " { self . _config . table_prefix } { dataset_id } "
try :
try :
# Check if table exists
# Check if table exists
check_sql = f " SHOW TABLES LIKE ' { table_name } ' "
check_sql = f " SHOW TABLES LIKE ' { table_name } ' "
result = self . _execute_sql ( check_sql , fetch = True )
result = self . _execute_sql ( check_sql , fetch = True )
if not result :
if not result :
# Create table with volume
# Create table with volume
create_sql = f """
create_sql = f """
@ -246,15 +244,15 @@ class ClickZettaVolumeStorage(BaseStorage):
"""
"""
self . _execute_sql ( create_sql )
self . _execute_sql ( create_sql )
logger . info ( f " Created table volume: { table_name } " )
logger . info ( f " Created table volume: { table_name } " )
except Exception as e :
except Exception as e :
logger . warning ( f " Failed to create table volume { table_name } : { e } " )
logger . warning ( f " Failed to create table volume { table_name } : { e } " )
# Don't raise exception, let the operation continue
# Don't raise exception, let the operation continue
# The table might exist but not be visible due to permissions
# The table might exist but not be visible due to permissions
def save ( self , filename : str , data : bytes ) - > None :
def save ( self , filename : str , data : bytes ) - > None :
""" Save data to ClickZetta Volume.
""" Save data to ClickZetta Volume.
Args :
Args :
filename : File path in volume
filename : File path in volume
data : File content as bytes
data : File content as bytes
@ -264,53 +262,53 @@ class ClickZettaVolumeStorage(BaseStorage):
if " / " in filename and self . _config . volume_type == " table " :
if " / " in filename and self . _config . volume_type == " table " :
parts = filename . split ( " / " , 1 )
parts = filename . split ( " / " , 1 )
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
filename = parts [ 1 ]
filename = parts [ 1 ]
else :
else :
dataset_id = parts [ 0 ]
dataset_id = parts [ 0 ]
filename = parts [ 1 ]
filename = parts [ 1 ]
# Ensure table volume exists (for table volumes)
# Ensure table volume exists (for table volumes)
if dataset_id :
if dataset_id :
self . _ensure_table_volume_exists ( dataset_id )
self . _ensure_table_volume_exists ( dataset_id )
# Check permissions (if enabled)
# Check permissions (if enabled)
if self . _config . permission_check :
if self . _config . permission_check :
# Skip permission check for special directories that use USER VOLUME
# Skip permission check for special directories that use USER VOLUME
if dataset_id not in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
if dataset_id not in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
check_volume_permission ( self . _permission_manager , " save " , dataset_id )
check_volume_permission ( self . _permission_manager , " save " , dataset_id )
# Write data to temporary file
# Write data to temporary file
with tempfile . NamedTemporaryFile ( delete = False ) as temp_file :
with tempfile . NamedTemporaryFile ( delete = False ) as temp_file :
temp_file . write ( data )
temp_file . write ( data )
temp_file_path = temp_file . name
temp_file_path = temp_file . name
try :
try :
# Upload to volume
# Upload to volume
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
# Get the actual volume path (may include dify_km prefix)
# Get the actual volume path (may include dify_km prefix)
volume_path = self . _get_volume_path ( filename , dataset_id )
volume_path = self . _get_volume_path ( filename , dataset_id )
actual_filename = volume_path . split ( ' / ' ) [ - 1 ] if ' / ' in volume_path else volume_path
actual_filename = volume_path . split ( " / " ) [ - 1 ] if " / " in volume_path else volume_path
# For User Volume, use the full path with dify_km prefix
# For User Volume, use the full path with dify_km prefix
if volume_prefix == " USER VOLUME " :
if volume_prefix == " USER VOLUME " :
sql = f " PUT ' { temp_file_path } ' TO { volume_prefix } FILE ' { volume_path } ' "
sql = f " PUT ' { temp_file_path } ' TO { volume_prefix } FILE ' { volume_path } ' "
else :
else :
sql = f " PUT ' { temp_file_path } ' TO { volume_prefix } FILE ' { filename } ' "
sql = f " PUT ' { temp_file_path } ' TO { volume_prefix } FILE ' { filename } ' "
self . _execute_sql ( sql )
self . _execute_sql ( sql )
logger . debug ( f " File { filename } saved to ClickZetta Volume at path { volume_path } " )
logger . debug ( f " File { filename } saved to ClickZetta Volume at path { volume_path } " )
finally :
finally :
# Clean up temporary file
# Clean up temporary file
Path ( temp_file_path ) . unlink ( missing_ok = True )
Path ( temp_file_path ) . unlink ( missing_ok = True )
def load_once ( self , filename : str ) - > bytes :
def load_once ( self , filename : str ) - > bytes :
""" Load file content from ClickZetta Volume.
""" Load file content from ClickZetta Volume.
Args :
Args :
filename : File path in volume
filename : File path in volume
Returns :
Returns :
File content as bytes
File content as bytes
"""
"""
@ -319,33 +317,33 @@ class ClickZettaVolumeStorage(BaseStorage):
if " / " in filename and self . _config . volume_type == " table " :
if " / " in filename and self . _config . volume_type == " table " :
parts = filename . split ( " / " , 1 )
parts = filename . split ( " / " , 1 )
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
filename = parts [ 1 ]
filename = parts [ 1 ]
else :
else :
dataset_id = parts [ 0 ]
dataset_id = parts [ 0 ]
filename = parts [ 1 ]
filename = parts [ 1 ]
# Check permissions (if enabled)
# Check permissions (if enabled)
if self . _config . permission_check :
if self . _config . permission_check :
# Skip permission check for special directories that use USER VOLUME
# Skip permission check for special directories that use USER VOLUME
if dataset_id not in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
if dataset_id not in [ " upload_files " , " temp " , " cache " , " tools " , " website_files " ] :
check_volume_permission ( self . _permission_manager , " load_once " , dataset_id )
check_volume_permission ( self . _permission_manager , " load_once " , dataset_id )
# Download to temporary directory
# Download to temporary directory
with tempfile . TemporaryDirectory ( ) as temp_dir :
with tempfile . TemporaryDirectory ( ) as temp_dir :
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
# Get the actual volume path (may include dify_km prefix)
# Get the actual volume path (may include dify_km prefix)
volume_path = self . _get_volume_path ( filename , dataset_id )
volume_path = self . _get_volume_path ( filename , dataset_id )
# For User Volume, use the full path with dify_km prefix
# For User Volume, use the full path with dify_km prefix
if volume_prefix == " USER VOLUME " :
if volume_prefix == " USER VOLUME " :
sql = f " GET { volume_prefix } FILE ' { volume_path } ' TO ' { temp_dir } ' "
sql = f " GET { volume_prefix } FILE ' { volume_path } ' TO ' { temp_dir } ' "
else :
else :
sql = f " GET { volume_prefix } FILE ' { filename } ' TO ' { temp_dir } ' "
sql = f " GET { volume_prefix } FILE ' { filename } ' TO ' { temp_dir } ' "
self . _execute_sql ( sql )
self . _execute_sql ( sql )
# Find the downloaded file (may be in subdirectories)
# Find the downloaded file (may be in subdirectories)
downloaded_file = None
downloaded_file = None
for root , dirs , files in os . walk ( temp_dir ) :
for root , dirs , files in os . walk ( temp_dir ) :
@ -355,52 +353,52 @@ class ClickZettaVolumeStorage(BaseStorage):
break
break
if downloaded_file :
if downloaded_file :
break
break
if not downloaded_file or not downloaded_file . exists ( ) :
if not downloaded_file or not downloaded_file . exists ( ) :
raise FileNotFoundError ( f " Downloaded file not found: { filename } " )
raise FileNotFoundError ( f " Downloaded file not found: { filename } " )
content = downloaded_file . read_bytes ( )
content = downloaded_file . read_bytes ( )
logger . debug ( f " File { filename } loaded from ClickZetta Volume " )
logger . debug ( f " File { filename } loaded from ClickZetta Volume " )
return content
return content
def load_stream(self, filename: str) -> Generator:
    """Yield a file's content from ClickZetta Volume in fixed-size chunks.

    The whole file is first fetched with ``load_once`` and buffered in
    memory; it is then handed out in pieces of at most 4096 bytes.

    Args:
        filename: File path in volume

    Yields:
        Consecutive chunks of the file content
    """
    chunk_size = 4096
    buffer = BytesIO(self.load_once(filename))

    # read() returns an empty chunk once the buffer is exhausted, which
    # is the sentinel that ends the iteration.
    for piece in iter(lambda: buffer.read(chunk_size), b""):
        yield piece

    logger.debug(f"File {filename} loaded as stream from ClickZetta Volume")
def download(self, filename: str, target_filepath: str):
    """Copy a file from ClickZetta Volume to a local path.

    The content is fetched with ``load_once`` and written to
    ``target_filepath`` in binary mode, replacing any existing file there.

    Args:
        filename: File path in volume
        target_filepath: Local target file path
    """
    payload = self.load_once(filename)
    Path(target_filepath).write_bytes(payload)

    logger.debug(f"File {filename} downloaded from ClickZetta Volume to {target_filepath}")
def exists ( self , filename : str ) - > bool :
def exists ( self , filename : str ) - > bool :
""" Check if file exists in ClickZetta Volume.
""" Check if file exists in ClickZetta Volume.
Args :
Args :
filename : File path in volume
filename : File path in volume
Returns :
Returns :
True if file exists , False otherwise
True if file exists , False otherwise
"""
"""
@ -410,76 +408,76 @@ class ClickZettaVolumeStorage(BaseStorage):
if " / " in filename and self . _config . volume_type == " table " :
if " / " in filename and self . _config . volume_type == " table " :
parts = filename . split ( " / " , 1 )
parts = filename . split ( " / " , 1 )
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
if parts [ 0 ] . startswith ( self . _config . table_prefix ) :
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
dataset_id = parts [ 0 ] [ len ( self . _config . table_prefix ) : ]
filename = parts [ 1 ]
filename = parts [ 1 ]
else :
else :
dataset_id = parts [ 0 ]
dataset_id = parts [ 0 ]
filename = parts [ 1 ]
filename = parts [ 1 ]
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
# Get the actual volume path (may include dify_km prefix)
# Get the actual volume path (may include dify_km prefix)
volume_path = self . _get_volume_path ( filename , dataset_id )
volume_path = self . _get_volume_path ( filename , dataset_id )
# For User Volume, use the full path with dify_km prefix
# For User Volume, use the full path with dify_km prefix
if volume_prefix == " USER VOLUME " :
if volume_prefix == " USER VOLUME " :
sql = f " LIST { volume_prefix } REGEXP = ' ^ { volume_path } $ ' "
sql = f " LIST { volume_prefix } REGEXP = ' ^ { volume_path } $ ' "
else :
else :
sql = f " LIST { volume_prefix } REGEXP = ' ^ { filename } $ ' "
sql = f " LIST { volume_prefix } REGEXP = ' ^ { filename } $ ' "
rows = self . _execute_sql ( sql , fetch = True )
rows = self . _execute_sql ( sql , fetch = True )
exists = len ( rows ) > 0
exists = len ( rows ) > 0
logger . debug ( f " File { filename } exists check: { exists } " )
logger . debug ( f " File { filename } exists check: { exists } " )
return exists
return exists
except Exception as e :
except Exception as e :
logger . warning ( f " Error checking file existence for { filename } : { e } " )
logger . warning ( f " Error checking file existence for { filename } : { e } " )
return False
return False
def delete(self, filename: str):
    """Remove a file from ClickZetta Volume if it is present.

    Nothing is executed when ``exists`` reports the file as missing.

    For table volumes, a path of the form ``<first segment>/<rest>`` is
    split: the first segment (with the configured table prefix stripped,
    when present) is used as the dataset id and the rest as the file name.

    Args:
        filename: File path in volume
    """
    if not self.exists(filename):
        logger.debug(f"File {filename} not found, skip delete")
        return

    # Derive the dataset id from the leading path segment, if applicable.
    dataset_id = None
    if "/" in filename and self._config.volume_type == "table":
        head, _, filename = filename.partition("/")
        dataset_id = head.removeprefix(self._config.table_prefix)

    volume_prefix = self._get_volume_sql_prefix(dataset_id)
    # The resolved volume path may carry the dify prefix.
    volume_path = self._get_volume_path(filename, dataset_id)

    # User Volume is addressed by the full resolved path; other volume
    # kinds are addressed by the bare file name.
    target = volume_path if volume_prefix == "USER VOLUME" else filename
    self._execute_sql(f"REMOVE {volume_prefix} FILE '{target}'")

    logger.debug(f"File {filename} deleted from ClickZetta Volume")
def scan ( self , path : str , files : bool = True , directories : bool = False ) - > list [ str ] :
def scan ( self , path : str , files : bool = True , directories : bool = False ) - > list [ str ] :
""" Scan files and directories in ClickZetta Volume.
""" Scan files and directories in ClickZetta Volume.
Args :
Args :
path : Path to scan ( dataset_id for table volumes )
path : Path to scan ( dataset_id for table volumes )
files : Include files in results
files : Include files in results
directories : Include directories in results
directories : Include directories in results
Returns :
Returns :
List of file / directory paths
List of file / directory paths
"""
"""
@ -489,9 +487,9 @@ class ClickZettaVolumeStorage(BaseStorage):
if self . _config . volume_type == " table " :
if self . _config . volume_type == " table " :
dataset_id = path
dataset_id = path
path = " " # Root of the table volume
path = " " # Root of the table volume
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
volume_prefix = self . _get_volume_sql_prefix ( dataset_id )
# For User Volume, add dify prefix to path
# For User Volume, add dify prefix to path
if volume_prefix == " USER VOLUME " :
if volume_prefix == " USER VOLUME " :
if path :
if path :
@ -504,26 +502,24 @@ class ClickZettaVolumeStorage(BaseStorage):
sql = f " LIST { volume_prefix } SUBDIRECTORY ' { path } ' "
sql = f " LIST { volume_prefix } SUBDIRECTORY ' { path } ' "
else :
else :
sql = f " LIST { volume_prefix } "
sql = f " LIST { volume_prefix } "
rows = self . _execute_sql ( sql , fetch = True )
rows = self . _execute_sql ( sql , fetch = True )
result = [ ]
result = [ ]
for row in rows :
for row in rows :
file_path = row [ 0 ] # relative_path column
file_path = row [ 0 ] # relative_path column
# For User Volume, remove dify prefix from results
# For User Volume, remove dify prefix from results
dify_prefix_with_slash = f " { self . _config . dify_prefix } / "
dify_prefix_with_slash = f " { self . _config . dify_prefix } / "
if volume_prefix == " USER VOLUME " and file_path . startswith ( dify_prefix_with_slash ) :
if volume_prefix == " USER VOLUME " and file_path . startswith ( dify_prefix_with_slash ) :
file_path = file_path [ len ( dify_prefix_with_slash ) : ] # Remove prefix
file_path = file_path [ len ( dify_prefix_with_slash ) : ] # Remove prefix
if files and not file_path . endswith ( " / " ) :
if files and not file_path . endswith ( " / " ) or directories and file_path . endswith ( " / " ) :
result . append ( file_path )
elif directories and file_path . endswith ( " / " ) :
result . append ( file_path )
result . append ( file_path )
logger . debug ( f " Scanned { len ( result ) } items in path { path } " )
logger . debug ( f " Scanned { len ( result ) } items in path { path } " )
return result
return result
except Exception as e :
except Exception as e :
logger . error ( f " Error scanning path { path } : { e } " )
logger . error ( f " Error scanning path { path } : { e } " )
return [ ]
return [ ]