Merge branch 'main' into feat/tool-oauth
commit
bda76080a9
@ -1,67 +0,0 @@
|
|||||||
import base64
|
|
||||||
import logging
|
|
||||||
import time
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from configs import dify_config
|
|
||||||
from constants import IMAGE_EXTENSIONS
|
|
||||||
from core.helper.url_signer import UrlSigner
|
|
||||||
from extensions.ext_storage import storage
|
|
||||||
|
|
||||||
|
|
||||||
class UploadFileParser:
|
|
||||||
@classmethod
|
|
||||||
def get_image_data(cls, upload_file, force_url: bool = False) -> Optional[str]:
|
|
||||||
if not upload_file:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if upload_file.extension not in IMAGE_EXTENSIONS:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if dify_config.MULTIMODAL_SEND_FORMAT == "url" or force_url:
|
|
||||||
return cls.get_signed_temp_image_url(upload_file.id)
|
|
||||||
else:
|
|
||||||
# get image file base64
|
|
||||||
try:
|
|
||||||
data = storage.load(upload_file.key)
|
|
||||||
except FileNotFoundError:
|
|
||||||
logging.exception(f"File not found: {upload_file.key}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
encoded_string = base64.b64encode(data).decode("utf-8")
|
|
||||||
return f"data:{upload_file.mime_type};base64,{encoded_string}"
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_signed_temp_image_url(cls, upload_file_id) -> str:
|
|
||||||
"""
|
|
||||||
get signed url from upload file
|
|
||||||
|
|
||||||
:param upload_file_id: the id of UploadFile object
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
base_url = dify_config.FILES_URL
|
|
||||||
image_preview_url = f"{base_url}/files/{upload_file_id}/image-preview"
|
|
||||||
|
|
||||||
return UrlSigner.get_signed_url(url=image_preview_url, sign_key=upload_file_id, prefix="image-preview")
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def verify_image_file_signature(cls, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
|
|
||||||
"""
|
|
||||||
verify signature
|
|
||||||
|
|
||||||
:param upload_file_id: file id
|
|
||||||
:param timestamp: timestamp
|
|
||||||
:param nonce: nonce
|
|
||||||
:param sign: signature
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
result = UrlSigner.verify(
|
|
||||||
sign_key=upload_file_id, timestamp=timestamp, nonce=nonce, sign=sign, prefix="image-preview"
|
|
||||||
)
|
|
||||||
|
|
||||||
# verify signature
|
|
||||||
if not result:
|
|
||||||
return False
|
|
||||||
|
|
||||||
current_time = int(time.time())
|
|
||||||
return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
from collections import OrderedDict
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
|
|
||||||
class LRUCache:
|
|
||||||
def __init__(self, capacity: int):
|
|
||||||
self.cache: OrderedDict[Any, Any] = OrderedDict()
|
|
||||||
self.capacity = capacity
|
|
||||||
|
|
||||||
def get(self, key: Any) -> Any:
|
|
||||||
if key not in self.cache:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
self.cache.move_to_end(key) # move the key to the end of the OrderedDict
|
|
||||||
return self.cache[key]
|
|
||||||
|
|
||||||
def put(self, key: Any, value: Any) -> None:
|
|
||||||
if key in self.cache:
|
|
||||||
self.cache.move_to_end(key)
|
|
||||||
self.cache[key] = value
|
|
||||||
if len(self.cache) > self.capacity:
|
|
||||||
self.cache.popitem(last=False) # pop the first item
|
|
||||||
@ -0,0 +1,194 @@
|
|||||||
|
from unittest.mock import patch
|
||||||
|
from urllib.parse import parse_qs, urlparse
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from core.helper.url_signer import SignedUrlParams, UrlSigner
|
||||||
|
|
||||||
|
|
||||||
|
class TestUrlSigner:
|
||||||
|
"""Test cases for UrlSigner class"""
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_generate_signed_url_params(self):
|
||||||
|
"""Test generation of signed URL parameters with all required fields"""
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
params = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
|
||||||
|
# Verify the returned object and required fields
|
||||||
|
assert isinstance(params, SignedUrlParams)
|
||||||
|
assert params.sign_key == sign_key
|
||||||
|
assert params.timestamp is not None
|
||||||
|
assert params.nonce is not None
|
||||||
|
assert params.sign is not None
|
||||||
|
|
||||||
|
# Verify nonce format (32 character hex string)
|
||||||
|
assert len(params.nonce) == 32
|
||||||
|
assert all(c in "0123456789abcdef" for c in params.nonce)
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_generate_complete_signed_url(self):
|
||||||
|
"""Test generation of complete signed URL with query parameters"""
|
||||||
|
base_url = "https://example.com/api/test"
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
signed_url = UrlSigner.get_signed_url(base_url, sign_key, prefix)
|
||||||
|
|
||||||
|
# Parse URL and verify structure
|
||||||
|
parsed = urlparse(signed_url)
|
||||||
|
assert f"{parsed.scheme}://{parsed.netloc}{parsed.path}" == base_url
|
||||||
|
|
||||||
|
# Verify query parameters
|
||||||
|
query_params = parse_qs(parsed.query)
|
||||||
|
assert "timestamp" in query_params
|
||||||
|
assert "nonce" in query_params
|
||||||
|
assert "sign" in query_params
|
||||||
|
|
||||||
|
# Verify each parameter has exactly one value
|
||||||
|
assert len(query_params["timestamp"]) == 1
|
||||||
|
assert len(query_params["nonce"]) == 1
|
||||||
|
assert len(query_params["sign"]) == 1
|
||||||
|
|
||||||
|
# Verify parameter values are not empty
|
||||||
|
assert query_params["timestamp"][0]
|
||||||
|
assert query_params["nonce"][0]
|
||||||
|
assert query_params["sign"][0]
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_verify_valid_signature(self):
|
||||||
|
"""Test verification of valid signature"""
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
# Generate and verify signature
|
||||||
|
params = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
|
||||||
|
is_valid = UrlSigner.verify(
|
||||||
|
sign_key=sign_key, timestamp=params.timestamp, nonce=params.nonce, sign=params.sign, prefix=prefix
|
||||||
|
)
|
||||||
|
|
||||||
|
assert is_valid is True
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("field", "modifier"),
|
||||||
|
[
|
||||||
|
("sign_key", lambda _: "wrong-sign-key"),
|
||||||
|
("timestamp", lambda t: str(int(t) + 1000)),
|
||||||
|
("nonce", lambda _: "different-nonce-123456789012345"),
|
||||||
|
("prefix", lambda _: "wrong-prefix"),
|
||||||
|
("sign", lambda s: s + "tampered"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_should_reject_invalid_signature_params(self, field, modifier):
|
||||||
|
"""Test signature verification rejects invalid parameters"""
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
# Generate valid signed parameters
|
||||||
|
params = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
|
||||||
|
# Prepare verification parameters
|
||||||
|
verify_params = {
|
||||||
|
"sign_key": sign_key,
|
||||||
|
"timestamp": params.timestamp,
|
||||||
|
"nonce": params.nonce,
|
||||||
|
"sign": params.sign,
|
||||||
|
"prefix": prefix,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Modify the specific field
|
||||||
|
verify_params[field] = modifier(verify_params[field])
|
||||||
|
|
||||||
|
# Verify should fail
|
||||||
|
is_valid = UrlSigner.verify(**verify_params)
|
||||||
|
assert is_valid is False
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", None)
|
||||||
|
def test_should_raise_error_without_secret_key(self):
|
||||||
|
"""Test that signing fails when SECRET_KEY is not configured"""
|
||||||
|
with pytest.raises(Exception) as exc_info:
|
||||||
|
UrlSigner.get_signed_url_params("key", "prefix")
|
||||||
|
|
||||||
|
assert "SECRET_KEY is not set" in str(exc_info.value)
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_generate_unique_signatures(self):
|
||||||
|
"""Test that different inputs produce different signatures"""
|
||||||
|
params1 = UrlSigner.get_signed_url_params("key1", "prefix1")
|
||||||
|
params2 = UrlSigner.get_signed_url_params("key2", "prefix2")
|
||||||
|
|
||||||
|
# Different inputs should produce different signatures
|
||||||
|
assert params1.sign != params2.sign
|
||||||
|
assert params1.nonce != params2.nonce
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_handle_special_characters(self):
|
||||||
|
"""Test handling of special characters in parameters"""
|
||||||
|
special_cases = [
|
||||||
|
"test with spaces",
|
||||||
|
"test/with/slashes",
|
||||||
|
"test中文字符",
|
||||||
|
]
|
||||||
|
|
||||||
|
for sign_key in special_cases:
|
||||||
|
params = UrlSigner.get_signed_url_params(sign_key, "prefix")
|
||||||
|
|
||||||
|
# Should generate valid signature and verify correctly
|
||||||
|
is_valid = UrlSigner.verify(
|
||||||
|
sign_key=sign_key, timestamp=params.timestamp, nonce=params.nonce, sign=params.sign, prefix="prefix"
|
||||||
|
)
|
||||||
|
assert is_valid is True
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_ensure_nonce_randomness(self):
|
||||||
|
"""Test that nonce is random for each generation - critical for security"""
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
# Generate multiple nonces
|
||||||
|
nonces = set()
|
||||||
|
for _ in range(5):
|
||||||
|
params = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
nonces.add(params.nonce)
|
||||||
|
|
||||||
|
# All nonces should be unique
|
||||||
|
assert len(nonces) == 5
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
@patch("time.time", return_value=1234567890)
|
||||||
|
@patch("os.urandom", return_value=b"\xab\xcd\xef\x12\x34\x56\x78\x90\xab\xcd\xef\x12\x34\x56\x78\x90")
|
||||||
|
def test_should_produce_consistent_signatures(self, mock_urandom, mock_time):
|
||||||
|
"""Test that same inputs produce same signature - ensures deterministic behavior"""
|
||||||
|
sign_key = "test-sign-key"
|
||||||
|
prefix = "test-prefix"
|
||||||
|
|
||||||
|
# Generate signature multiple times with same inputs (time and nonce are mocked)
|
||||||
|
params1 = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
params2 = UrlSigner.get_signed_url_params(sign_key, prefix)
|
||||||
|
|
||||||
|
# With mocked time and random, should produce identical results
|
||||||
|
assert params1.timestamp == params2.timestamp
|
||||||
|
assert params1.nonce == params2.nonce
|
||||||
|
assert params1.sign == params2.sign
|
||||||
|
|
||||||
|
# Verify the signature is valid
|
||||||
|
assert UrlSigner.verify(
|
||||||
|
sign_key=sign_key, timestamp=params1.timestamp, nonce=params1.nonce, sign=params1.sign, prefix=prefix
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("configs.dify_config.SECRET_KEY", "test-secret-key-12345")
|
||||||
|
def test_should_handle_empty_strings(self):
|
||||||
|
"""Test handling of empty string parameters - common edge case"""
|
||||||
|
# Empty sign_key and prefix should still work
|
||||||
|
params = UrlSigner.get_signed_url_params("", "")
|
||||||
|
assert params.sign is not None
|
||||||
|
|
||||||
|
# Should verify correctly
|
||||||
|
is_valid = UrlSigner.verify(
|
||||||
|
sign_key="", timestamp=params.timestamp, nonce=params.nonce, sign=params.sign, prefix=""
|
||||||
|
)
|
||||||
|
assert is_valid is True
|
||||||
@ -0,0 +1,65 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from libs.helper import extract_tenant_id
|
||||||
|
from models.account import Account
|
||||||
|
from models.model import EndUser
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractTenantId:
|
||||||
|
"""Test cases for the extract_tenant_id utility function."""
|
||||||
|
|
||||||
|
def test_extract_tenant_id_from_account_with_tenant(self):
|
||||||
|
"""Test extracting tenant_id from Account with current_tenant_id."""
|
||||||
|
# Create a mock Account object
|
||||||
|
account = Account()
|
||||||
|
# Mock the current_tenant_id property
|
||||||
|
account._current_tenant = type("MockTenant", (), {"id": "account-tenant-123"})()
|
||||||
|
|
||||||
|
tenant_id = extract_tenant_id(account)
|
||||||
|
assert tenant_id == "account-tenant-123"
|
||||||
|
|
||||||
|
def test_extract_tenant_id_from_account_without_tenant(self):
|
||||||
|
"""Test extracting tenant_id from Account without current_tenant_id."""
|
||||||
|
# Create a mock Account object
|
||||||
|
account = Account()
|
||||||
|
account._current_tenant = None
|
||||||
|
|
||||||
|
tenant_id = extract_tenant_id(account)
|
||||||
|
assert tenant_id is None
|
||||||
|
|
||||||
|
def test_extract_tenant_id_from_enduser_with_tenant(self):
|
||||||
|
"""Test extracting tenant_id from EndUser with tenant_id."""
|
||||||
|
# Create a mock EndUser object
|
||||||
|
end_user = EndUser()
|
||||||
|
end_user.tenant_id = "enduser-tenant-456"
|
||||||
|
|
||||||
|
tenant_id = extract_tenant_id(end_user)
|
||||||
|
assert tenant_id == "enduser-tenant-456"
|
||||||
|
|
||||||
|
def test_extract_tenant_id_from_enduser_without_tenant(self):
|
||||||
|
"""Test extracting tenant_id from EndUser without tenant_id."""
|
||||||
|
# Create a mock EndUser object
|
||||||
|
end_user = EndUser()
|
||||||
|
end_user.tenant_id = None
|
||||||
|
|
||||||
|
tenant_id = extract_tenant_id(end_user)
|
||||||
|
assert tenant_id is None
|
||||||
|
|
||||||
|
def test_extract_tenant_id_with_invalid_user_type(self):
|
||||||
|
"""Test extracting tenant_id with invalid user type raises ValueError."""
|
||||||
|
invalid_user = "not_a_user_object"
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid user type.*Expected Account or EndUser"):
|
||||||
|
extract_tenant_id(invalid_user)
|
||||||
|
|
||||||
|
def test_extract_tenant_id_with_none_user(self):
|
||||||
|
"""Test extracting tenant_id with None user raises ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="Invalid user type.*Expected Account or EndUser"):
|
||||||
|
extract_tenant_id(None)
|
||||||
|
|
||||||
|
def test_extract_tenant_id_with_dict_user(self):
|
||||||
|
"""Test extracting tenant_id with dict user raises ValueError."""
|
||||||
|
dict_user = {"id": "123", "tenant_id": "456"}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid user type.*Expected Account or EndUser"):
|
||||||
|
extract_tenant_id(dict_user)
|
||||||
@ -1,3 +1,7 @@
|
|||||||
|
import { ALLOW_UNSAFE_DATA_SCHEME } from '@/config'
|
||||||
|
|
||||||
export const isValidUrl = (url: string): boolean => {
|
export const isValidUrl = (url: string): boolean => {
|
||||||
return ['http:', 'https:', '//', 'mailto:'].some(prefix => url.startsWith(prefix))
|
const validPrefixes = ['http:', 'https:', '//', 'mailto:']
|
||||||
|
if (ALLOW_UNSAFE_DATA_SCHEME) validPrefixes.push('data:')
|
||||||
|
return validPrefixes.some(prefix => url.startsWith(prefix))
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue