pydantic-resolve嵌套数据结构生成LoaderDepend管理contextvars
作者:allmonday 发布时间:2023-01-12 22:21:05
pydantic-resolve 解决嵌套数据结构的生成和其他方案的比较
pydantic-resolve
和GraphQL相比
GraphQL的优势是 1.方便构建嵌套结构,2.client可以方便生成查询子集。非常适合构建满足灵活变化的 public API的场景.
但是很多实际业务在前端做的其实是照单全收,并没有灵活选择的需要。GraphQL带来的便利更多体现在灵活地构建嵌套结构。
GraphQL需要client端维护查询语句,相较于通过
openapi.json
和工具自动生成client让前后端无缝对接的做法,在前后端一体的架构中维护这些查询语句,属于重复劳动。为了满足权限控制的需要,通过RESTful定义一个个API 会比全局一个Query,Mutation 控制起来更加清晰直接。
Pydantic-resolve 恰好满足了灵活构建嵌套结构的需求,它不需要像GraphQL一样引入一系列概念和设置,它非常轻量级,没有任何侵入,所有的功能通过简单
resolve
一下就实现。Pydantic-resolve 在保持轻量级的同时,可以隐藏 Dataloader 的初始化逻辑,避免了GraphQL中在多处维护dataloader的麻烦。
Pydantic-resolve 还提供了对 global
loader filters
的支持,在一些业务逻辑下可以简化很多代码。如果把Dataloader 的 keys 等价视为 relationship的 join on 条件的话, 那么loader_filters
就类似在别处的其他过滤条件。
结论:
GraphQL更适合 public API。
对前后端作为一个整体的项目,RESTful + Pydantic-resolve 才是快速灵活提供数据结构的最佳方法。
和 ORM 的 relationship相比
relationship 提供了ORM 级别的嵌套查询实现,但默认会使用lazy select的方法, 会导致很多的查询次数, 并且在异步使用的时候需要手动声明例如
.option(subquery(Model.field))
之类的代码relationship 的外键决定了,无法在关联查询的时候提供额外的过滤条件 (即便可以也是改动成本比较大的做法)
relationship 最大的问题是使得 ORM Model 和 schema 产生了代码耦合。在schema层想做的嵌套查询,会把逻辑侵入到ORM Model层。
Pydantic-resolve 则没有这样的问题,在 ORM 层不需要定义任何relationship,所有的join逻辑都通过 dataloader 批量查询解决。 并且通过 global
loader_filters
参数,可以提供额外的全局过滤条件。
结论
relationship 方案的灵活度低,不方便修改,默认的用法会产生外键约束。对迭代频繁的项目不友好。
Pydantic-resolve 和 ORM 层完全解耦,可以通过灵活创建Dataloader 来满足各种需要。
LoaderDepend的用途 背景
如果你使用过dataloader, 不论是js还是python的,都会遇到一个问题,如何为单独的一个请求创建独立的dataloader?
以 python 的 strawberry
来举例子:
@strawberry.type
class User:
id: strawberry.ID
async def load_users(keys) -> List[User]:
return [User(id=key) for key in keys]
loader = DataLoader(load_fn=load_users)
@strawberry.type
class Query:
@strawberry.field
async def get_user(self, id: strawberry.ID) -> User:
return await loader.load(id)
schema = strawberry.Schema(query=Query)
如果单独实例化的话,会导致所有的请求都使用同一个dataloader, 由于loader本身是有缓存优化机制的,所以即使内容更新之后,依然会返回缓存的历史数据。
因此 strawberry
的处理方式是:
@strawberry.type
class User:
id: strawberry.ID
async def load_users(keys) -> List[User]:
return [User(id=key) for key in keys]
class MyGraphQL(GraphQL):
async def get_context(
self, request: Union[Request, WebSocket], response: Optional[Response]
) -> Any:
return {"user_loader": DataLoader(load_fn=load_users)}
@strawberry.type
class Query:
@strawberry.field
async def get_user(self, info: Info, id: strawberry.ID) -> User:
return await info.context["user_loader"].load(id)
schema = strawberry.Schema(query=Query)
app = MyGraphQL(schema)
开发者需要在get_context
中去初始化loader, 然后框架会负责在每次request的时候会执行初始化。 这样每个请求就会有独立的loader, 解决了多次请求被缓存的问题。
其中的原理是:contextvars 在 await 的时候会做一次浅拷贝,所以外层的context可以被内部读到,因此手动在最外层(request的时候) 初始化一个引用类型(dict)之后,那么在 request 内部自然就能获取到引用类型内的loader。
这个方法虽然好,但存在两个问题:
需要手动去维护
get_context
, 每当新增了一个 DataLoader, 就需要去里面添加, 而且实际执行.load
的地方也要从context 里面取loader。存在初始化了loaders却没有被使用到的情况,比如整个Query 有 N 个loader,但是用户的查询实际只用到了1个,那么其他loader 的初始化就浪费了。而且作为公共区域东西多了之后代码维护会不清晰。(重要)
而 graphene
就更加任性了,把loader 的活交给了 aiodataloader, 如果翻阅文档的话,会发现处理的思路也是类似的,只是需要手动去维护创建过程。
解决方法
我所期望的功能是:
初始化按需执行,比如我的整个schema 里面只存在 DataLoaderA, 那我希望只有DataLoaderA 被实例化
不希望在某个reqeust或者 middleware中干手动维护初始化。
其实这两件事情说的是同一个问题,就是如何把初始化的事情依赖反转到 resolve_field 方法中。
具体转化为代码:
class CommentSchema(BaseModel):
id: int
task_id: int
content: str
feedbacks: List[FeedbackSchema] = []
def resolve_feedbacks(self, loader=LoaderDepend(FeedbackLoader)):
return loader.load(self.id)
class TaskSchema(BaseModel):
id: int
name: str
comments: List[CommentSchema] = []
def resolve_comments(self, loader=LoaderDepend(CommentLoader)):
return loader.load(self.id)
就是说,我只要这样申明好loader,其他的事情就一律不用操心。那么,这做得到么?
得益于pydantic-resolve
存在一个手动执行resolve
的过程,于是有一个思路:
contextvar 是浅拷贝,所以存的如果是引用类型,那么在最外层定义的dict,可以被所有内层读到。可以在Resolver初始化的时候定义。
假如
tasks: list[TaskSchema]
有n个,我希望在第一次遇到的时候把loader 初始化并缓存,后续其他都使用缓存的loader。LoaderDepend 里面存放的是 DataLoader类,做为default 参数传入resolve_field 方法
执行resolve_field之前,利用inspect.signature 分析 default 参数,执行初始化和缓存的逻辑。
总体就是一个lazy的路子,到实际执行的时候去处理初始化流程。
下图中 1 会执行LoaderA 初始化,2,3则是读取缓存, 1.1 会执行LoaderB初始化,2.1,3.1 读取缓存
代码如下:
class Resolver:
def __init__(self):
self.ctx = contextvars.ContextVar('pydantic_resolve_internal_context', default={})
def exec_method(self, method):
signature = inspect.signature(method)
params = {}
for k, v in signature.parameters.items():
if isinstance(v.default, Depends):
cache_key = str(v.default.dependency.__name__)
cache = self.ctx.get()
hit = cache.get(cache_key, None)
if hit:
instance = hit
else:
instance = v.default.dependency()
cache[cache_key] = instance
self.ctx.set(cache)
params[k] = instance
return method(**params)
遗留问题 (已经解决)
有些DataLoader的实现可能需要一个外部的查询条件, 比如查询用户的absense信息的时候,除了user_key 之外,还需要额外提供其他全局filter 比如sprint_id)。 这种全局变量从load参数走会显得非常啰嗦。
这种时候就依然需要借助contextvars 在外部设置变量。 以一段项目代码为例:
async def get_team_users_load(team_id: int, sprint_id: Optional[int], session: AsyncSession):
ctx.team_id_context.set(team_id) # set global filter
ctx.sprint_id_context.set(sprint_id) # set global filter
res = await session.execute(select(User)
.join(UserTeam, UserTeam.user_id == User.id)
.filter(UserTeam.team_id == team_id))
db_users = res.scalars()
users = [schema.UserLoadUser(id=u.id, employee_id=u.employee_id, name=u.name)
for u in db_users]
results = await Resolver().resolve(users) # resolve
return results
class AbsenseLoader(DataLoader):
async def batch_load_fn(self, user_keys):
async with async_session() as session, session.begin():
sprint_id = ctx.sprint_id_context.get() # read global filter
sprint_stmt = Sprint.status == SprintStatusEnum.ongoing if not sprint_id else Sprint.id == sprint_id
res = await session.execute(select(SprintAbsence)
.join(Sprint, Sprint.id == SprintAbsence.sprint_id)
.join(User, User.id == SprintAbsence.user_id)
.filter(sprint_stmt)
.filter(SprintAbsence.user_id.in_(user_keys)))
rows = res.scalars().all()
dct = {}
for row in rows:
dct[row.user_id] = row.hours
return [dct.get(k, 0) for k in user_keys]
期望的设置方式为:
loader_filters = {
AbsenseLoader: {'sprint_id': 10},
OtherLoader: {field: 'value_x'}
}
results = await Resolver(loader_filters=loader_filters).resolve(users)
如果需要filter但是却没有设置, 该情况下要抛异常
来源:https://juejin.cn/post/7218862096406544443


猜你喜欢
- 在MySQL数据库中导出整个数据库:1.导出整个数据库mysqldump -u 用户名 -p 数据库名 > 导出的文件名mysqldu
- 介绍百度aip模块是用于实现百度云与用户接口,简单来说就是使用百度云所拥有的人工智能模块。模块使用pip install baidu-aip
- 代码如下:<%@ Language=VBScript %> <% Dim 
- 1、手动调参,但这种方法依赖于大量的经验,而且比较费时。许多情况下,工程师依靠试错法手工调整超参数进行优化,有经验的工程师可以在很大程度上判
- 是否曾经有过这样的经历:把一个元素置于另一个元素之上,而希望下面的那个元素成为可点击的?现在,利用css的pointer-events属性即
- 由于周牛的JS知识很多,下面我来分享一下SQL下编写储存过程的初级阶段 在数据库界,在SQL,DB,甲骨文三大数据库中,通过编写储存过程增强
- 在JavaScript中四种基本的数据类型:数值(整数和实数)、字符串型(用“”号或‘'括起来的字符或数值)、布尔型(使True或F
- 目录一、变量、常量的区别二、变量1. Python中的变量不需要声明类型2. 用“=”号来给变量赋值3. 赋值4. 变量5. “=”6. P
- 本文实例讲述了python打开文件并获取文件相关属性的方法。分享给大家供大家参考。具体分析如下:下面的代码通过open函数打开文件,并输出文
- 简介with的基本表达式如下with context_expression [as target(s)]: ...
- 不同于其他软件项目,互联网项目的开发有其独有的特性。互联网项目开发不同于传统软件项目开发不同于需求定制性的软件开发公司。客户的需求是明确的,
- 识别MNIST已经成了深度学习的hello world,所以每次例程基本都会用到这个数据集,这个数据集在tensorflow内部用着很好的封
- 在javascript里怎么样才能把int型转换成string型(1)var x=100 a = &nb
- 方法1:使用dataframe.loc[]函数通过这个方法,我们可以用一个条件或一个布尔数组来访问一组行或列。如果我们可以访问它,我们也可以
- pycharm sql语句警告产生原因为没有配置数据库,配置数据库,似乎没什么作用那么,直接去掉他的警告提示找到setting->ed
- 可以用函数 json.dumps()将 Python 对象编码转换为字符串形式。例如:import json python_obj = [[
- 50个常用sql语句 Student(S#,Sname,Sage,Ssex) 学生表 Course(C#,Cname,T#) 课程表 SC(
- 介绍Django是一个Web框架——一套用于帮助开发交互式网站的工具。Django能够响应网页请求,还能让我们更轻松地读写数据库、管理用户等
- 为了庆祝jQuery的四周岁生日, jQuery的团队荣幸的发布了jQuery Javascript库的最新主要版本! 这个版本包含了大量的
- 我们的规范到底做到哪一步算是发挥良好的价值?其实一件事物我们理解错根本目的会导致出大不一样的结果,直接反应在设计师到底要体现什么的价值。想想