|
|
|
|
@ -1,9 +1,9 @@
|
|
|
|
|
import mimetypes
|
|
|
|
|
import re
|
|
|
|
|
import uuid
|
|
|
|
|
from collections.abc import Callable, Mapping, Sequence
|
|
|
|
|
from typing import Any, cast
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
import httpx
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
|
|
|
|
@ -224,10 +224,16 @@ def _build_from_remote_url(
|
|
|
|
|
mime_type, filename, file_size = _get_remote_file_info(url)
|
|
|
|
|
extension = mimetypes.guess_extension(mime_type) or ("." + filename.split(".")[-1] if "." in filename else ".bin")
|
|
|
|
|
|
|
|
|
|
file_type = _standardize_file_type(extension=extension, mime_type=mime_type)
|
|
|
|
|
if file_type.value != mapping.get("type", "custom"):
|
|
|
|
|
specified_type = mapping.get("type")
|
|
|
|
|
detected_file_type = _standardize_file_type(extension=extension, mime_type=mime_type)
|
|
|
|
|
|
|
|
|
|
if strict_type_validation and specified_type and detected_file_type.value != specified_type:
|
|
|
|
|
raise ValueError("Detected file type does not match the specified type. Please verify the file.")
|
|
|
|
|
|
|
|
|
|
file_type = (
|
|
|
|
|
FileType(specified_type) if specified_type and specified_type != FileType.CUSTOM.value else detected_file_type
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return File(
|
|
|
|
|
id=mapping.get("id"),
|
|
|
|
|
filename=filename,
|
|
|
|
|
@ -244,16 +250,32 @@ def _build_from_remote_url(
|
|
|
|
|
|
|
|
|
|
def _get_remote_file_info(url: str):
|
|
|
|
|
file_size = -1
|
|
|
|
|
filename = url.split("/")[-1].split("?")[0] or "unknown_file"
|
|
|
|
|
mime_type = mimetypes.guess_type(filename)[0] or ""
|
|
|
|
|
filename = ""
|
|
|
|
|
|
|
|
|
|
resp = ssrf_proxy.head(url, follow_redirects=True)
|
|
|
|
|
resp = cast(httpx.Response, resp)
|
|
|
|
|
if resp.status_code == httpx.codes.OK:
|
|
|
|
|
if content_disposition := resp.headers.get("Content-Disposition"):
|
|
|
|
|
filename = str(content_disposition.split("filename=")[-1].strip('"'))
|
|
|
|
|
file_size = int(resp.headers.get("Content-Length", file_size))
|
|
|
|
|
mime_type = mime_type or str(resp.headers.get("Content-Type", ""))
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
|
|
|
|
|
content_disposition = resp.headers.get("Content-Disposition")
|
|
|
|
|
if content_disposition:
|
|
|
|
|
# Use regex to parse filename from content-disposition header
|
|
|
|
|
# RFC 2616, Section 19.5.1
|
|
|
|
|
filename_match = re.search(r'filename="([^"]+)"', content_disposition)
|
|
|
|
|
if filename_match:
|
|
|
|
|
filename = filename_match.group(1)
|
|
|
|
|
|
|
|
|
|
if not filename:
|
|
|
|
|
filename = url.split("/")[-1].split("?")[0] or "unknown_file"
|
|
|
|
|
|
|
|
|
|
mime_type = resp.headers.get("Content-Type", "")
|
|
|
|
|
if not mime_type:
|
|
|
|
|
mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
|
|
|
|
|
else:
|
|
|
|
|
# strip charset or other parameters from mime type
|
|
|
|
|
mime_type = mime_type.split(";")[0].strip()
|
|
|
|
|
|
|
|
|
|
content_length = resp.headers.get("Content-Length")
|
|
|
|
|
if content_length and content_length.isdigit():
|
|
|
|
|
file_size = int(content_length)
|
|
|
|
|
|
|
|
|
|
return mime_type, filename, file_size
|
|
|
|
|
|
|
|
|
|
|