You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
6.5 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
import time
from io import BytesIO
from minio import Minio
from minio.commonconfig import CopySource
from minio.error import S3Error
from constant_3d import MINIO_CONFIG
# Short alias for the MinIO connection settings imported from constant_3d;
# expected keys (per usage below): "host", "user", "password".
MINIO = MINIO_CONFIG
class MinioServer:
    """Thin retrying wrapper around a MinIO client connection.

    Connection parameters come from the module-level ``MINIO`` config
    (keys ``host``/``user``/``password``).  The connection is opened in
    ``__init__`` and transparently re-opened by the retry loops in the
    I/O methods whenever an operation fails.
    """

    def __init__(self):
        # Live Minio client, or None until __open__ succeeds.
        self.conn = None
        self.__open__()

    def __open__(self):
        """(Re)create the Minio client from the MINIO config."""
        try:
            if self.conn:
                self.__close__()
        except Exception:
            pass
        try:
            # NOTE(review): secure=False means plain HTTP; the config
            # exposes no TLS option — confirm this is intentional.
            self.conn = Minio(MINIO["host"],
                              access_key=MINIO["user"],
                              secret_key=MINIO["password"],
                              secure=False)
        except Exception:
            logging.exception(
                "Fail to connect %s " % MINIO["host"])

    def __close__(self):
        # Drop the client reference; the SDK exposes no explicit close().
        # (The original `del self.conn` before reassignment was a no-op.)
        self.conn = None

    def health(self):
        """Round-trip a tiny object to prove the connection works.

        Returns the SDK's put_object result; raises on failure (no retry).
        """
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        if not self.conn.bucket_exists(bucket):
            self.conn.make_bucket(bucket)
        return self.conn.put_object(bucket, fnm,
                                    BytesIO(binary),
                                    len(binary))

    def put(self, bucket, fnm, binary):
        """Store *binary* (bytes) under bucket/fnm, retrying up to 3 times.

        Returns the SDK write result, or None if every attempt failed.
        """
        for _ in range(3):
            try:
                if not self.conn.bucket_exists(bucket):
                    self.conn.make_bucket(bucket)
                return self.conn.put_object(bucket, fnm,
                                            BytesIO(binary),
                                            len(binary))
            except Exception:
                logging.exception(f"Fail to put {bucket}/{fnm}:")
                self.__open__()  # reconnect before the next attempt
                time.sleep(1)
        return None

    def rm(self, bucket, fnm):
        """Best-effort delete of bucket/fnm; failures are only logged."""
        try:
            self.conn.remove_object(bucket, fnm)
        except Exception:
            logging.exception(f"Fail to remove {bucket}/{fnm}:")

    def get(self, bucket, filename):
        """Return the object's full content as bytes, or None after 3 failures."""
        for _ in range(3):
            try:
                r = self.conn.get_object(bucket, filename)
                return r.read()
            except Exception:
                # BUG FIX: the message previously logged a literal
                # placeholder instead of the filename.
                logging.exception(f"Fail to get {bucket}/{filename}")
                self.__open__()
                time.sleep(1)
        return None

    def get_file(self, bucket, filename):
        """Return the raw response object for streaming, or None after 2 failures.

        NOTE(review): the caller is responsible for closing/releasing the
        returned response — confirm call sites do so.
        """
        for _ in range(2):
            try:
                return self.conn.get_object(bucket, filename)
            except Exception:
                # BUG FIX: log the actual filename, not a placeholder.
                logging.exception(f"Fail to get {bucket}/{filename}")
                self.__open__()
                time.sleep(0.5)
        return None

    def obj_exist(self, bucket, filename):
        """Return True iff bucket/filename exists; False otherwise.

        Not-found S3 errors are treated as "does not exist"; any other
        error is logged and also reported as False.
        """
        try:
            if not self.conn.bucket_exists(bucket):
                return False
            # stat_object raises on a missing key, so a truthy stat
            # result means the object exists.
            return bool(self.conn.stat_object(bucket, filename))
        except S3Error as e:
            if e.code in ["NoSuchKey", "NoSuchBucket", "ResourceNotFound"]:
                return False
            # BUG FIX: other S3 error codes previously fell through the
            # handler and returned None implicitly.
            logging.exception(f"obj_exist {bucket}/{filename} got exception")
            return False
        except Exception:
            # BUG FIX: include the filename instead of a placeholder.
            logging.exception(f"obj_exist {bucket}/{filename} got exception")
            return False

    def get_presigned_url(self, bucket, fnm, expires):
        """Return a presigned GET URL for bucket/fnm, or None after 10 failures.

        :param expires: URL lifetime (a datetime.timedelta, per the SDK).
        """
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception:
                logging.exception(f"Fail to get_presigned {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return None

    def remove_bucket(self, bucket):
        """Delete every object in *bucket*, then the bucket itself.

        MinIO refuses to remove a non-empty bucket, hence the per-object
        sweep first.  Failures are only logged.
        """
        try:
            if self.conn.bucket_exists(bucket):
                for obj in self.conn.list_objects(bucket, recursive=True):
                    self.conn.remove_object(bucket, obj.object_name)
                self.conn.remove_bucket(bucket)
        except Exception:
            logging.exception(f"Fail to remove bucket {bucket}")

    def init_directory(self, bucket, fnm):
        """Create *bucket* if needed and write a zero-byte marker object *fnm*.

        Returns True on success, False after 3 failed attempts.
        """
        for _ in range(3):
            try:
                if not self.conn.bucket_exists(bucket):
                    self.conn.make_bucket(bucket)
                # Zero-byte object acts as the "directory" placeholder.
                self.conn.put_object(bucket, fnm, BytesIO(b''), 0)
                return True
            except Exception:
                logging.exception(f"Fail to init directory {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return False

    def initCreateBucket(self, bucket_name):
        """Ensure *bucket_name* exists and carries a public-read policy.

        Returns True only when the bucket was newly created; False when it
        already existed or creation failed.
        """
        # Check whether the bucket already exists.
        found = self.conn.bucket_exists(bucket_name)
        if not found:
            # Bucket is missing: create it and open anonymous GET access.
            try:
                self.conn.make_bucket(bucket_name)
                policy = """
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Sid": "PublicRead",
                            "Effect": "Allow",
                            "Principal": "*",
                            "Action": ["s3:GetObject"],
                            "Resource": ["arn:aws:s3:::%s/*"]
                        }
                    ]
                }
                """ % bucket_name
                self.conn.set_bucket_policy(bucket_name, policy)
                logging.info(f"'{bucket_name}' 创建成功.")
                return True
            except S3Error as err:
                logging.error(f"Error occurred: {err}")
        else:
            logging.info(f"'{bucket_name}' 存在.")
        return False

    def copy_file_in_bucket(self, source_bucket, source_file_path, target_bucket, target_path):
        """Server-side copy of an object between (possibly equal) buckets.

        :param source_bucket: bucket holding the source object
        :param source_file_path: source object key (e.g. "aa/a")
        :param target_bucket: destination bucket
        :param target_path: destination object key (e.g. "bb/a")
        """
        copy_source = CopySource(source_bucket, source_file_path)
        try:
            # Server-side copy: the object data never transits this host.
            self.conn.copy_object(
                bucket_name=target_bucket,
                object_name=target_path,
                source=copy_source,
            )
            # BUG FIX: the original f-string had unbalanced fullwidth quotes.
            logging.info(f"文件从桶‘{source_bucket}’的‘{source_file_path}’复制到桶‘{target_bucket}’的‘{target_path}’")
        except S3Error as e:
            logging.error(f"复制失败: {e}")