@ -17,6 +17,7 @@ from core.rag.models.document import Document
from events . app_event import app_was_created
from extensions . ext_database import db
from extensions . ext_redis import redis_client
from extensions . ext_storage import storage
from libs . helper import email as email_validate
from libs . password import hash_password , password_pattern , valid_password
from libs . rsa import generate_key_pair
@ -271,6 +272,7 @@ def migrate_knowledge_vector_database():
upper_collection_vector_types = {
VectorType . MILVUS ,
VectorType . PGVECTOR ,
VectorType . VASTBASE ,
VectorType . RELYT ,
VectorType . WEAVIATE ,
VectorType . ORACLE ,
@ -666,7 +668,7 @@ def upgrade_db():
click . echo ( click . style ( " Starting database migration. " , fg = " green " ) )
# run db migration
import flask_migrate # type: ignore
import flask_migrate
flask_migrate . upgrade ( )
@ -814,3 +816,331 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
ClearFreePlanTenantExpiredLogs . process ( days , batch , tenant_ids )
click . echo ( click . style ( " Clear free plan tenant expired logs completed. " , fg = " green " ) )
@click.option ( " -f " , " --force " , is_flag = True , help = " Skip user confirmation and force the command to execute. " )
@click.command ( " clear-orphaned-file-records " , help = " Clear orphaned file records. " )
def clear_orphaned_file_records ( force : bool ) :
"""
Clear orphaned file records in the database .
"""
# define tables and columns to process
files_tables = [
{ " table " : " upload_files " , " id_column " : " id " , " key_column " : " key " } ,
{ " table " : " tool_files " , " id_column " : " id " , " key_column " : " file_key " } ,
]
ids_tables = [
{ " type " : " uuid " , " table " : " message_files " , " column " : " upload_file_id " } ,
{ " type " : " text " , " table " : " documents " , " column " : " data_source_info " } ,
{ " type " : " text " , " table " : " document_segments " , " column " : " content " } ,
{ " type " : " text " , " table " : " messages " , " column " : " answer " } ,
{ " type " : " text " , " table " : " workflow_node_executions " , " column " : " inputs " } ,
{ " type " : " text " , " table " : " workflow_node_executions " , " column " : " process_data " } ,
{ " type " : " text " , " table " : " workflow_node_executions " , " column " : " outputs " } ,
{ " type " : " text " , " table " : " conversations " , " column " : " introduction " } ,
{ " type " : " text " , " table " : " conversations " , " column " : " system_instruction " } ,
{ " type " : " json " , " table " : " messages " , " column " : " inputs " } ,
{ " type " : " json " , " table " : " messages " , " column " : " message " } ,
]
# notify user and ask for confirmation
click . echo (
click . style (
" This command will first find and delete orphaned file records from the message_files table, " , fg = " yellow "
)
)
click . echo (
click . style (
" and then it will find and delete orphaned file records in the following tables: " ,
fg = " yellow " ,
)
)
for files_table in files_tables :
click . echo ( click . style ( f " - { files_table [ ' table ' ] } " , fg = " yellow " ) )
click . echo (
click . style ( " The following tables and columns will be scanned to find orphaned file records: " , fg = " yellow " )
)
for ids_table in ids_tables :
click . echo ( click . style ( f " - { ids_table [ ' table ' ] } ( { ids_table [ ' column ' ] } ) " , fg = " yellow " ) )
click . echo ( " " )
click . echo ( click . style ( " !!! USE WITH CAUTION !!! " , fg = " red " ) )
click . echo (
click . style (
(
" Since not all patterns have been fully tested, "
" please note that this command may delete unintended file records. "
) ,
fg = " yellow " ,
)
)
click . echo (
click . style ( " This cannot be undone. Please make sure to back up your database before proceeding. " , fg = " yellow " )
)
click . echo (
click . style (
(
" It is also recommended to run this during the maintenance window, "
" as this may cause high load on your instance. "
) ,
fg = " yellow " ,
)
)
if not force :
click . confirm ( " Do you want to proceed? " , abort = True )
# start the cleanup process
click . echo ( click . style ( " Starting orphaned file records cleanup. " , fg = " white " ) )
# clean up the orphaned records in the message_files table where message_id doesn't exist in messages table
try :
click . echo (
click . style ( " - Listing message_files records where message_id doesn ' t exist in messages table " , fg = " white " )
)
query = (
" SELECT mf.id, mf.message_id "
" FROM message_files mf LEFT JOIN messages m ON mf.message_id = m.id "
" WHERE m.id IS NULL "
)
orphaned_message_files = [ ]
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
orphaned_message_files . append ( { " id " : str ( i [ 0 ] ) , " message_id " : str ( i [ 1 ] ) } )
if orphaned_message_files :
click . echo ( click . style ( f " Found { len ( orphaned_message_files ) } orphaned message_files records: " , fg = " white " ) )
for record in orphaned_message_files :
click . echo ( click . style ( f " - id: { record [ ' id ' ] } , message_id: { record [ ' message_id ' ] } " , fg = " black " ) )
if not force :
click . confirm (
(
f " Do you want to proceed "
f " to delete all { len ( orphaned_message_files ) } orphaned message_files records? "
) ,
abort = True ,
)
click . echo ( click . style ( " - Deleting orphaned message_files records " , fg = " white " ) )
query = " DELETE FROM message_files WHERE id IN :ids "
with db . engine . begin ( ) as conn :
conn . execute ( db . text ( query ) , { " ids " : tuple ( [ record [ " id " ] for record in orphaned_message_files ] ) } )
click . echo (
click . style ( f " Removed { len ( orphaned_message_files ) } orphaned message_files records. " , fg = " green " )
)
else :
click . echo ( click . style ( " No orphaned message_files records found. There is nothing to delete. " , fg = " green " ) )
except Exception as e :
click . echo ( click . style ( f " Error deleting orphaned message_files records: { str ( e ) } " , fg = " red " ) )
# clean up the orphaned records in the rest of the *_files tables
try :
# fetch file id and keys from each table
all_files_in_tables = [ ]
for files_table in files_tables :
click . echo ( click . style ( f " - Listing file records in table { files_table [ ' table ' ] } " , fg = " white " ) )
query = f " SELECT { files_table [ ' id_column ' ] } , { files_table [ ' key_column ' ] } FROM { files_table [ ' table ' ] } "
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
all_files_in_tables . append ( { " table " : files_table [ " table " ] , " id " : str ( i [ 0 ] ) , " key " : i [ 1 ] } )
click . echo ( click . style ( f " Found { len ( all_files_in_tables ) } files in tables. " , fg = " white " ) )
# fetch referred table and columns
guid_regexp = " [0-9a-fA-F] {8} -[0-9a-fA-F] {4} -[0-9a-fA-F] {4} -[0-9a-fA-F] {4} -[0-9a-fA-F] {12} "
all_ids_in_tables = [ ]
for ids_table in ids_tables :
query = " "
if ids_table [ " type " ] == " uuid " :
click . echo (
click . style (
f " - Listing file ids in column { ids_table [ ' column ' ] } in table { ids_table [ ' table ' ] } " , fg = " white "
)
)
query = (
f " SELECT { ids_table [ ' column ' ] } FROM { ids_table [ ' table ' ] } WHERE { ids_table [ ' column ' ] } IS NOT NULL "
)
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
all_ids_in_tables . append ( { " table " : ids_table [ " table " ] , " id " : str ( i [ 0 ] ) } )
elif ids_table [ " type " ] == " text " :
click . echo (
click . style (
f " - Listing file-id-like strings in column { ids_table [ ' column ' ] } in table { ids_table [ ' table ' ] } " ,
fg = " white " ,
)
)
query = (
f " SELECT regexp_matches( { ids_table [ ' column ' ] } , ' { guid_regexp } ' , ' g ' ) AS extracted_id "
f " FROM { ids_table [ ' table ' ] } "
)
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
for j in i [ 0 ] :
all_ids_in_tables . append ( { " table " : ids_table [ " table " ] , " id " : j } )
elif ids_table [ " type " ] == " json " :
click . echo (
click . style (
(
f " - Listing file-id-like JSON string in column { ids_table [ ' column ' ] } "
f " in table { ids_table [ ' table ' ] } "
) ,
fg = " white " ,
)
)
query = (
f " SELECT regexp_matches( { ids_table [ ' column ' ] } ::text, ' { guid_regexp } ' , ' g ' ) AS extracted_id "
f " FROM { ids_table [ ' table ' ] } "
)
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
for j in i [ 0 ] :
all_ids_in_tables . append ( { " table " : ids_table [ " table " ] , " id " : j } )
click . echo ( click . style ( f " Found { len ( all_ids_in_tables ) } file ids in tables. " , fg = " white " ) )
except Exception as e :
click . echo ( click . style ( f " Error fetching keys: { str ( e ) } " , fg = " red " ) )
return
# find orphaned files
all_files = [ file [ " id " ] for file in all_files_in_tables ]
all_ids = [ file [ " id " ] for file in all_ids_in_tables ]
orphaned_files = list ( set ( all_files ) - set ( all_ids ) )
if not orphaned_files :
click . echo ( click . style ( " No orphaned file records found. There is nothing to delete. " , fg = " green " ) )
return
click . echo ( click . style ( f " Found { len ( orphaned_files ) } orphaned file records. " , fg = " white " ) )
for file in orphaned_files :
click . echo ( click . style ( f " - orphaned file id: { file } " , fg = " black " ) )
if not force :
click . confirm ( f " Do you want to proceed to delete all { len ( orphaned_files ) } orphaned file records? " , abort = True )
# delete orphaned records for each file
try :
for files_table in files_tables :
click . echo ( click . style ( f " - Deleting orphaned file records in table { files_table [ ' table ' ] } " , fg = " white " ) )
query = f " DELETE FROM { files_table [ ' table ' ] } WHERE { files_table [ ' id_column ' ] } IN :ids "
with db . engine . begin ( ) as conn :
conn . execute ( db . text ( query ) , { " ids " : tuple ( orphaned_files ) } )
except Exception as e :
click . echo ( click . style ( f " Error deleting orphaned file records: { str ( e ) } " , fg = " red " ) )
return
click . echo ( click . style ( f " Removed { len ( orphaned_files ) } orphaned file records. " , fg = " green " ) )
@click.option ( " -f " , " --force " , is_flag = True , help = " Skip user confirmation and force the command to execute. " )
@click.command ( " remove-orphaned-files-on-storage " , help = " Remove orphaned files on the storage. " )
def remove_orphaned_files_on_storage ( force : bool ) :
"""
Remove orphaned files on the storage .
"""
# define tables and columns to process
files_tables = [
{ " table " : " upload_files " , " key_column " : " key " } ,
{ " table " : " tool_files " , " key_column " : " file_key " } ,
]
storage_paths = [ " image_files " , " tools " , " upload_files " ]
# notify user and ask for confirmation
click . echo ( click . style ( " This command will find and remove orphaned files on the storage, " , fg = " yellow " ) )
click . echo (
click . style ( " by comparing the files on the storage with the records in the following tables: " , fg = " yellow " )
)
for files_table in files_tables :
click . echo ( click . style ( f " - { files_table [ ' table ' ] } " , fg = " yellow " ) )
click . echo ( click . style ( " The following paths on the storage will be scanned to find orphaned files: " , fg = " yellow " ) )
for storage_path in storage_paths :
click . echo ( click . style ( f " - { storage_path } " , fg = " yellow " ) )
click . echo ( " " )
click . echo ( click . style ( " !!! USE WITH CAUTION !!! " , fg = " red " ) )
click . echo (
click . style (
" Currently, this command will work only for opendal based storage (STORAGE_TYPE=opendal). " , fg = " yellow "
)
)
click . echo (
click . style (
" Since not all patterns have been fully tested, please note that this command may delete unintended files. " ,
fg = " yellow " ,
)
)
click . echo (
click . style ( " This cannot be undone. Please make sure to back up your storage before proceeding. " , fg = " yellow " )
)
click . echo (
click . style (
(
" It is also recommended to run this during the maintenance window, "
" as this may cause high load on your instance. "
) ,
fg = " yellow " ,
)
)
if not force :
click . confirm ( " Do you want to proceed? " , abort = True )
# start the cleanup process
click . echo ( click . style ( " Starting orphaned files cleanup. " , fg = " white " ) )
# fetch file id and keys from each table
all_files_in_tables = [ ]
try :
for files_table in files_tables :
click . echo ( click . style ( f " - Listing files from table { files_table [ ' table ' ] } " , fg = " white " ) )
query = f " SELECT { files_table [ ' key_column ' ] } FROM { files_table [ ' table ' ] } "
with db . engine . begin ( ) as conn :
rs = conn . execute ( db . text ( query ) )
for i in rs :
all_files_in_tables . append ( str ( i [ 0 ] ) )
click . echo ( click . style ( f " Found { len ( all_files_in_tables ) } files in tables. " , fg = " white " ) )
except Exception as e :
click . echo ( click . style ( f " Error fetching keys: { str ( e ) } " , fg = " red " ) )
all_files_on_storage = [ ]
for storage_path in storage_paths :
try :
click . echo ( click . style ( f " - Scanning files on storage path { storage_path } " , fg = " white " ) )
files = storage . scan ( path = storage_path , files = True , directories = False )
all_files_on_storage . extend ( files )
except FileNotFoundError as e :
click . echo ( click . style ( f " -> Skipping path { storage_path } as it does not exist. " , fg = " yellow " ) )
continue
except Exception as e :
click . echo ( click . style ( f " -> Error scanning files on storage path { storage_path } : { str ( e ) } " , fg = " red " ) )
continue
click . echo ( click . style ( f " Found { len ( all_files_on_storage ) } files on storage. " , fg = " white " ) )
# find orphaned files
orphaned_files = list ( set ( all_files_on_storage ) - set ( all_files_in_tables ) )
if not orphaned_files :
click . echo ( click . style ( " No orphaned files found. There is nothing to remove. " , fg = " green " ) )
return
click . echo ( click . style ( f " Found { len ( orphaned_files ) } orphaned files. " , fg = " white " ) )
for file in orphaned_files :
click . echo ( click . style ( f " - orphaned file: { file } " , fg = " black " ) )
if not force :
click . confirm ( f " Do you want to proceed to remove all { len ( orphaned_files ) } orphaned files? " , abort = True )
# delete orphaned files
removed_files = 0
error_files = 0
for file in orphaned_files :
try :
storage . delete ( file )
removed_files + = 1
click . echo ( click . style ( f " - Removing orphaned file: { file } " , fg = " white " ) )
except Exception as e :
error_files + = 1
click . echo ( click . style ( f " - Error deleting orphaned file { file } : { str ( e ) } " , fg = " red " ) )
continue
if error_files == 0 :
click . echo ( click . style ( f " Removed { removed_files } orphaned files without errors. " , fg = " green " ) )
else :
click . echo ( click . style ( f " Removed { removed_files } orphaned files, with { error_files } errors. " , fg = " yellow " ) )