Compare commits

...

3 Commits

Author SHA1 Message Date
chenzhirong 8a2f37bb20 fix: 删除误传的多余参数 4 months ago
chenzhirong eea9687c50 perf:优化部分字段查询 4 months ago
chenzhirong f2da252a59 fix: 修复嵌套子查询异常 4 months ago

@ -3,144 +3,136 @@ import inspect
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, ParamSpec, TypeVar, Callable from typing import Any, ParamSpec, TypeVar, Callable
from sqlalchemy import Executable, Result, Select, Delete, Update, column, and_ from sqlalchemy import Executable, Result, and_, Select, Delete, Update
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.sql.selectable import Subquery
from common.constant import Constant from common.constant import Constant
from common.global_enums import IsDelete from common.global_enums import IsDelete
from config import settings from config import settings
from entity.base_entity import DbBaseModel
P = ParamSpec('P') P = ParamSpec('P')
T = TypeVar('T') T = TypeVar('T')
engine = create_async_engine( engine = create_async_engine(settings.database_url, echo=False, # 打印SQL日志生产环境建议关闭
settings.database_url, pool_size=10, # 连接池大小
echo=False, # 打印SQL日志生产环境建议关闭 max_overflow=20, # 最大溢出连接数
pool_size=10, # 连接池大小 pool_recycle=3600, # 连接回收时间解决MySQL超时断开问题【4†source】【5†source】
max_overflow=20, # 最大溢出连接数 )
pool_recycle=3600, # 连接回收时间解决MySQL超时断开问题【4†source】【5†source】
)
# 创建异步会话工厂 # 创建异步会话工厂
class EnhanceAsyncSession(AsyncSession): class EnhanceAsyncSession(AsyncSession):
async def scalar(self, statement: Executable,
params=None,
*,
execution_options=None,
bind_arguments=None,
**kw: Any, ):
def _add_logical_delete_condition(self, statement: Select) -> Select:
"""
Select 语句添加逻辑删除条件
支持递归处理 fastapi-pagination 生成的子查询包装
"""
# 特殊处理:如果当前查询只有一个子查询,直接修改其内部元素
# 这样可以避免产生重复的别名
if len(statement.froms) == 1 and isinstance(statement.froms[0], Subquery):
subquery = statement.froms[0]
# 递归处理子查询内部
processed_inner = self._add_logical_delete_condition(subquery.element)
# 如果内部有修改,直接替换 subquery 的 element
if processed_inner is not subquery.element:
subquery.element = processed_inner
return statement
# 处理当前层的表
delete_condition = None
delete_field = Constant.LOGICAL_DELETE_FIELD
for from_obj in statement.froms:
# 跳过子查询(因为会通过上面的逻辑递归处理)
if isinstance(from_obj, Subquery):
continue
# 只处理 Table 对象
if hasattr(from_obj, 'columns') and delete_field in from_obj.columns:
# 使用表对象上的列
condition = from_obj.columns[delete_field] == IsDelete.NO_DELETE
if delete_condition is None:
delete_condition = condition
else:
delete_condition = and_(delete_condition, condition)
# 如果有条件,则应用到当前 statement
if delete_condition is not None:
existing_condition = statement.whereclause
if existing_condition is not None:
new_condition = and_(existing_condition, delete_condition)
else:
new_condition = delete_condition
statement = statement.where(new_condition)
return statement
async def scalar(self, statement: Executable, params=None, *, execution_options=None, bind_arguments=None,
**kw: Any, ):
sig = inspect.signature(super().scalar) sig = inspect.signature(super().scalar)
if execution_options is None: if execution_options is None:
default_execution_options = sig.parameters['execution_options'].default default_execution_options = sig.parameters['execution_options'].default
execution_options = default_execution_options execution_options = default_execution_options
delete_condition = column(Constant.LOGICAL_DELETE_FIELD) == IsDelete.NO_DELETE
existing_condition = statement.whereclause # 只对 Select 语句添加逻辑删除条件
# 组合条件 if isinstance(statement, Select):
if existing_condition is not None: statement = self._add_logical_delete_condition(statement)
# 使用and_组合现有条件和逻辑删除条件
new_condition = and_(existing_condition, delete_condition)
else:
new_condition = delete_condition
# 应用新条件创建新的Select对象
statement = statement.where(new_condition)
return await super().scalar(statement, params, execution_options=execution_options, return await super().scalar(statement, params, execution_options=execution_options,
bind_arguments=bind_arguments, **kw) bind_arguments=bind_arguments, **kw)
async def execute( async def execute(self, statement: Executable, params=None, *, execution_options=None, bind_arguments=None,
self, **kw: Any, ) -> Result[Any]:
statement: Executable,
params=None,
*,
execution_options=None,
bind_arguments=None,
**kw: Any,
) -> Result[Any]:
sig = inspect.signature(super().execute) sig = inspect.signature(super().execute)
if execution_options is None: if execution_options is None:
default_execution_options = sig.parameters['execution_options'].default default_execution_options = sig.parameters['execution_options'].default
execution_options = default_execution_options execution_options = default_execution_options
# print("type(statement):{}", type(statement))
if isinstance(statement, Select): if isinstance(statement, Select):
# print("这是查询语句,过滤逻辑删除") statement = self._add_logical_delete_condition(statement)
delete_condition = column(Constant.LOGICAL_DELETE_FIELD) == IsDelete.NO_DELETE
# 获取现有条件
existing_condition = statement.whereclause
# 组合条件
if existing_condition is not None:
# 使用and_组合现有条件和逻辑删除条件
new_condition = and_(existing_condition, delete_condition)
else:
new_condition = delete_condition
# 应用新条件创建新的Select对象
statement = statement.where(new_condition)
if isinstance(statement, Delete): if isinstance(statement, Delete):
# 检查是否跳过软删除通过execution_options控制
skip_soft_delete = execution_options and execution_options.get("skip_soft_delete", False) skip_soft_delete = execution_options and execution_options.get("skip_soft_delete", False)
if not skip_soft_delete: if not skip_soft_delete:
# 获取表对象
table = statement.table table = statement.table
# 构建更新语句 if hasattr(table, 'columns') and Constant.LOGICAL_DELETE_FIELD in table.columns:
update_stmt = ( update_stmt = (Update(table).where(statement.whereclause).values(
Update(table) **{Constant.LOGICAL_DELETE_FIELD: IsDelete.DELETE}))
.where(statement.whereclause) # 保留原删除条件
.values(**{Constant.LOGICAL_DELETE_FIELD: IsDelete.DELETE}) # 设置软删除标记 if statement._returning:
) update_stmt = update_stmt.returning(*statement._returning)
# 如果原删除语句有RETURNING子句也添加到更新语句中 return await super().execute(update_stmt, params=params, execution_options=execution_options,
if statement._returning: bind_arguments=bind_arguments, **kw)
update_stmt = update_stmt.returning(*statement._returning)
# 执行更新语句 result = await super().execute(statement, params=params, execution_options=execution_options,
return await super().execute( bind_arguments=bind_arguments, **kw, )
update_stmt,
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
**kw
)
result = await super().execute(
statement,
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
**kw,
)
return result return result
# 重写delete方法处理单个对象删除
def delete(self, instance): def delete(self, instance):
from sqlalchemy import inspect from sqlalchemy import inspect as sqlalchemy_inspect
# 检查是否有逻辑删除属性
if hasattr(instance, Constant.LOGICAL_DELETE_FIELD): if hasattr(instance, Constant.LOGICAL_DELETE_FIELD):
# 设置软删除标记 setattr(instance, Constant.LOGICAL_DELETE_FIELD, IsDelete.DELETE)
instance.__setattr__(Constant.LOGICAL_DELETE_FIELD, IsDelete.DELETE)
# 确保对象在会话中(如果已分离则重新关联)
# 检查对象状态
insp = inspect(instance)
if insp.detached:
# 如果对象是分离的,则重新加入会话
self.add(instance)
elif insp.transient:
# 如果是瞬态对象,也添加到会话
self.add(instance)
# 标记对象为已修改(触发更新) insp = sqlalchemy_inspect(instance)
# self.expire(instance, [Constant.LOGICAL_DELETE_FIELD]) if insp.detached or insp.transient:
self.add(instance)
else: else:
# 如果没有逻辑删除属性,执行标准删除
super().delete(instance) super().delete(instance)
AsyncSessionLocal = async_sessionmaker( AsyncSessionLocal = async_sessionmaker(bind=engine, class_=EnhanceAsyncSession, expire_on_commit=False, # 提交后不使对象过期
bind=engine, autoflush=False # 禁用自动刷新
class_=EnhanceAsyncSession, )
expire_on_commit=False, # 提交后不使对象过期
autoflush=False # 禁用自动刷新
)
# 获取数据库session的独立方法 # 获取数据库session的独立方法
@ -152,7 +144,6 @@ async def get_db_session():
yield session yield session
def with_db_session(session_param_name: str = "session"): def with_db_session(session_param_name: str = "session"):
""" """
一个装饰器用于为异步函数自动注入数据库会话 一个装饰器用于为异步函数自动注入数据库会话
@ -160,6 +151,7 @@ def with_db_session(session_param_name: str = "session"):
Args: Args:
session_param_name: 被装饰函数中用于接收会话的参数名默认为 'session' session_param_name: 被装饰函数中用于接收会话的参数名默认为 'session'
""" """
def decorator(func: Callable[P, T]) -> Callable[P, T]: def decorator(func: Callable[P, T]) -> Callable[P, T]:
# 确保只装饰异步函数 # 确保只装饰异步函数
if not inspect.iscoroutinefunction(func): if not inspect.iscoroutinefunction(func):
@ -177,8 +169,8 @@ def with_db_session(session_param_name: str = "session"):
return await func(*args, **kwargs) return await func(*args, **kwargs)
return wrapper return wrapper
return decorator
return decorator
# 关闭引擎 # 关闭引擎

