feat(libs): add pure Python UUIDv7 implementation

Add native Python implementation of UUIDv7 generation algorithm with
comprehensive test coverage for time-ordered UUID generation.
pull/22058/head
QuantumGhost 11 months ago
parent 30820de5a3
commit 8a665302f8

@ -0,0 +1,164 @@
import secrets
import struct
import time
import uuid
# Reference for UUIDv7 specification:
# RFC 9562, Section 5.7 - https://www.rfc-editor.org/rfc/rfc9562.html#section-5.7
# Define the format for packing the timestamp as an unsigned 64-bit integer (big-endian).
#
# For details on the `struct.pack` format, refer to:
# https://docs.python.org/3/library/struct.html#byte-order-size-and-alignment
_PACK_TIMESTAMP = ">Q"
# Define the format for packing the 12-bit random data A (as specified in RFC 9562 Section 5.7)
# into an unsigned 16-bit integer (big-endian).
_PACK_RAND_A = ">H"
def _create_uuidv7_bytes(timestamp_ms: int, random_bytes: bytes) -> bytes:
"""Create UUIDv7 byte structure with given timestamp and random bytes.
This is a private helper function that handles the common logic for creating
UUIDv7 byte structure according to RFC 9562 specification.
UUIDv7 Structure:
- 48 bits: timestamp (milliseconds since Unix epoch)
- 12 bits: random data A (with version bits)
- 62 bits: random data B (with variant bits)
The function performs the following operations:
1. Creates a 128-bit (16-byte) UUID structure
2. Packs the timestamp into the first 48 bits (6 bytes)
3. Sets the version bits to 7 (0111) in the correct position
4. Sets the variant bits to 10 (binary) in the correct position
5. Fills the remaining bits with the provided random bytes
Args:
timestamp_ms: The timestamp in milliseconds since Unix epoch (48 bits).
random_bytes: Random bytes to use for the random portions (must be 10 bytes).
First 2 bytes are used for random data A (12 bits after version).
Last 8 bytes are used for random data B (62 bits after variant).
Returns:
A 16-byte bytes object representing the complete UUIDv7 structure.
Note:
This function assumes the random_bytes parameter is exactly 10 bytes.
The caller is responsible for providing appropriate random data.
"""
# Create the 128-bit UUID structure
uuid_bytes = bytearray(16)
# Pack timestamp (48 bits) into first 6 bytes
uuid_bytes[0:6] = struct.pack(_PACK_TIMESTAMP, timestamp_ms)[2:8] # Take last 6 bytes of 8-byte big-endian
# Next 16 bits: random data A (12 bits) + version (4 bits)
# Take first 2 random bytes and set version to 7
rand_a = struct.unpack(_PACK_RAND_A, random_bytes[0:2])[0]
# Clear the highest 4 bits to make room for the version field
# by performing a bitwise AND with 0x0FFF (binary: 0b0000_1111_1111_1111).
rand_a = rand_a & 0x0FFF
# Set the version field to 7 (binary: 0111) by performing a bitwise OR with 0x7000 (binary: 0b0111_0000_0000_0000).
rand_a = rand_a | 0x7000
uuid_bytes[6:8] = struct.pack(_PACK_RAND_A, rand_a)
# Last 64 bits: random data B (62 bits) + variant (2 bits)
# Use remaining 8 random bytes and set variant to 10 (binary)
uuid_bytes[8:16] = random_bytes[2:10]
# Set variant bits (first 2 bits of byte 8 should be '10')
uuid_bytes[8] = (uuid_bytes[8] & 0x3F) | 0x80 # Set variant to 10xxxxxx
return bytes(uuid_bytes)
def uuidv7(timestamp_ms: int | None = None) -> uuid.UUID:
    """Generate a version 7 UUID as described in RFC 9562.

    A UUIDv7 begins with a Unix-epoch millisecond timestamp, so values created
    at later timestamps sort after earlier ones. The remaining bits are random.

    Layout:
        - 48 bits: timestamp (milliseconds since Unix epoch)
        - 12 bits: random data A (with version bits)
        - 62 bits: random data B (with variant bits)

    Args:
        timestamp_ms: Milliseconds since the Unix epoch to embed in the UUID.
            When omitted (``None``), the current time is used.

    Returns:
        A ``uuid.UUID`` whose version is 7.

    Example:
        >>> import time
        >>> # Generate UUIDv7 with current time
        >>> uuid_current = uuidv7()
        >>> # Generate UUIDv7 with specific timestamp
        >>> uuid_specific = uuidv7(int(time.time() * 1000))
    """
    # Fall back to the wall clock (seconds -> whole milliseconds) when no timestamp is supplied
    effective_ms = int(time.time() * 1000) if timestamp_ms is None else timestamp_ms

    # 10 bytes of entropy cover both random fields (A and B)
    entropy = secrets.token_bytes(10)

    return uuid.UUID(bytes=_create_uuidv7_bytes(effective_ms, entropy))
def uuidv7_timestamp(id_: uuid.UUID) -> int:
    """Extract the timestamp from a UUIDv7.

    A UUIDv7 stores a 48-bit timestamp (milliseconds since the Unix epoch,
    1970-01-01 00:00:00 UTC) in its first 6 bytes, big-endian. This function
    returns that value as an integer.

    Args:
        id_: A UUID object that should be a UUIDv7 (version 7).

    Returns:
        The timestamp as an integer number of milliseconds since the Unix epoch.

    Raises:
        ValueError: If the provided UUID is not version 7.

    Example:
        >>> uuid_v7 = uuidv7()
        >>> timestamp = uuidv7_timestamp(uuid_v7)
        >>> print(f"UUID was created at: {timestamp} ms")
    """
    # Only version 7 UUIDs carry a Unix-millisecond timestamp in this position
    if id_.version != 7:
        raise ValueError(f"Expected UUIDv7 (version 7), got version {id_.version}")

    # Decode the leading 48 bits directly; int.from_bytes handles the 6-byte
    # width natively, so no zero-padding to a fixed struct size is needed.
    return int.from_bytes(id_.bytes[:6], "big")
def uuidv7_boundary(timestamp_ms: int) -> uuid.UUID:
    """Build the deterministic, smallest UUIDv7 for ``timestamp_ms``.

    The first 48 bits carry the timestamp and every random bit is 0; only the
    version and variant bits are set besides the timestamp. Because no UUIDv7
    with the same timestamp can sort below it, the result may be used as a
    boundary for partitions.

    Args:
        timestamp_ms: Milliseconds since the Unix epoch to embed in the UUID.

    Returns:
        A ``uuid.UUID`` of version 7 with all random bits cleared.
    """
    # Ten zero bytes stand in for the random fields
    all_zero_entropy = bytes(10)
    return uuid.UUID(bytes=_create_uuidv7_bytes(timestamp_ms, all_zero_entropy))

@ -0,0 +1,351 @@
import struct
import time
import uuid
from unittest import mock
import pytest
from hypothesis import given
from hypothesis import strategies as st
from libs.uuid_utils import _create_uuidv7_bytes, uuidv7, uuidv7_boundary, uuidv7_timestamp
# Tests for private helper function _create_uuidv7_bytes
def test_create_uuidv7_bytes_basic_structure():
    """The helper returns 16 raw bytes that parse as a version 7 UUID."""
    ts_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds
    entropy = bytes.fromhex("123456789abcdef01122")

    packed = _create_uuidv7_bytes(ts_ms, entropy)

    # A UUID is always 128 bits, delivered here as an immutable bytes object
    assert len(packed) == 16
    assert isinstance(packed, bytes)

    # Parsing the raw bytes confirms the version nibble is in the right place
    parsed = uuid.UUID(bytes=packed)
    assert parsed.version == 7
def test_create_uuidv7_bytes_timestamp_encoding():
    """The first 48 bits of the output hold the millisecond timestamp."""
    ts_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    packed = _create_uuidv7_bytes(ts_ms, bytes(10))

    # Left-pad the 6 timestamp bytes to 8 so they decode as an unsigned 64-bit integer
    (decoded_ms,) = struct.unpack(">Q", b"\x00\x00" + packed[0:6])
    assert decoded_ms == ts_ms
def test_create_uuidv7_bytes_version_bits():
    """The version nibble is 7 even when the random source bits are all ones."""
    ts_ms = 1609459200000
    # Saturate the two bytes feeding random data A so any leak into the version nibble would show
    entropy = b"\xff\xff" + bytes(8)

    packed = _create_uuidv7_bytes(ts_ms, entropy)

    # Bytes 6-7 hold version (top 4 bits) followed by random data A (low 12 bits)
    (version_field,) = struct.unpack(">H", packed[6:8])
    top_nibble = (version_field >> 12) & 0x0F
    assert top_nibble == 7
def test_create_uuidv7_bytes_variant_bits():
    """The two highest bits of byte 8 are forced to binary 10."""
    ts_ms = 1609459200000
    # The third random byte lands in output byte 8; make it all ones
    entropy = b"\x00\x00\xff" + bytes(7)

    packed = _create_uuidv7_bytes(ts_ms, entropy)

    # Isolate the top two bits of byte 8 (expected pattern: 10xxxxxx)
    top_two_bits = (packed[8] >> 6) & 0b11
    assert top_two_bits == 0b10
