Compare commits

..

No commits in common. '8a2f37bb208214f883b83d82837b899f21b642eb' and '6d94c43685583466df92de435fc450aee8d43c58' have entirely different histories.

@ -3,18 +3,20 @@ import inspect
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, ParamSpec, TypeVar, Callable from typing import Any, ParamSpec, TypeVar, Callable
from sqlalchemy import Executable, Result, and_, Select, Delete, Update from sqlalchemy import Executable, Result, Select, Delete, Update, column, and_
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.sql.selectable import Subquery
from common.constant import Constant from common.constant import Constant
from common.global_enums import IsDelete from common.global_enums import IsDelete
from config import settings from config import settings
from entity.base_entity import DbBaseModel
P = ParamSpec('P') P = ParamSpec('P')
T = TypeVar('T') T = TypeVar('T')
engine = create_async_engine(settings.database_url, echo=False, # 打印SQL日志生产环境建议关闭 engine = create_async_engine(
settings.database_url,
echo=False, # 打印SQL日志生产环境建议关闭
pool_size=10, # 连接池大小 pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接数 max_overflow=20, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间解决MySQL超时断开问题【4†source】【5†source】 pool_recycle=3600, # 连接回收时间解决MySQL超时断开问题【4†source】【5†source】
@ -22,115 +24,121 @@ engine = create_async_engine(settings.database_url, echo=False, # 打印SQL日
# 创建异步会话工厂 # 创建异步会话工厂
class EnhanceAsyncSession(AsyncSession): class EnhanceAsyncSession(AsyncSession):
async def scalar(self, statement: Executable,
params=None,
*,
execution_options=None,
bind_arguments=None,
**kw: Any, ):
def _add_logical_delete_condition(self, statement: Select) -> Select: sig = inspect.signature(super().scalar)
""" if execution_options is None:
Select 语句添加逻辑删除条件 default_execution_options = sig.parameters['execution_options'].default
支持递归处理 fastapi-pagination 生成的子查询包装 execution_options = default_execution_options
""" delete_condition = column(Constant.LOGICAL_DELETE_FIELD) == IsDelete.NO_DELETE
# 特殊处理:如果当前查询只有一个子查询,直接修改其内部元素
# 这样可以避免产生重复的别名
if len(statement.froms) == 1 and isinstance(statement.froms[0], Subquery):
subquery = statement.froms[0]
# 递归处理子查询内部
processed_inner = self._add_logical_delete_condition(subquery.element)
# 如果内部有修改,直接替换 subquery 的 element
if processed_inner is not subquery.element:
subquery.element = processed_inner
return statement
# 处理当前层的表
delete_condition = None
delete_field = Constant.LOGICAL_DELETE_FIELD
for from_obj in statement.froms:
# 跳过子查询(因为会通过上面的逻辑递归处理)
if isinstance(from_obj, Subquery):
continue
# 只处理 Table 对象
if hasattr(from_obj, 'columns') and delete_field in from_obj.columns:
# 使用表对象上的列
condition = from_obj.columns[delete_field] == IsDelete.NO_DELETE
if delete_condition is None:
delete_condition = condition
else:
delete_condition = and_(delete_condition, condition)
# 如果有条件,则应用到当前 statement
if delete_condition is not None:
existing_condition = statement.whereclause existing_condition = statement.whereclause
# 组合条件
if existing_condition is not None: if existing_condition is not None:
# 使用and_组合现有条件和逻辑删除条件
new_condition = and_(existing_condition, delete_condition) new_condition = and_(existing_condition, delete_condition)
else: else:
new_condition = delete_condition new_condition = delete_condition
# 应用新条件创建新的Select对象
statement = statement.where(new_condition) statement = statement.where(new_condition)
return statement
async def scalar(self, statement: Executable, params=None, *, execution_options=None, bind_arguments=None,
**kw: Any, ):
sig = inspect.signature(super().scalar)
if execution_options is None:
default_execution_options = sig.parameters['execution_options'].default
execution_options = default_execution_options
# 只对 Select 语句添加逻辑删除条件
if isinstance(statement, Select):
statement = self._add_logical_delete_condition(statement)
return await super().scalar(statement, params, execution_options=execution_options, return await super().scalar(statement, params, execution_options=execution_options,
bind_arguments=bind_arguments, **kw) bind_arguments=bind_arguments, **kw)
async def execute(self, statement: Executable, params=None, *, execution_options=None, bind_arguments=None, async def execute(
**kw: Any, ) -> Result[Any]: self,
statement: Executable,
params=None,
*,
execution_options=None,
bind_arguments=None,
**kw: Any,
) -> Result[Any]:
sig = inspect.signature(super().execute) sig = inspect.signature(super().execute)
if execution_options is None: if execution_options is None:
default_execution_options = sig.parameters['execution_options'].default default_execution_options = sig.parameters['execution_options'].default
execution_options = default_execution_options execution_options = default_execution_options
# print("type(statement):{}", type(statement))
if isinstance(statement, Select): if isinstance(statement, Select):
statement = self._add_logical_delete_condition(statement) # print("这是查询语句,过滤逻辑删除")
delete_condition = column(Constant.LOGICAL_DELETE_FIELD) == IsDelete.NO_DELETE
# 获取现有条件
existing_condition = statement.whereclause
# 组合条件
if existing_condition is not None:
# 使用and_组合现有条件和逻辑删除条件
new_condition = and_(existing_condition, delete_condition)
else:
new_condition = delete_condition
# 应用新条件创建新的Select对象
statement = statement.where(new_condition)
if isinstance(statement, Delete): if isinstance(statement, Delete):
# 检查是否跳过软删除通过execution_options控制
skip_soft_delete = execution_options and execution_options.get("skip_soft_delete", False) skip_soft_delete = execution_options and execution_options.get("skip_soft_delete", False)
if not skip_soft_delete: if not skip_soft_delete:
# 获取表对象
table = statement.table table = statement.table
if hasattr(table, 'columns') and Constant.LOGICAL_DELETE_FIELD in table.columns: # 构建更新语句
update_stmt = (Update(table).where(statement.whereclause).values( update_stmt = (
**{Constant.LOGICAL_DELETE_FIELD: IsDelete.DELETE})) Update(table)
.where(statement.whereclause) # 保留原删除条件
.values(**{Constant.LOGICAL_DELETE_FIELD: IsDelete.DELETE}) # 设置软删除标记
)
# 如果原删除语句有RETURNING子句也添加到更新语句中
if statement._returning: if statement._returning:
update_stmt = update_stmt.returning(*statement._returning) update_stmt = update_stmt.returning(*statement._returning)
# 执行更新语句
return await super().execute(update_stmt, params=params, execution_options=execution_options, return await super().execute(
bind_arguments=bind_arguments, **kw) update_stmt,
params=params,
result = await super().execute(statement, params=params, execution_options=execution_options, execution_options=execution_options,
bind_arguments=bind_arguments, **kw, ) bind_arguments=bind_arguments,
**kw
)
result = await super().execute(
statement,
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
**kw,
)
return result return result
# 重写delete方法处理单个对象删除
def delete(self, instance): def delete(self, instance):
from sqlalchemy import inspect as sqlalchemy_inspect from sqlalchemy import inspect
# 检查是否有逻辑删除属性
if hasattr(instance, Constant.LOGICAL_DELETE_FIELD): if hasattr(instance, Constant.LOGICAL_DELETE_FIELD):
setattr(instance, Constant.LOGICAL_DELETE_FIELD, IsDelete.DELETE) # 设置软删除标记
instance.__setattr__(Constant.LOGICAL_DELETE_FIELD, IsDelete.DELETE)
insp = sqlalchemy_inspect(instance) # 确保对象在会话中(如果已分离则重新关联)
if insp.detached or insp.transient: # 检查对象状态
insp = inspect(instance)
if insp.detached:
# 如果对象是分离的,则重新加入会话
self.add(instance)
elif insp.transient:
# 如果是瞬态对象,也添加到会话
self.add(instance) self.add(instance)
# 标记对象为已修改(触发更新)
# self.expire(instance, [Constant.LOGICAL_DELETE_FIELD])
else: else:
# 如果没有逻辑删除属性,执行标准删除
super().delete(instance) super().delete(instance)
AsyncSessionLocal = async_sessionmaker(bind=engine, class_=EnhanceAsyncSession, expire_on_commit=False, # 提交后不使对象过期 AsyncSessionLocal = async_sessionmaker(
bind=engine,
class_=EnhanceAsyncSession,
expire_on_commit=False, # 提交后不使对象过期
autoflush=False # 禁用自动刷新 autoflush=False # 禁用自动刷新
) )
@ -144,6 +152,7 @@ async def get_db_session():
yield session yield session
def with_db_session(session_param_name: str = "session"): def with_db_session(session_param_name: str = "session"):
""" """
一个装饰器用于为异步函数自动注入数据库会话 一个装饰器用于为异步函数自动注入数据库会话
@ -151,7 +160,6 @@ def with_db_session(session_param_name: str = "session"):
Args: Args:
session_param_name: 被装饰函数中用于接收会话的参数名默认为 'session' session_param_name: 被装饰函数中用于接收会话的参数名默认为 'session'
""" """
def decorator(func: Callable[P, T]) -> Callable[P, T]: def decorator(func: Callable[P, T]) -> Callable[P, T]:
# 确保只装饰异步函数 # 确保只装饰异步函数
if not inspect.iscoroutinefunction(func): if not inspect.iscoroutinefunction(func):
@ -169,10 +177,10 @@ def with_db_session(session_param_name: str = "session"):
return await func(*args, **kwargs) return await func(*args, **kwargs)
return wrapper return wrapper
return decorator return decorator
# 关闭引擎 # 关闭引擎
async def close_engine(): async def close_engine():
await engine.dispose() await engine.dispose()

@ -28,10 +28,11 @@ class DbBaseModel(SQLModel, table=False):
# class Config: # class Config:
# arbitrary_types_allowed = True # arbitrary_types_allowed = True
@classmethod @classmethod
def select(cls, fields: list = None): def select(cls, fields=None):
if fields is None: if fields is None:
return Select(cls) fields = cls
return Select(*fields) return Select(fields)
@classmethod @classmethod
def delete(cls): def delete(cls):

@ -14,7 +14,6 @@ from entity.dto import HttpResp, ApiResponse
from exceptions.base import AppException from exceptions.base import AppException
from utils import get_uuid from utils import get_uuid
__all__ = ["unified_resp", "BaseController"]
RT = TypeVar('RT') # 返回类型 RT = TypeVar('RT') # 返回类型
@ -36,7 +35,6 @@ def unified_resp(func: Callable[..., RT]) -> Callable[..., RT]:
resp = await func(*args, **kwargs) or [] resp = await func(*args, **kwargs) or []
else: else:
resp = func(*args, **kwargs) or [] resp = func(*args, **kwargs) or []
return JSONResponse( return JSONResponse(
content=jsonable_encoder( content=jsonable_encoder(
# 正常请求响应 # 正常请求响应
@ -60,23 +58,33 @@ class BaseController:
async def base_page(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None): async def base_page(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None):
if not isinstance(req, dict): if not isinstance(req, dict):
req = req.model_dump() req = req.model_dump()
result = await self.service.get_by_page(req,dto_class) result = await self.service.get_by_page(req)
# datas = result.data datas = result.data
# if datas and dto_class: if datas and dto_class:
# result.data = self.service.entity_conversion_dto(datas, dto_class) result.data = self.service.entity_conversion_dto(datas, dto_class)
return result return result
async def base_list(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None): async def base_list(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None):
if not isinstance(req, dict): if not isinstance(req, dict):
req = req.model_dump() req = req.model_dump()
datas = await self.service.get_list(req,dto_class) datas = await self.service.get_list(req)
if datas and dto_class:
datas = self.service.entity_conversion_dto(datas, dto_class)
return datas return datas
async def get_all(self, dto_class: Type[BaseModel] = None):
result = await self.service.get_all()
if dto_class:
result = self.service.entity_conversion_dto(result, dto_class)
return result
async def get_by_id(self, id: str): async def get_by_id(self, id: str, dto_class: Type[BaseModel] = None):
result = await self.service.get_by_id(id) data = await self.service.get_by_id(id)
if not result: if not data:
raise AppException(f"不存在 id 为{id}的数据") raise AppException(f"不存在 id 为{id}的数据")
result = data.to_dict()
if dto_class:
result = self.service.entity_conversion_dto(result, dto_class)
return result return result
async def add(self, req: Union[dict, BaseModel]): async def add(self, req: Union[dict, BaseModel]):
@ -94,7 +102,7 @@ class BaseController:
db_query_data = await self.service.get_by_id(id) db_query_data = await self.service.get_by_id(id)
if not db_query_data: if not db_query_data:
raise AppException(f"数据不存在") raise AppException(f"数据不存在")
await self.service.check_base_permission(db_query_data) self.service.check_base_permission(db_query_data)
try: try:
return await self.service.delete_by_id(id) return await self.service.delete_by_id(id)
except Exception as e: except Exception as e:
@ -109,7 +117,7 @@ class BaseController:
db_query_data = await self.service.get_by_id(data_id) db_query_data = await self.service.get_by_id(data_id)
if not db_query_data: if not db_query_data:
raise AppException(f"数据不存在") raise AppException(f"数据不存在")
await self.service.check_base_permission(db_query_data) self.service.check_base_permission(db_query_data)
try: try:
return await self.service.update_by_id(data_id, req) return await self.service.update_by_id(data_id, req)

@ -29,7 +29,7 @@ class BaseService(Generic[T]):
def get_query_stmt(cls, query_params, stmt=None, *, fields: list = None): def get_query_stmt(cls, query_params, stmt=None, *, fields: list = None):
if stmt is None: if stmt is None:
if fields: if fields:
stmt = cls.model.select(fields) stmt = cls.model.select(*fields)
else: else:
stmt = cls.model.select() stmt = cls.model.select()
for key, value in query_params.items(): for key, value in query_params.items():
@ -48,9 +48,6 @@ class BaseService(Generic[T]):
@classmethod @classmethod
def entity_conversion_dto(cls, entity_data: Union[list, BaseModel], dto: Type[BaseModel]) -> Union[ def entity_conversion_dto(cls, entity_data: Union[list, BaseModel], dto: Type[BaseModel]) -> Union[
BaseModel, List[BaseModel]]: BaseModel, List[BaseModel]]:
"""
数据脱敏
"""
dto_list = [] dto_list = []
if not isinstance(entity_data, list): if not isinstance(entity_data, list):
return dto(**entity_data.model_dump()) return dto(**entity_data.model_dump())
@ -62,27 +59,23 @@ class BaseService(Generic[T]):
return dto_list return dto_list
@classmethod @classmethod
async def check_base_permission(cls, daba: Any): def check_base_permission(cls, daba: Any):
# todo # todo
pass pass
@classmethod @classmethod
async def get_by_page(cls, query_params: Union[dict, BasePageQueryReq], dto_model_class: Type[BaseModel] = None) -> \ async def get_by_page(cls, query_params: Union[dict, BasePageQueryReq]) -> BasePageResp[T]:
BasePageResp[T]:
if not isinstance(query_params, dict): if not isinstance(query_params, dict):
query_params = query_params.model_dump() query_params = query_params.model_dump()
query_params = {k: v for k, v in query_params.items() if v not in [None, ""]} query_params = {k: v for k, v in query_params.items() if v not in [None, ""]}
fields = None query_stmt = cls.get_query_stmt(query_params)
if dto_model_class is not None: return await cls.auto_page(query_stmt, query_params)
fields = [getattr(cls.model, key) for key in dto_model_class.model_fields.keys()]
query_stmt = cls.get_query_stmt(query_params, fields=fields)
return await cls.auto_page(query_stmt, query_params, dto_model_class)
@classmethod @classmethod
@with_db_session() @with_db_session()
async def auto_page(cls, query_stmt, query_params: Union[dict, BasePageQueryReq] = None, async def auto_page(cls, query_stmt, query_params: Union[dict, BasePageQueryReq] = None,
dto_model_class: Type[BaseModel] = None, *, session: Optional[AsyncSession]) -> BasePageResp[T]: dto_model_class: Type[BaseModel] = None, *, session: Optional[AsyncSession]) -> \
BasePageResp[T]:
if not query_params: if not query_params:
query_params = {} query_params = {}
if not isinstance(query_params, dict): if not isinstance(query_params, dict):
@ -91,45 +84,87 @@ class BaseService(Generic[T]):
page_size = query_params.get("page_size", 12) page_size = query_params.get("page_size", 12)
sort = query_params.get("sort", "desc") sort = query_params.get("sort", "desc")
orderby = query_params.get("orderby", "created_time") orderby = query_params.get("orderby", "created_time")
data_count = None
if data_count == 0:
return BasePageResp(**{
"page_number": page_number,
"page_size": page_size,
"count": data_count,
"sort": sort,
"orderby": orderby,
"data": [],
})
if sort == "desc": if sort == "desc":
query_stmt = query_stmt.order_by(getattr(cls.model, orderby).desc()) query_stmt = query_stmt.order_by(getattr(cls.model, orderby).desc())
else: else:
query_stmt = query_stmt.order_by(getattr(cls.model, orderby).asc()) query_stmt = query_stmt.order_by(getattr(cls.model, orderby).asc())
query_page_result = await paginate(session,
query_page_result = await paginate(session, query_stmt, Params(page=page_number, size=page_size)) query_stmt,
Params(page=page_number, size=page_size))
result = query_page_result.items result = query_page_result.items
if dto_model_class is not None: if dto_model_class is not None:
# 使用 row._mapping 将 Row 对象转换为可解包的字典 result = [dto_model_class(**item) for item in result]
result = [dto_model_class(**dict(row._mapping)) for row in result] return BasePageResp(**{
return BasePageResp( "page_number": page_number,
**{"page_number": page_number, "page_size": page_size, "page_count": query_page_result.pages, "page_size": page_size,
"count": query_page_result.total, "sort": sort, "orderby": orderby, "data": result, }) "page_count": query_page_result.pages,
"count": query_page_result.total,
"sort": sort,
"orderby": orderby,
"data": result,
})
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_list(cls, query_params: Union[dict, BaseQueryReq], dto_model_class: Type[BaseModel] = None, *, async def get_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> List[
session: Optional[AsyncSession] = None) -> List[T] | List[BaseModel]: T]:
""" if not isinstance(query_params, dict):
query_params: 参数字典 or 参数请求模型 query_params = query_params.model_dump()
dto_model_class: 输出类型 query_params = {k: v for k, v in query_params.items() if v not in [None, ""]}
session: 数据库会话---支持传递以便于事务管控 sort = query_params.get("sort", "desc")
获取数据集合 orderby = query_params.get("orderby", "created_time")
""" query_stmt = cls.get_query_stmt(query_params)
query_stmt = cls.build_query(query_params, dto_model_class=dto_model_class) field = getattr(cls.model, orderby)
if sort == "desc":
query_stmt = query_stmt.order_by(field.desc())
else:
query_stmt = query_stmt.order_by(field.asc())
if query_params.get("limit", None) is not None:
query_stmt = query_stmt.limit(query_params.get("limit"))
exec_result = await session.execute(query_stmt) exec_result = await session.execute(query_stmt)
return cls.parse_result(exec_result, dto_model_class=dto_model_class) return list(exec_result.scalars().all())
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_id_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None): async def get_list_json(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> \
query_stmt = cls.build_query(query_params, fields=[cls.model.id]) List[
exec_result = await session.scalars(query_stmt) T]:
return list(exec_result) resp_list = await cls.get_list(query_params, session=session)
return [i.model_dump() for i in resp_list]
@classmethod
@with_db_session()
async def get_id_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> \
List[str]:
if not isinstance(query_params, dict):
query_params = query_params.model_dump()
query_params = {k: v for k, v in query_params.items() if v is not None}
sort = query_params.get("sort", "desc")
orderby = query_params.get("orderby", "created_time")
query_stmt = cls.model.select(cls.model.id)
query_stmt = cls.get_query_stmt(query_params, query_stmt)
if sort == "desc":
query_stmt = query_stmt.order_by(cls.model.getter_by(orderby).desc())
else:
query_stmt = query_stmt.order_by(cls.model.getter_by(orderby).asc())
exec_result = await session.execute(query_stmt)
return [item["id"] for item in exec_result.scalars().all()]
@classmethod @classmethod
@with_db_session() @with_db_session()
async def save(cls, *, session: Optional[AsyncSession] = None, **kwargs) -> T: async def save(cls, *, session: Optional[AsyncSession] = None, **kwargs) -> T:
sample_obj = cls.model(**kwargs) sample_obj = cls.model(**kwargs)
session.add(sample_obj) session.add(sample_obj)
await session.flush() await session.flush()
@ -171,6 +206,7 @@ class BaseService(Generic[T]):
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_by_id(cls, pid, *, session: Optional[AsyncSession] = None) -> T: async def get_by_id(cls, pid, *, session: Optional[AsyncSession] = None) -> T:
stmt = cls.model.select().where(cls.model.id == pid) stmt = cls.model.select().where(cls.model.id == pid)
return await session.scalar(stmt) return await session.scalar(stmt)
@ -186,12 +222,15 @@ class BaseService(Generic[T]):
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_by_ids(cls, pids, dto_model_class: Type[BaseModel] = None, *, async def get_by_ids(cls, pids, cols=None, *, session: Optional[AsyncSession] = None) -> List[T]:
session: Optional[AsyncSession] = None) -> List[T]:
stmt = cls.build_query({}, dto_model_class=dto_model_class) if cols:
stmt = stmt.where(cls.model.id.in_(pids)) objs = cls.model.select(*cols)
exec_result = await session.execute(stmt) else:
return cls.parse_result(exec_result, dto_model_class=dto_model_class) objs = cls.model.select()
stmt = objs.where(cls.model.id.in_(pids))
result = await session.scalars(stmt)
return list(result.all())
@classmethod @classmethod
@with_db_session() @with_db_session()
@ -232,52 +271,3 @@ class BaseService(Generic[T]):
@classmethod @classmethod
async def is_exist(cls, query_params: dict = None): async def is_exist(cls, query_params: dict = None):
return await cls.get_data_count(query_params) > 0 return await cls.get_data_count(query_params) > 0
@classmethod
def build_query(cls, query_params: Union[dict, BaseQueryReq], *, dto_model_class: Type[BaseModel] = None,
fields: List = None):
"""
可选dto_model_classfields
优先级dto_model_class>fields
如果传递了dto_model_class则无需传递fieldsfields会被dto_model_class覆盖
"""
if not isinstance(query_params, dict):
query_params = query_params.model_dump()
query_params = {k: v for k, v in query_params.items() if v not in [None, ""]}
sort = query_params.get("sort", "desc").lower()
orderby = query_params.get("orderby", "created_time").lower()
if dto_model_class is not None:
# 安全映射:只获取 DTO 中存在且 Model 中也有的字段
# 避免 DTO 包含计算字段时 getattr 报错
db_columns = []
for key in dto_model_class.model_fields.keys():
if hasattr(cls.model, key):
db_columns.append(getattr(cls.model, key))
fields = db_columns
query_stmt = cls.get_query_stmt(query_params, fields=fields)
if hasattr(cls.model, orderby):
order_field = getattr(cls.model, orderby)
else:
# 如果传入的 orderby 字段不存在,回退到默认排序,防止报错
order_field = cls.model.created_time
# 根据xxx字段排序
if sort == "desc":
query_stmt = query_stmt.order_by(order_field.desc())
else:
query_stmt = query_stmt.order_by(order_field.asc())
if query_params.get("limit", None) is not None:
query_stmt = query_stmt.limit(query_params.get("limit"))
return query_stmt
@classmethod
def parse_result(cls, exec_result, dto_model_class: Type[BaseModel] = None):
"""
将数据库执行结果解析为 DTO 列表或 实体列表
"""
if dto_model_class is not None:
return [dto_model_class(**dict(row._mapping)) for row in exec_result.all()]
return list(exec_result.scalars().all())

Loading…
Cancel
Save