@ -28,11 +28,10 @@ class DbBaseModel(SQLModel, table=False):
# class Config: # class Config:
# arbitrary_types_allowed = True # arbitrary_types_allowed = True
@classmethod @classmethod
def select(cls, fields=None): def select(cls, fields: list = None):
if fields is None: if fields is None:
fields = cls return Select(cls)
return Select(fields) return Select(*fields)
@classmethod @classmethod
def delete(cls): def delete(cls):

@ -14,6 +14,7 @@ from entity.dto import HttpResp, ApiResponse
from exceptions.base import AppException from exceptions.base import AppException
from utils import get_uuid from utils import get_uuid
__all__ = ["unified_resp", "BaseController"]
RT = TypeVar('RT') # 返回类型 RT = TypeVar('RT') # 返回类型
@ -35,6 +36,7 @@ def unified_resp(func: Callable[..., RT]) -> Callable[..., RT]:
resp = await func(*args, **kwargs) or [] resp = await func(*args, **kwargs) or []
else: else:
resp = func(*args, **kwargs) or [] resp = func(*args, **kwargs) or []
return JSONResponse( return JSONResponse(
content=jsonable_encoder( content=jsonable_encoder(
# 正常请求响应 # 正常请求响应
@ -58,33 +60,23 @@ class BaseController:
async def base_page(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None): async def base_page(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None):
if not isinstance(req, dict): if not isinstance(req, dict):
req = req.model_dump() req = req.model_dump()
result = await self.service.get_by_page(req) result = await self.service.get_by_page(req,dto_class)
datas = result.data # datas = result.data
if datas and dto_class: # if datas and dto_class:
result.data = self.service.entity_conversion_dto(datas, dto_class) # result.data = self.service.entity_conversion_dto(datas, dto_class)
return result return result
async def base_list(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None): async def base_list(self, req: Union[dict, BaseModel], dto_class: Type[BaseModel] = None):
if not isinstance(req, dict): if not isinstance(req, dict):
req = req.model_dump() req = req.model_dump()
datas = await self.service.get_list(req) datas = await self.service.get_list(req,dto_class)
if datas and dto_class:
datas = self.service.entity_conversion_dto(datas, dto_class)
return datas return datas
async def get_all(self, dto_class: Type[BaseModel] = None):
result = await self.service.get_all()
if dto_class:
result = self.service.entity_conversion_dto(result, dto_class)
return result
async def get_by_id(self, id: str, dto_class: Type[BaseModel] = None): async def get_by_id(self, id: str):
data = await self.service.get_by_id(id) result = await self.service.get_by_id(id)
if not data: if not result:
raise AppException(f"不存在 id 为{id}的数据") raise AppException(f"不存在 id 为{id}的数据")
result = data.to_dict()
if dto_class:
result = self.service.entity_conversion_dto(result, dto_class)
return result return result
async def add(self, req: Union[dict, BaseModel]): async def add(self, req: Union[dict, BaseModel]):
@ -102,7 +94,7 @@ class BaseController:
db_query_data = await self.service.get_by_id(id) db_query_data = await self.service.get_by_id(id)
if not db_query_data: if not db_query_data:
raise AppException(f"数据不存在") raise AppException(f"数据不存在")
self.service.check_base_permission(db_query_data) await self.service.check_base_permission(db_query_data)
try: try:
return await self.service.delete_by_id(id) return await self.service.delete_by_id(id)
except Exception as e: except Exception as e:
@ -117,7 +109,7 @@ class BaseController:
db_query_data = await self.service.get_by_id(data_id) db_query_data = await self.service.get_by_id(data_id)
if not db_query_data: if not db_query_data:
raise AppException(f"数据不存在") raise AppException(f"数据不存在")
self.service.check_base_permission(db_query_data) await self.service.check_base_permission(db_query_data)
try: try:
return await self.service.update_by_id(data_id, req) return await self.service.update_by_id(data_id, req)

@ -29,7 +29,7 @@ class BaseService(Generic[T]):
def get_query_stmt(cls, query_params, stmt=None, *, fields: list = None): def get_query_stmt(cls, query_params, stmt=None, *, fields: list = None):
if stmt is None: if stmt is None:
if fields: if fields:
stmt = cls.model.select(*fields) stmt = cls.model.select(fields)
else: else:
stmt = cls.model.select() stmt = cls.model.select()
for key, value in query_params.items(): for key, value in query_params.items():
@ -48,6 +48,9 @@ class BaseService(Generic[T]):
@classmethod @classmethod
def entity_conversion_dto(cls, entity_data: Union[list, BaseModel], dto: Type[BaseModel]) -> Union[ def entity_conversion_dto(cls, entity_data: Union[list, BaseModel], dto: Type[BaseModel]) -> Union[
BaseModel, List[BaseModel]]: BaseModel, List[BaseModel]]:
"""
数据脱敏
"""
dto_list = [] dto_list = []
if not isinstance(entity_data, list): if not isinstance(entity_data, list):
return dto(**entity_data.model_dump()) return dto(**entity_data.model_dump())
@ -59,23 +62,27 @@ class BaseService(Generic[T]):
return dto_list return dto_list
@classmethod @classmethod
def check_base_permission(cls, daba: Any): async def check_base_permission(cls, daba: Any):
# todo # todo
pass pass
@classmethod @classmethod
async def get_by_page(cls, query_params: Union[dict, BasePageQueryReq]) -> BasePageResp[T]: async def get_by_page(cls, query_params: Union[dict, BasePageQueryReq], dto_model_class: Type[BaseModel] = None) -> \
BasePageResp[T]:
if not isinstance(query_params, dict): if not isinstance(query_params, dict):
query_params = query_params.model_dump() query_params = query_params.model_dump()
query_params = {k: v for k, v in query_params.items() if v not in [None, ""]} query_params = {k: v for k, v in query_params.items() if v not in [None, ""]}
query_stmt = cls.get_query_stmt(query_params) fields = None
return await cls.auto_page(query_stmt, query_params) if dto_model_class is not None:
fields = [getattr(cls.model, key) for key in dto_model_class.model_fields.keys()]
query_stmt = cls.get_query_stmt(query_params, fields=fields)
return await cls.auto_page(query_stmt, query_params, dto_model_class)
@classmethod @classmethod
@with_db_session() @with_db_session()
async def auto_page(cls, query_stmt, query_params: Union[dict, BasePageQueryReq] = None, async def auto_page(cls, query_stmt, query_params: Union[dict, BasePageQueryReq] = None,
dto_model_class: Type[BaseModel] = None, *, session: Optional[AsyncSession]) -> \ dto_model_class: Type[BaseModel] = None, *, session: Optional[AsyncSession]) -> BasePageResp[T]:
BasePageResp[T]:
if not query_params: if not query_params:
query_params = {} query_params = {}
if not isinstance(query_params, dict): if not isinstance(query_params, dict):
@ -84,87 +91,45 @@ class BaseService(Generic[T]):
page_size = query_params.get("page_size", 12) page_size = query_params.get("page_size", 12)
sort = query_params.get("sort", "desc") sort = query_params.get("sort", "desc")
orderby = query_params.get("orderby", "created_time") orderby = query_params.get("orderby", "created_time")
data_count = None
if data_count == 0:
return BasePageResp(**{
"page_number": page_number,
"page_size": page_size,
"count": data_count,
"sort": sort,
"orderby": orderby,
"data": [],
})
if sort == "desc": if sort == "desc":
query_stmt = query_stmt.order_by(getattr(cls.model, orderby).desc()) query_stmt = query_stmt.order_by(getattr(cls.model, orderby).desc())
else: else:
query_stmt = query_stmt.order_by(getattr(cls.model, orderby).asc()) query_stmt = query_stmt.order_by(getattr(cls.model, orderby).asc())
query_page_result = await paginate(session,
query_stmt, query_page_result = await paginate(session, query_stmt, Params(page=page_number, size=page_size))
Params(page=page_number, size=page_size))
result = query_page_result.items result = query_page_result.items
if dto_model_class is not None: if dto_model_class is not None:
result = [dto_model_class(**item) for item in result] # 使用 row._mapping 将 Row 对象转换为可解包的字典
return BasePageResp(**{ result = [dto_model_class(**dict(row._mapping)) for row in result]
"page_number": page_number, return BasePageResp(
"page_size": page_size, **{"page_number": page_number, "page_size": page_size, "page_count": query_page_result.pages,
"page_count": query_page_result.pages, "count": query_page_result.total, "sort": sort, "orderby": orderby, "data": result, })
"count": query_page_result.total,
"sort": sort,
"orderby": orderby,
"data": result,
})
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> List[ async def get_list(cls, query_params: Union[dict, BaseQueryReq], dto_model_class: Type[BaseModel] = None, *,
T]: session: Optional[AsyncSession] = None) -> List[T] | List[BaseModel]:
if not isinstance(query_params, dict): """
query_params = query_params.model_dump() query_params: 参数字典 or 参数请求模型
query_params = {k: v for k, v in query_params.items() if v not in [None, ""]} dto_model_class: 输出类型
sort = query_params.get("sort", "desc") session: 数据库会话---支持传递以便于事务管控
orderby = query_params.get("orderby", "created_time") 获取数据集合
query_stmt = cls.get_query_stmt(query_params) """
field = getattr(cls.model, orderby) query_stmt = cls.build_query(query_params, dto_model_class=dto_model_class)
if sort == "desc":
query_stmt = query_stmt.order_by(field.desc())
else:
query_stmt = query_stmt.order_by(field.asc())
if query_params.get("limit", None) is not None:
query_stmt = query_stmt.limit(query_params.get("limit"))
exec_result = await session.execute(query_stmt) exec_result = await session.execute(query_stmt)
return list(exec_result.scalars().all()) return cls.parse_result(exec_result, dto_model_class=dto_model_class)
@classmethod
@with_db_session()
async def get_list_json(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> \
List[
T]:
resp_list = await cls.get_list(query_params, session=session)
return [i.model_dump() for i in resp_list]
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_id_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None) -> \ async def get_id_list(cls, query_params: Union[dict, BaseQueryReq], *, session: Optional[AsyncSession] = None):
List[str]: query_stmt = cls.build_query(query_params, fields=[cls.model.id])
if not isinstance(query_params, dict): exec_result = await session.scalars(query_stmt)
query_params = query_params.model_dump() return list(exec_result)
query_params = {k: v for k, v in query_params.items() if v is not None}
sort = query_params.get("sort", "desc")
orderby = query_params.get("orderby", "created_time")
query_stmt = cls.model.select(cls.model.id)
query_stmt = cls.get_query_stmt(query_params, query_stmt)
if sort == "desc":
query_stmt = query_stmt.order_by(cls.model.getter_by(orderby).desc())
else:
query_stmt = query_stmt.order_by(cls.model.getter_by(orderby).asc())
exec_result = await session.execute(query_stmt)
return [item["id"] for item in exec_result.scalars().all()]
@classmethod @classmethod
@with_db_session() @with_db_session()
async def save(cls, *, session: Optional[AsyncSession] = None, **kwargs) -> T: async def save(cls, *, session: Optional[AsyncSession] = None, **kwargs) -> T:
sample_obj = cls.model(**kwargs) sample_obj = cls.model(**kwargs)
session.add(sample_obj) session.add(sample_obj)
await session.flush() await session.flush()
@ -206,7 +171,6 @@ class BaseService(Generic[T]):
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_by_id(cls, pid, *, session: Optional[AsyncSession] = None) -> T: async def get_by_id(cls, pid, *, session: Optional[AsyncSession] = None) -> T:
stmt = cls.model.select().where(cls.model.id == pid) stmt = cls.model.select().where(cls.model.id == pid)
return await session.scalar(stmt) return await session.scalar(stmt)
@ -222,15 +186,12 @@ class BaseService(Generic[T]):
@classmethod @classmethod
@with_db_session() @with_db_session()
async def get_by_ids(cls, pids, cols=None, *, session: Optional[AsyncSession] = None) -> List[T]: async def get_by_ids(cls, pids, dto_model_class: Type[BaseModel] = None, *,
session: Optional[AsyncSession] = None) -> List[T]:
if cols: stmt = cls.build_query({}, dto_model_class=dto_model_class)
objs = cls.model.select(*cols) stmt = stmt.where(cls.model.id.in_(pids))
else: exec_result = await session.execute(stmt)
objs = cls.model.select() return cls.parse_result(exec_result, dto_model_class=dto_model_class)
stmt = objs.where(cls.model.id.in_(pids))
result = await session.scalars(stmt)
return list(result.all())
@classmethod @classmethod
@with_db_session() @with_db_session()
@ -271,3 +232,52 @@ class BaseService(Generic[T]):
@classmethod @classmethod
async def is_exist(cls, query_params: dict = None): async def is_exist(cls, query_params: dict = None):
return await cls.get_data_count(query_params) > 0 return await cls.get_data_count(query_params) > 0
@classmethod
def build_query(cls, query_params: Union[dict, BaseQueryReq], *, dto_model_class: Type[BaseModel] = None,
fields: List = None):
"""
可选dto_model_classfields
优先级dto_model_class>fields
如果传递了dto_model_class则无需传递fieldsfields会被dto_model_class覆盖
"""
if not isinstance(query_params, dict):
query_params = query_params.model_dump()
query_params = {k: v for k, v in query_params.items() if v not in [None, ""]}
sort = query_params.get("sort", "desc").lower()
orderby = query_params.get("orderby", "created_time").lower()
if dto_model_class is not None:
# 安全映射:只获取 DTO 中存在且 Model 中也有的字段
# 避免 DTO 包含计算字段时 getattr 报错
db_columns = []
for key in dto_model_class.model_fields.keys():
if hasattr(cls.model, key):
db_columns.append(getattr(cls.model, key))
fields = db_columns
query_stmt = cls.get_query_stmt(query_params, fields=fields)
if hasattr(cls.model, orderby):
order_field = getattr(cls.model, orderby)
else:
# 如果传入的 orderby 字段不存在,回退到默认排序,防止报错
order_field = cls.model.created_time
# 根据xxx字段排序
if sort == "desc":
query_stmt = query_stmt.order_by(order_field.desc())
else:
query_stmt = query_stmt.order_by(order_field.asc())
if query_params.get("limit", None) is not None:
query_stmt = query_stmt.limit(query_params.get("limit"))
return query_stmt
@classmethod
def parse_result(cls, exec_result, dto_model_class: Type[BaseModel] = None):
"""
将数据库执行结果解析为 DTO 列表或 实体列表
"""
if dto_model_class is not None:
return [dto_model_class(**dict(row._mapping)) for row in exec_result.all()]
return list(exec_result.scalars().all())

Loading…
Cancel
Save