refactor:Refactors transaction management and datetime usage

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/21468/head
-LAN- 11 months ago
parent c74ef2b438
commit b94e622786
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -1,6 +1,6 @@
import logging
import time as time_module
from datetime import UTC, datetime
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel
@ -12,6 +12,7 @@ from core.entities.provider_entities import QuotaUnit, SystemConfiguration
from core.plugin.entities.plugin import ModelProviderID
from events.message_event import message_was_created
from extensions.ext_database import db
from libs import datetime_utils
from models.model import Message
from models.provider import Provider, ProviderType
@ -69,7 +70,7 @@ def handle(sender: Message, **kwargs):
tenant_id = application_generate_entity.app_config.tenant_id
provider_name = application_generate_entity.model_conf.provider
current_time = datetime.now(UTC).replace(tzinfo=None)
current_time = datetime_utils.naive_utc_now()
# Prepare updates for both scenarios
updates_to_perform: list[_ProviderUpdateOperation] = []
@ -171,7 +172,7 @@ def _calculate_quota_usage(
return 1
return None
except Exception as e:
logger.warning(f"Failed to calculate quota usage: {e}")
logger.exception("Failed to calculate quota usage")
return None
@ -182,57 +183,51 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation]
# Use SQLAlchemy's context manager for transaction management
# This automatically handles commit/rollback
try:
with db.session.begin():
# Use a single transaction for all updates
for update_operation in updates_to_perform:
filters = update_operation.filters
values = update_operation.values
additional_filters = update_operation.additional_filters
description = update_operation.description
# Build the where conditions
where_conditions = [
Provider.tenant_id == filters.tenant_id,
Provider.provider_name == filters.provider_name,
]
# Add additional filters if specified
if filters.provider_type is not None:
where_conditions.append(Provider.provider_type == filters.provider_type)
if filters.quota_type is not None:
where_conditions.append(Provider.quota_type == filters.quota_type)
if additional_filters.quota_limit_check:
where_conditions.append(Provider.quota_limit > Provider.quota_used)
# Prepare values dict for SQLAlchemy update
update_values = {}
if values.last_used is not None:
update_values["last_used"] = values.last_used
if values.quota_used is not None:
update_values["quota_used"] = values.quota_used
# Build and execute the update statement
stmt = update(Provider).where(*where_conditions).values(**update_values)
result = db.session.execute(stmt)
rows_affected = result.rowcount
logger.debug(
f"Provider update ({description}): {rows_affected} rows affected. "
f"Filters: {filters.model_dump()}, Values: {update_values}"
)
# If no rows were affected for quota updates, log a warning
if rows_affected == 0 and description == "quota_deduction_update":
logger.warning(
f"No Provider rows updated for quota deduction. "
f"This may indicate quota limit exceeded or provider not found. "
f"Filters: {filters.model_dump()}"
)
with db.session.begin():
# Use a single transaction for all updates
for update_operation in updates_to_perform:
filters = update_operation.filters
values = update_operation.values
additional_filters = update_operation.additional_filters
description = update_operation.description
# Build the where conditions
where_conditions = [
Provider.tenant_id == filters.tenant_id,
Provider.provider_name == filters.provider_name,
]
# Add additional filters if specified
if filters.provider_type is not None:
where_conditions.append(Provider.provider_type == filters.provider_type)
if filters.quota_type is not None:
where_conditions.append(Provider.quota_type == filters.quota_type)
if additional_filters.quota_limit_check:
where_conditions.append(Provider.quota_limit > Provider.quota_used)
# Prepare values dict for SQLAlchemy update
update_values = {}
if values.last_used is not None:
update_values["last_used"] = values.last_used
if values.quota_used is not None:
update_values["quota_used"] = values.quota_used
# Build and execute the update statement
stmt = update(Provider).where(*where_conditions).values(**update_values)
result = db.session.execute(stmt)
rows_affected = result.rowcount
logger.debug(
f"Provider update ({description}): {rows_affected} rows affected. "
f"Filters: {filters.model_dump()}, Values: {update_values}"
)
logger.debug(f"Successfully processed {len(updates_to_perform)} Provider updates")
# If no rows were affected for quota updates, log a warning
if rows_affected == 0 and description == "quota_deduction_update":
logger.warning(
f"No Provider rows updated for quota deduction. "
f"This may indicate quota limit exceeded or provider not found. "
f"Filters: {filters.model_dump()}"
)
except Exception as e:
# The context manager automatically handles rollback
logger.exception("Failed to update Provider")
raise
logger.debug(f"Successfully processed {len(updates_to_perform)} Provider updates")

Loading…
Cancel
Save