防止走失先贴链接。pydantic-resolve
这样一个场景, 前提是 RESTful
以论坛为例,有个接口返回帖子(posts)信息,然后呢,来了新需求,说需要显示帖子的 author 信息。
这时候会有两种做法:
author_id
, author_name
之类的字段。 {'post': 'v2ex', 'author_name': 'tangkikodo'}
{'post':'v2ex', 'author': {'name': 'tangkikod'}}
在方法 1 中,需要修改 query , 还需要修改 post 的 schema. 如果未来要加新的,例如用户头像的话,需要修改两处。
方法 2 需要手动做一次拼接。而之后增减字段都是在 author 自己的部分修改。
所以相对来说方法 2 在未来的可维护性会比较好。用嵌套对象的方式可以更好的扩展和维护。
然而需求总是会变化,突然来了一个新的且奇怪的需求,要在 author 信息中添加数据,显示他最近浏览过的帖子。
[
{
"id": 1,
"post": "v2ex",
"author": {
"name": "tangkikodo",
"recent_views": [
{
"id": 2,
"post": "v3ex"
},
{
"id": 3,
"post": "v4ex"
}
]
}
}
]
那这个时候该怎么弄呢?血压是不是有点上来了。
根据之前的方法 2 , 通常的想法是在获取到 authors 信息后, 再关联查找 author 的 recent_posts
, 拼接回 authors, 再将 authors 拼接回 posts 。
反正想想就挺麻烦的对吧。如果你此时血压有点高,那请继续往下看。
那,有别的办法么? 这里有个小轮子也许能帮忙。
https://github.com/allmonday/pydantic-resolve
以刚才的例子,要做的事情分两步:
1 , 定义 dataloader ,前半部分是从数据库查询,后半部分是将数据转成 pydantic 对象后返回。 伪代码,看个大概意思就好。
class AuthorLoader(DataLoader):
async def batch_load_fn(self, author_ids):
async with async_session() as session:
res = await session.execute(select(Author).where(Author.id.in_(author_ids)))
rows = res.scalars().all()
dct = defaultdict(list)
for row in rows:
dct[row.author_id] = AuthorSchema.from_orm(row)
return [dct.get(k, None) for k in author_ids]
class RecentViewPostLoader(DataLoader):
async def batch_load_fn(self, view_ids):
async with async_session() as session:
res = await session.execute(select(Post) # join 浏览中间表
.join(PostVist, PostVisit.post_id == Post.id)
.where(PostVisit.user_id.in_(view_ids)
.where(PostVisit.created_at < some_timestamp)))
rows = res.scalars().all()
dct = defaultdict(list)
for row in rows:
dct[row.view_id].append(PostSchema.from_orm(row))
return [dct.get(k, []) for k in view_ids]
class RecentPostSchema(BaseModel):
id: int
name: str
class Config:
orm_mode = True
class AuthorSchema(BaseModel):
id: int
name: str
img_url: str
recent_views: Tuple[RecentPostSchema, ...] = tuple()
def resolve_recent_views(self, loader=LoaderDepend(RecentViewPostLoader)): # <=== 核心操作
return loader.load(self.id)
class Config:
orm_mode = True
class PostSchema(BaseModel):
id: int
author_id: int
name: str
author: Optional[AuthorSchema] = None
def resolve_author(self, loader=LoaderDepend(AuthorLoader)): # <=== 核心操作
return loader.load(self.author_id)
class Config:
orm_mode = True
然后呢?
然后就没有了,接下来只要做个 post 的查询, 再简单地...resolve 一下,任务就做好了。
posts = (await session.execute(select(Post))).scalars().all()
posts = [PostSchema.from_orm(p) for p in tasks]
results = await Resolver().resolve(posts)
在拆分了 loader 和 schema 之后,对数据地任意操作都很简单,添加新字段只要三步:
就完事了。如果说这方法有啥缺点的话。。必须用 async await 可能算一个。。
真实可测的例子可以看这个 demo: link
谢谢。
这个工具受到了 graphql 很大的启发,如果这个小轮子可以帮到忙的话,我会感到很开心。 :)
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.