From b94e6227868c270685d0332130ef879510066a9a Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 25 Jun 2025 16:08:40 +0800 Subject: [PATCH] refactor:Refactors transaction management and datetime usage Signed-off-by: -LAN- --- .../update_provider_when_message_created.py | 105 +++++++++--------- 1 file changed, 50 insertions(+), 55 deletions(-) diff --git a/api/events/event_handlers/update_provider_when_message_created.py b/api/events/event_handlers/update_provider_when_message_created.py index 654ab56a2e..c64e842160 100644 --- a/api/events/event_handlers/update_provider_when_message_created.py +++ b/api/events/event_handlers/update_provider_when_message_created.py @@ -1,6 +1,6 @@ import logging import time as time_module -from datetime import UTC, datetime +from datetime import datetime from typing import Any, Optional from pydantic import BaseModel @@ -12,6 +12,7 @@ from core.entities.provider_entities import QuotaUnit, SystemConfiguration from core.plugin.entities.plugin import ModelProviderID from events.message_event import message_was_created from extensions.ext_database import db +from libs import datetime_utils from models.model import Message from models.provider import Provider, ProviderType @@ -69,7 +70,7 @@ def handle(sender: Message, **kwargs): tenant_id = application_generate_entity.app_config.tenant_id provider_name = application_generate_entity.model_conf.provider - current_time = datetime.now(UTC).replace(tzinfo=None) + current_time = datetime_utils.naive_utc_now() # Prepare updates for both scenarios updates_to_perform: list[_ProviderUpdateOperation] = [] @@ -171,7 +172,7 @@ def _calculate_quota_usage( return 1 return None except Exception as e: - logger.warning(f"Failed to calculate quota usage: {e}") + logger.exception("Failed to calculate quota usage") return None @@ -182,57 +183,51 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation] # Use SQLAlchemy's context manager for transaction management # This automatically handles commit/rollback - try: - with db.session.begin(): - # Use a single transaction for all updates - for update_operation in updates_to_perform: - filters = update_operation.filters - values = update_operation.values - additional_filters = update_operation.additional_filters - description = update_operation.description - - # Build the where conditions - where_conditions = [ - Provider.tenant_id == filters.tenant_id, - Provider.provider_name == filters.provider_name, - ] - - # Add additional filters if specified - if filters.provider_type is not None: - where_conditions.append(Provider.provider_type == filters.provider_type) - if filters.quota_type is not None: - where_conditions.append(Provider.quota_type == filters.quota_type) - if additional_filters.quota_limit_check: - where_conditions.append(Provider.quota_limit > Provider.quota_used) - - # Prepare values dict for SQLAlchemy update - update_values = {} - if values.last_used is not None: - update_values["last_used"] = values.last_used - if values.quota_used is not None: - update_values["quota_used"] = values.quota_used - - # Build and execute the update statement - stmt = update(Provider).where(*where_conditions).values(**update_values) - result = db.session.execute(stmt) - rows_affected = result.rowcount - - logger.debug( - f"Provider update ({description}): {rows_affected} rows affected. " - f"Filters: {filters.model_dump()}, Values: {update_values}" - ) - - # If no rows were affected for quota updates, log a warning - if rows_affected == 0 and description == "quota_deduction_update": - logger.warning( - f"No Provider rows updated for quota deduction. " - f"This may indicate quota limit exceeded or provider not found. " - f"Filters: {filters.model_dump()}" - ) + with db.session.begin(): + # Use a single transaction for all updates + for update_operation in updates_to_perform: + filters = update_operation.filters + values = update_operation.values + additional_filters = update_operation.additional_filters + description = update_operation.description + + # Build the where conditions + where_conditions = [ + Provider.tenant_id == filters.tenant_id, + Provider.provider_name == filters.provider_name, + ] + + # Add additional filters if specified + if filters.provider_type is not None: + where_conditions.append(Provider.provider_type == filters.provider_type) + if filters.quota_type is not None: + where_conditions.append(Provider.quota_type == filters.quota_type) + if additional_filters.quota_limit_check: + where_conditions.append(Provider.quota_limit > Provider.quota_used) + + # Prepare values dict for SQLAlchemy update + update_values = {} + if values.last_used is not None: + update_values["last_used"] = values.last_used + if values.quota_used is not None: + update_values["quota_used"] = values.quota_used + + # Build and execute the update statement + stmt = update(Provider).where(*where_conditions).values(**update_values) + result = db.session.execute(stmt) + rows_affected = result.rowcount + + logger.debug( + f"Provider update ({description}): {rows_affected} rows affected. " + f"Filters: {filters.model_dump()}, Values: {update_values}" + ) - logger.debug(f"Successfully processed {len(updates_to_perform)} Provider updates") + # If no rows were affected for quota updates, log a warning + if rows_affected == 0 and description == "quota_deduction_update": + logger.warning( + f"No Provider rows updated for quota deduction. " + f"This may indicate quota limit exceeded or provider not found. " + f"Filters: {filters.model_dump()}" + ) - except Exception as e: - # The context manager automatically handles rollback - logger.exception("Failed to update Provider") - raise + logger.debug(f"Successfully processed {len(updates_to_perform)} Provider updates")