Tortoise-orm信号实现及使用场景源码详解
作者:it_miclon 发布时间:2021-12-18 23:01:45
场景
在使用Tortoise操作数据库的时候发现,通过对操作数据库模型加以装饰器,如@pre_save(Model)
,可以实现对这个模型在savue
时,自动调用被装饰的方法,从而实现对模型的一些操作。
在此先从官方文档入手,看一下官方的对于模型信号的Example
# -*- coding: utf-8 -*-
"""
This example demonstrates model signals usage
"""
from typing import List, Optional, Type
from tortoise import BaseDBAsyncClient, Tortoise, fields, run_async
from tortoise.models import Model
from tortoise.signals import post_delete, post_save, pre_delete, pre_save
class Signal(Model):
id = fields.IntField(pk=True)
name = fields.TextField()
class Meta:
table = "signal"
def __str__(self):
return self.name
@pre_save(Signal)
async def signal_pre_save(
sender: "Type[Signal]", instance: Signal, using_db, update_fields
) -> None:
print('signal_pre_save', sender, instance, using_db, update_fields)
@post_save(Signal)
async def signal_post_save(
sender: "Type[Signal]",
instance: Signal,
created: bool,
using_db: "Optional[BaseDBAsyncClient]",
update_fields: List[str],
) -> None:
print('post_save', sender, instance, using_db, created, update_fields)
@pre_delete(Signal)
async def signal_pre_delete(
sender: "Type[Signal]", instance: Signal, using_db: "Optional[BaseDBAsyncClient]"
) -> None:
print('pre_delete', sender, instance, using_db)
@post_delete(Signal)
async def signal_post_delete(
sender: "Type[Signal]", instance: Signal, using_db: "Optional[BaseDBAsyncClient]"
) -> None:
print('post_delete', sender, instance, using_db)
async def run():
await Tortoise.init(db_url="sqlite://:memory:", modules={"models": ["__main__"]})
await Tortoise.generate_schemas()
# pre_save,post_save will be send
signal = await Signal.create(name="Signal")
signal.name = "Signal_Save"
# pre_save,post_save will be send
await signal.save(update_fields=["name"])
# pre_delete,post_delete will be send
await signal.delete()
if __name__ == "__main__":
run_async(run())
以上代码可直接复制后运行,运行后的结果:
signal_pre_save <class '__main__.Signal'> Signal <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> None
post_save <class '__main__.Signal'> Signal <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> True None
signal_pre_save <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> ['name']
post_save <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> False ['name']
pre_delete <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400>
post_delete <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400>
可以发现,对模型进行保存和删除时候,都会调用对应的信号方法。
源码
从导包可以得知,tortoise的所有信号方法都在tortoise.signals
中。
from enum import Enum
from typing import Callable
Signals = Enum("Signals", ["pre_save", "post_save", "pre_delete", "post_delete"])
def post_save(*senders) -> Callable:
"""
Register given models post_save signal.
:param senders: Model class
"""
def decorator(f):
for sender in senders:
sender.register_listener(Signals.post_save, f)
return f
return decorator
def pre_save(*senders) -> Callable:
...
def pre_delete(*senders) -> Callable:
...
def post_delete(*senders) -> Callable:
...
其内部实现的四个信号方法分别是模型的保存后,保存前,删除前,删除后。
其内部装饰器代码也十分简单,就是对装饰器中的参数(也就是模型),注册一个监听者,而这个监听者,其实就是被装饰的方法。
如上面的官方示例中:
# 给模型Signal注册一个监听者,它是方法signal_pre_save
@pre_save(Signal)
async def signal_pre_save(
sender: "Type[Signal]", instance: Signal, using_db, update_fields
) -> None:
print('signal_pre_save', sender, instance, using_db, update_fields)
而到了Model类中,自然就有一个register_listener方法,定睛一看,上面示例Signal中并没有register_listener方法,所以自然就想到了,这个方法必定在父类Model中。
class Model:
...
@classmethod
def register_listener(cls, signal: Signals, listener: Callable):
...
if not callable(listener):
raise ConfigurationError("Signal listener must be callable!")
# 检测是否已经注册过
cls_listeners = cls._listeners.get(signal).setdefault(cls, []) # type:ignore
if listener not in cls_listeners:
# 注册监听者
cls_listeners.append(listener)
接下来注册后,这个listeners就会一直跟着这个Signal类。只需要在需要操作关键代码的地方,进行调用即可。
看看在模型save的时候,都干了什么?
async def save(
self,
using_db: Optional[BaseDBAsyncClient] = None,
update_fields: Optional[Iterable[str]] = None,
force_create: bool = False,
force_update: bool = False,
) -> None:
...
# 执行保存前的信号
await self._pre_save(db, update_fields)
if force_create:
await executor.execute_insert(self)
created = True
elif force_update:
rows = await executor.execute_update(self, update_fields)
if rows == 0:
raise IntegrityError(f"Can't update object that doesn't exist. PK: {self.pk}")
created = False
else:
if self._saved_in_db or update_fields:
if self.pk is None:
await executor.execute_insert(self)
created = True
else:
await executor.execute_update(self, update_fields)
created = False
else:
# TODO: Do a merge/upsert operation here instead. Let the executor determine an optimal strategy for each DB engine.
await executor.execute_insert(self)
created = True
self._saved_in_db = True
# 执行保存后的信号
await self._post_save(db, created, update_fields)
抛开其他代码,可以看到,在模型save的时候,其实是先执行保存前的信号,然后执行保存后的信号。
自己实现一个信号
有了以上的经验,可以自己实现一个信号,比如我打算做个数据处理器的类,我想在这个处理器工作中,监听处理前/后的信号。
# -*- coding: utf-8 -*-
from enum import Enum
from typing import Callable, Dict
# 声明枚举信号量
Signals = Enum("Signals", ["before_process", "after_process"])
# 处理前的装饰器
def before_process(*senders):
def decorator(f):
for sender in senders:
sender.register_listener(Signals.before_process, f)
return f
return decorator
# 处理后的装饰器
def after_process(*senders):
def decorator(f):
for sender in senders:
sender.register_listener(Signals.after_process, f)
return f
return decorator
class Model(object):
_listeners: Dict = {
Signals.before_process: {},
Signals.after_process: {}
}
@classmethod
def register_listener(cls, signal: Signals, listener: Callable):
"""注册监听者"""
# 判断是否已经存在监听者
cls_listeners = cls._listeners.get(signal).setdefault(cls, [])
if listener not in cls_listeners:
# 如果不存在,则添加监听者
cls_listeners.append(listener)
def _before_process(self):
# 取出before_process监听者
cls_listeners = self._listeners.get(Signals.before_process, {}).get(self.__class__, [])
for listener in cls_listeners:
# 调用监听者
listener(self.__class__, self)
def _after_process(self):
# 取出after_process监听者
cls_listeners = self._listeners.get(Signals.after_process, {}).get(self.__class__, [])
for listener in cls_listeners:
# 调用监听者
listener(self.__class__, self)
class SignalModel(Model):
def process(self):
"""真正的调用端"""
self._before_process()
print("Processing")
self._after_process()
# 注册before_process信号
@before_process(SignalModel)
def before_process_listener(*args, **kwargs):
print("before_process_listener1", args, kwargs)
# 注册before_process信号
@before_process(SignalModel)
def before_process_listener(*args, **kwargs):
print("before_process_listener2", args, kwargs)
# 注册after_process信号
@after_process(SignalModel)
def before_process_listener(*args, **kwargs):
print("after_process_listener", args, kwargs)
if __name__ == '__main__':
sm = SignalModel()
sm.process()
输出结果:
before_process_listener1 (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
before_process_listener2 (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
Processing
after_process_listener (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
来源:https://juejin.cn/post/7177652667911667768


猜你喜欢
- 问题一:python启动报错api-ms-win-crt-process-l1-1-0.dll 丢失解决:下载api-ms-win-crt-
- 上一一节我们讲了while循环,while循环主要用于重复程序的运行,for循环更加倾向于遍历一个项目,即将特定内容(比如一个列表、一个字符
- 前言我们的游戏资源处理工具是Python实现的,功能包括csv解析,UI材质处理,动画资源解析、批处理,Androd&iOS自动打包
- 本文介绍了从golang语言中fmt包从标准输入获取数据的Scan系列函数、从io.Reader中获取数据的Fscan系列函数以及从字符串中
- 废话不多说了,直接上代码吧:#Copyright (c)2017, 东北大学软件学院学生# All rightsreserved#文件名称:
- 在实际开发过程中,对数据库的操作无非就“增删改查”。就最为普遍的单表操作而言,除了表和字段不同外,语句都是类似的,开发人员需要写大量类似而枯
- 大致介绍Git是一款免费、开源的分布式版本控制系统,用于敏捷高效地处理任何或小或大的项目,可以有效、高速的处理从很小到非常大的项目版本管理。
- 前言现在很多的直播平台或者视频平台都会用到弹幕加强和观众的互动效果,那么如何用JS实现这样的效果呢,用一个初学者的方法记录下这个方法,欢迎大
- 简介testify可以说是最流行的(从 GitHub star 数来看)Go 语言测试库了。testify提供了很多方便的函数帮助我们做as
- 本周的豆知识分享就来深入研究一下window.event对象。请先看看下边的代码片断。 <button id=”btn”&g
- Django结合ajax进行页面实时更新踩过的坑简单记录一下在使用Django、echarts和ajax实现数据动态更新时遇到的一些坑: 1
- 我就废话不多说了,还是直接看代码吧!import numpyworld_alchol=numpy.genfromtxt("worl
- 如下所示:import pandas as pdfile = pd.read_csv('file.csv',iterator
- 这两天在搞Theano,要把mat文件转成pickle格式载入Python。Matlab是把一维数组当做n*1的矩阵的,但Numpy里还是有
- 本文侧重于如何使用Python语言实现SIFT算法所有程序已打包:基于OpenCV-Python的SIFT算法的实现一、什么是SIFT算法
- 由于工作的需求,需要用python做一个类似网络爬虫的采集器。虽然Python的urllib模块提供更加方便简洁操作,但是涉及到一些底层的需
- Windows中升级MySQL应采取的步骤:1. 进行升级前你应先备份当前的MySQL安装。2. 下载最新Windows版MySQL。3.
- paramikoparamiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ans
- 在vue中已经不像jq那样直接操作dom了,如果要指向当前选中项时,就不能再用jq的思路来做了,方法如下:当指向一个状态的时候,只让指向的状
- 前言在遇到三维数据时,三维图像能给我们对数据带来更加深入地理解。python的matplotlib库就包含了丰富的三维绘图工具。1.创建三维