|
|
|
|
@ -35,21 +35,21 @@ class OpenDALStorage(BaseStorage):
|
|
|
|
|
Path(root).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
self.op = opendal.Operator(scheme=scheme, **kwargs) # type: ignore
|
|
|
|
|
logger.debug(f"opendal operator created with scheme {scheme}")
|
|
|
|
|
logger.debug("opendal operator created with scheme %s", scheme)
|
|
|
|
|
retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
|
|
|
|
|
self.op = self.op.layer(retry_layer)
|
|
|
|
|
logger.debug("added retry layer to opendal operator")
|
|
|
|
|
|
|
|
|
|
def save(self, filename: str, data: bytes) -> None:
|
|
|
|
|
self.op.write(path=filename, bs=data)
|
|
|
|
|
logger.debug(f"file {filename} saved")
|
|
|
|
|
logger.debug("file %s saved", filename)
|
|
|
|
|
|
|
|
|
|
def load_once(self, filename: str) -> bytes:
|
|
|
|
|
if not self.exists(filename):
|
|
|
|
|
raise FileNotFoundError("File not found")
|
|
|
|
|
|
|
|
|
|
content: bytes = self.op.read(path=filename)
|
|
|
|
|
logger.debug(f"file {filename} loaded")
|
|
|
|
|
logger.debug("file %s loaded", filename)
|
|
|
|
|
return content
|
|
|
|
|
|
|
|
|
|
def load_stream(self, filename: str) -> Generator:
|
|
|
|
|
@ -60,7 +60,7 @@ class OpenDALStorage(BaseStorage):
|
|
|
|
|
file = self.op.open(path=filename, mode="rb")
|
|
|
|
|
while chunk := file.read(batch_size):
|
|
|
|
|
yield chunk
|
|
|
|
|
logger.debug(f"file {filename} loaded as stream")
|
|
|
|
|
logger.debug("file %s loaded as stream", filename)
|
|
|
|
|
|
|
|
|
|
def download(self, filename: str, target_filepath: str):
|
|
|
|
|
if not self.exists(filename):
|
|
|
|
|
@ -68,7 +68,7 @@ class OpenDALStorage(BaseStorage):
|
|
|
|
|
|
|
|
|
|
with Path(target_filepath).open("wb") as f:
|
|
|
|
|
f.write(self.op.read(path=filename))
|
|
|
|
|
logger.debug(f"file {filename} downloaded to {target_filepath}")
|
|
|
|
|
logger.debug("file %s downloaded to %s", filename, target_filepath)
|
|
|
|
|
|
|
|
|
|
def exists(self, filename: str) -> bool:
|
|
|
|
|
res: bool = self.op.exists(path=filename)
|
|
|
|
|
@ -77,9 +77,9 @@ class OpenDALStorage(BaseStorage):
|
|
|
|
|
def delete(self, filename: str):
|
|
|
|
|
if self.exists(filename):
|
|
|
|
|
self.op.delete(path=filename)
|
|
|
|
|
logger.debug(f"file {filename} deleted")
|
|
|
|
|
logger.debug("file %s deleted", filename)
|
|
|
|
|
return
|
|
|
|
|
logger.debug(f"file {filename} not found, skip delete")
|
|
|
|
|
logger.debug("file %s not found, skip delete", filename)
|
|
|
|
|
|
|
|
|
|
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
|
|
|
|
|
if not self.exists(path):
|
|
|
|
|
@ -87,13 +87,13 @@ class OpenDALStorage(BaseStorage):
|
|
|
|
|
|
|
|
|
|
all_files = self.op.scan(path=path)
|
|
|
|
|
if files and directories:
|
|
|
|
|
logger.debug(f"files and directories on {path} scanned")
|
|
|
|
|
logger.debug("files and directories on %s scanned", path)
|
|
|
|
|
return [f.path for f in all_files]
|
|
|
|
|
if files:
|
|
|
|
|
logger.debug(f"files on {path} scanned")
|
|
|
|
|
logger.debug("files on %s scanned", path)
|
|
|
|
|
return [f.path for f in all_files if not f.path.endswith("/")]
|
|
|
|
|
elif directories:
|
|
|
|
|
logger.debug(f"directories on {path} scanned")
|
|
|
|
|
logger.debug("directories on %s scanned", path)
|
|
|
|
|
return [f.path for f in all_files if f.path.endswith("/")]
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError("At least one of files or directories must be True")
|
|
|
|
|
|