def test_create_uuidv7_bytes_random_data():
    """Random input bytes land in the random A and random B fields."""
    ts_ms = 1609459200000
    entropy = bytes.fromhex("123456789abcdef01122")

    packed = _create_uuidv7_bytes(ts_ms, entropy)

    # Random data A: low 12 bits of bytes 6-7 must equal the low 12 bits of the first two input bytes
    (version_field,) = struct.unpack(">H", packed[6:8])
    (source_field,) = struct.unpack(">H", entropy[0:2])
    assert version_field & 0x0FFF == source_field & 0x0FFF

    # Random data B, byte 8: variant bits on top, lower 6 bits taken from the third input byte
    assert packed[8] == (entropy[2] & 0x3F) | 0x80

    # Random data B, bytes 9-15: copied unchanged from input bytes 3-9
    assert packed[9:16] == entropy[3:10]
def test_create_uuidv7_bytes_zero_random():
    """All-zero random input still yields a valid UUIDv7 (boundary case)."""
    ts_ms = 1609459200000

    packed = _create_uuidv7_bytes(ts_ms, bytes(10))

    # Still parses as version 7
    assert uuid.UUID(bytes=packed).version == 7

    # Bytes 6-7 contain nothing but the version nibble
    (version_field,) = struct.unpack(">H", packed[6:8])
    assert version_field == 0x7000

    # Byte 8 contains nothing but the variant bits
    assert packed[8] == 0x80

    # Everything after byte 8 stays zero
    assert packed[9:16] == bytes(7)
def test_uuidv7_basic_generation():
    """Calling uuidv7() with no arguments yields a well-formed UUIDv7."""
    generated = uuidv7()

    # Type and version
    assert isinstance(generated, uuid.UUID)
    assert generated.version == 7

    # Variant: byte 8 must look like 10xxxxxx (0x80-0xBF)
    top_two_bits = generated.bytes[8] >> 6
    assert top_two_bits == 0b10
def test_uuidv7_with_custom_timestamp():
    """An explicitly supplied timestamp is embedded and can be read back."""
    wanted_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    generated = uuidv7(wanted_ms)
    assert isinstance(generated, uuid.UUID)
    assert generated.version == 7

    # The extracted value is an int and matches the input exactly
    read_back = uuidv7_timestamp(generated)
    assert isinstance(read_back, int)
    assert read_back == wanted_ms
def test_uuidv7_with_none_timestamp(monkeypatch):
    """Passing None makes uuidv7() read the clock via time.time()."""
    frozen_seconds = 1609459200
    fake_clock = mock.Mock(return_value=frozen_seconds)
    monkeypatch.setattr("time.time", fake_clock)

    generated = uuidv7(None)

    assert isinstance(generated, uuid.UUID)
    assert generated.version == 7

    # The patched clock must have been consulted, and its value (in seconds)
    # must appear in the UUID converted to milliseconds.
    assert fake_clock.called
    assert uuidv7_timestamp(generated) == frozen_seconds * 1000
def test_uuidv7_time_ordering():
    """UUIDs built from increasing timestamps sort in that same order."""
    # Three timestamps one second apart, in milliseconds
    stamps_ms = [
        1609459200000,  # 2021-01-01 00:00:00 UTC
        1609459201000,  # 2021-01-01 00:00:01 UTC
        1609459202000,  # 2021-01-01 00:00:02 UTC
    ]
    generated = [uuidv7(ms) for ms in stamps_ms]

    # Extracted timestamps are strictly ascending
    first_ts, second_ts, third_ts = (uuidv7_timestamp(item) for item in generated)
    assert first_ts < second_ts < third_ts

    # The string forms are already in sorted order because the timestamp leads the UUID
    rendered = [str(item) for item in generated]
    assert rendered == sorted(rendered)
def test_uuidv7_uniqueness():
    """Repeated calls with one timestamp give distinct UUIDs sharing that timestamp."""
    shared_ms = 1609459200000
    batch = [uuidv7(shared_ms) for _ in range(100)]

    # The random bits must keep all 100 values distinct
    assert len(set(batch)) == 100

    # Every value still reports the common timestamp
    for item in batch:
        assert uuidv7_timestamp(item) == shared_ms
