"""MinIO object-storage helper built on top of the ``minio`` client."""
import logging
import time
from io import BytesIO

from minio import Minio
from minio.commonconfig import CopySource
from minio.error import S3Error

from constant_3d import MINIO_CONFIG

MINIO = MINIO_CONFIG


class MinioServer:
    """Convenience wrapper around a single ``Minio`` client.

    Most operations log failures instead of raising, and several of them
    retry a fixed number of times, re-creating the client between attempts.
    """

    def __init__(self):
        self.conn = None
        self.__open__()

    def __open__(self):
        """(Re)create the client from the ``MINIO`` settings.

        Any existing client is dropped first. If constructing the new client
        fails, the error is logged and ``self.conn`` stays ``None``.
        """
        try:
            if self.conn:
                self.__close__()
        except Exception:
            # Best effort: a failed close must not prevent reconnecting.
            logging.exception("Fail to close previous MinIO connection")

        try:
            self.conn = Minio(
                MINIO["host"],
                access_key=MINIO["user"],
                secret_key=MINIO["password"],
                secure=False,
            )
        except Exception:
            logging.exception("Fail to connect %s ", MINIO["host"])

    def __close__(self):
        """Drop the reference to the current client."""
        self.conn = None

    def health(self):
        """Write a small probe object and return the ``put_object`` result.

        The probe bucket is created when missing. Exceptions from the client
        are not caught here and propagate to the caller.
        """
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        if not self.conn.bucket_exists(bucket):
            self.conn.make_bucket(bucket)
        return self.conn.put_object(bucket, fnm, BytesIO(binary), len(binary))

    def put(self, bucket, fnm, binary):
        """Upload ``binary`` (bytes) as ``bucket/fnm``, creating the bucket if needed.

        Tries up to 3 times, reconnecting and sleeping 1s after each failure.

        Returns:
            The ``put_object`` result on success, ``None`` if every attempt failed.
        """
        for _ in range(3):
            try:
                if not self.conn.bucket_exists(bucket):
                    self.conn.make_bucket(bucket)
                return self.conn.put_object(
                    bucket, fnm, BytesIO(binary), len(binary)
                )
            except Exception:
                logging.exception(f"Fail to put {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return None

    def rm(self, bucket, fnm):
        """Remove ``bucket/fnm``; failures are logged, never raised."""
        try:
            self.conn.remove_object(bucket, fnm)
        except Exception:
            logging.exception(f"Fail to remove {bucket}/{fnm}:")

    def get(self, bucket, filename):
        """Return the full content of ``bucket/filename`` as bytes.

        Tries up to 3 times, reconnecting and sleeping 1s after each failure.

        Returns:
            The object's bytes, or ``None`` if every attempt failed.
        """
        for _ in range(3):
            try:
                response = self.conn.get_object(bucket, filename)
                try:
                    return response.read()
                finally:
                    # The response holds a pooled HTTP connection; it must be
                    # closed and released or the pool leaks connections.
                    response.close()
                    response.release_conn()
            except Exception:
                logging.exception(f"Fail to get {bucket}/{filename}")
                self.__open__()
                time.sleep(1)
        return None

    def get_file(self, bucket, filename):
        """Return the raw ``get_object`` response for ``bucket/filename``.

        Tries up to 2 times, reconnecting and sleeping 1s after each failure.
        The response is handed back unread, so the caller is responsible for
        calling ``close()`` and ``release_conn()`` on it when done.

        Returns:
            The response object, or ``None`` if every attempt failed.
        """
        for _ in range(2):
            try:
                return self.conn.get_object(bucket, filename)
            except Exception:
                logging.exception(f"Fail to get {bucket}/{filename}")
                self.__open__()
                time.sleep(1)
        return None

    def obj_exist(self, bucket, filename):
        """Return ``True`` if ``bucket/filename`` exists, ``False`` otherwise.

        Any error (including unexpected ones, which are logged) yields ``False``.
        """
        try:
            if not self.conn.bucket_exists(bucket):
                return False
            return bool(self.conn.stat_object(bucket, filename))
        except S3Error as e:
            # "Not found" style codes are the expected negative answer; any
            # other S3 error is worth a log entry before reporting False.
            if e.code not in ("NoSuchKey", "NoSuchBucket", "ResourceNotFound"):
                logging.exception(f"obj_exist {bucket}/{filename} got exception")
            return False
        except Exception:
            logging.exception(f"obj_exist {bucket}/{filename} got exception")
            return False

    def get_presigned_url(self, bucket, fnm, expires):
        """Return a presigned GET URL for ``bucket/fnm``.

        ``expires`` is forwarded unchanged to the client. Tries up to 10
        times, reconnecting and sleeping 1s after each failure.

        Returns:
            The URL, or ``None`` if every attempt failed.
        """
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception:
                logging.exception(f"Fail to get_presigned {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return None

    def remove_bucket(self, bucket):
        """Delete every object in ``bucket`` and then the bucket itself.

        Does nothing when the bucket does not exist; failures are logged.
        """
        try:
            if self.conn.bucket_exists(bucket):
                # The objects are removed first, then the bucket.
                for obj in self.conn.list_objects(bucket, recursive=True):
                    self.conn.remove_object(bucket, obj.object_name)
                self.conn.remove_bucket(bucket)
        except Exception:
            logging.exception(f"Fail to remove bucket {bucket}")

    def init_directory(self, bucket, fnm):
        """Create an empty object ``bucket/fnm`` to act as a directory marker.

        The bucket is created when missing. Tries up to 3 times, reconnecting
        and sleeping 1s after each failure.

        Returns:
            ``True`` on success, ``False`` if every attempt failed.
        """
        for _ in range(3):
            try:
                if not self.conn.bucket_exists(bucket):
                    self.conn.make_bucket(bucket)
                self.conn.put_object(bucket, fnm, BytesIO(b''), 0)
                return True
            except Exception:
                logging.exception(f"Fail to init directory {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return False

    def initCreateBucket(self, bucket_name):
        """Create ``bucket_name`` with a public-read policy if it is missing.

        Returns:
            ``True`` when the bucket was created and the policy applied,
            ``False`` when it already existed or an ``S3Error`` occurred.
        """
        # Check whether the bucket already exists.
        found = self.conn.bucket_exists(bucket_name)
        if not found:
            # Bucket is missing: create it and allow anonymous s3:GetObject.
            try:
                self.conn.make_bucket(bucket_name)
                # Adjacent literals only wrap the line; the policy text is
                # exactly the same string as before.
                policy = (
                    ' { "Version": "2012-10-17", "Statement": [ '
                    '{ "Sid": "PublicRead", "Effect": "Allow", "Principal": "*", '
                    '"Action": ["s3:GetObject"], '
                    '"Resource": ["arn:aws:s3:::%s/*" ] } ] } '
                ) % bucket_name
                self.conn.set_bucket_policy(bucket_name, policy)
                logging.info(f"桶 '{bucket_name}' 创建成功.")
                return True
            except S3Error as err:
                logging.error(f"Error occurred: {err}")
        else:
            logging.info(f"桶 '{bucket_name}' 存在.")
        return False

    def copy_file_in_bucket(self, source_bucket, source_file_path, target_bucket, target_path):
        """Copy an object server-side from one location to another.

        :param source_bucket: name of the bucket holding the source object
        :param source_file_path: source object path (e.g. "aa/a")
        :param target_bucket: name of the bucket receiving the copy
        :param target_path: destination object path (e.g. "bb/a")

        An ``S3Error`` is logged rather than raised.
        """
        copy_source = CopySource(source_bucket, source_file_path)
        try:
            # Server-side copy: the data is not downloaded by this process.
            self.conn.copy_object(
                bucket_name=target_bucket,
                object_name=target_path,  # destination path
                source=copy_source,  # source path
            )
            logging.info(f"文件从桶‘{source_bucket}’的‘{source_file_path}’ 复制到桶‘{target_bucket}’的‘{target_path}’")
        except S3Error as e:
            logging.error(f"复制失败: {e}")