def test_uuidv7_timestamp_error_handling_wrong_version():
    """uuidv7_timestamp() rejects a UUID whose version is not 7."""
    not_v7 = uuid.uuid4()

    with pytest.raises(ValueError) as excinfo:
        uuidv7_timestamp(not_v7)

    # The message names both the expected and the actual version
    message = str(excinfo.value)
    assert "Expected UUIDv7 (version 7)" in message
    assert f"got version {not_v7.version}" in message
@given(st.integers(min_value=0, max_value=2**48 - 1))
def test_uuidv7_timestamp_round_trip(timestamp_ms):
    """Property: any 48-bit timestamp survives generation followed by extraction."""
    # Embed the timestamp, then read it straight back out
    recovered_ms = uuidv7_timestamp(uuidv7(timestamp_ms))

    # Integer milliseconds must round-trip exactly
    assert recovered_ms == timestamp_ms
def test_uuidv7_timestamp_edge_cases():
    """Timestamp extraction is exact for a very small and a large timestamp."""
    edge_values_ms = (
        1,  # 1 ms after the epoch
        2147483647000,  # 2038-01-19 03:14:07 UTC in milliseconds
    )
    for wanted_ms in edge_values_ms:
        read_back = uuidv7_timestamp(uuidv7(wanted_ms))
        assert read_back == wanted_ms
def test_uuidv7_boundary_basic_generation():
    """A boundary UUID for a known timestamp is a well-formed UUIDv7."""
    known_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    boundary = uuidv7_boundary(known_ms)

    # Type and version
    assert isinstance(boundary, uuid.UUID)
    assert boundary.version == 7

    # Variant: byte 8 must look like 10xxxxxx (0x80-0xBF)
    top_two_bits = boundary.bytes[8] >> 6
    assert top_two_bits == 0b10
def test_uuidv7_boundary_timestamp_extraction():
    """uuidv7_timestamp() recovers the timestamp from a boundary UUID."""
    known_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    # Build the boundary value and read its timestamp back with the regular extractor
    read_back = uuidv7_timestamp(uuidv7_boundary(known_ms))

    assert read_back == known_ms
def test_uuidv7_boundary_deterministic():
    """The same timestamp always produces the same boundary UUID."""
    known_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    # Three independent calls with an identical input
    first, second, third = (uuidv7_boundary(known_ms) for _ in range(3))

    # Equal as UUID objects and as strings
    assert first == second == third
    assert str(first) == str(second) == str(third)
def test_uuidv7_boundary_is_minimum():
    """The boundary UUID sorts below random UUIDs that share its timestamp."""
    shared_ms = 1609459200000  # 2021-01-01 00:00:00 UTC in milliseconds

    floor = uuidv7_boundary(shared_ms)
    samples = [uuidv7(shared_ms) for _ in range(50)]

    # String comparison: the boundary must be strictly smaller than every sample
    boundary_str = str(floor)
    for regular_str in map(str, samples):
        assert boundary_str < regular_str, f"Boundary {boundary_str} should be < regular {regular_str}"

    # Raw byte comparison must agree with the string comparison
    boundary_bytes = floor.bytes
    for sample in samples:
        assert boundary_bytes < sample.bytes
def test_uuidv7_boundary_different_timestamps():
    """Boundary UUIDs for increasing timestamps are ordered the same way."""
    stamps_ms = [
        1609459200000,  # 2021-01-01 00:00:00 UTC
        1609459201000,  # 2021-01-01 00:00:01 UTC
        1609459202000,  # 2021-01-01 00:00:02 UTC
    ]
    early, middle, late = (uuidv7_boundary(ms) for ms in stamps_ms)

    # Extracted timestamps are strictly ascending
    early_ts = uuidv7_timestamp(early)
    middle_ts = uuidv7_timestamp(middle)
    late_ts = uuidv7_timestamp(late)
    assert early_ts < middle_ts < late_ts

    # String forms are already in sorted order
    rendered = [str(early), str(middle), str(late)]
    assert rendered == sorted(rendered)

    # Raw bytes follow the same order
    assert early.bytes < middle.bytes < late.bytes
def test_uuidv7_boundary_edge_cases():
    """Boundary generation works for the epoch, a large timestamp, and the current time."""
    edge_values_ms = (
        0,  # the Unix epoch itself
        2147483647000,  # 2038-01-19 03:14:07 UTC in milliseconds
        int(time.time() * 1000),  # the current time
    )
    for wanted_ms in edge_values_ms:
        boundary = uuidv7_boundary(wanted_ms)
        assert isinstance(boundary, uuid.UUID)
        assert boundary.version == 7
        assert uuidv7_timestamp(boundary) == wanted_ms
Loading…
Cancel
Save