Python使用apscheduler模块设置定时任务的实现
作者:redrose2100 发布时间:2021-05-01 05:01:54
一、安装
pip install apscheduler
二、ApScheduler 简介
1 APScheduler的组件
triggers:触发器
triggers包含任务执行的调度逻辑,决定任务按照什么逻辑进行定时执行
job stores;任务存储器
存储了调度任务
executors:执行器
用例执行任务的,包含线程池以及进程池等的创建和调用等等
schedulers:调度器
属于控制面,将其他几个方面组织起来的作用、
2 调度器的种类
调度器有以下几种常见类型,其中最常用的BackgroundScheduler,即非阻塞式,因为在一般情况下,定时任务都会在放到web服务中,如果使用阻塞式,则无法启动web服务,而使用非阻塞式,则将定时任务设定后,就不管了,继续执行后面的web服务,只要web服务在运行,定时任务就是一直有效的
BlockingScheduler: 阻塞式
BackgroundScheduler: 非阻塞式(后台运行)
AsyncIOScheduler: 当使用asyncio模块时使用
GeventScheduler: 当使用gevent模块时使用
TornadoScheduler: 构建Tornado应用时使用
TwistedScheduler: 构建Twisted应用时使用
QtScheduler: 构建Qt应用时使用
3 内置的触发器类型
date: 在某个时间点执行一次时使用
interval: 固定的时间间隔循环执行时使用
cron: 在一天 * 定的时间点执行时使用
calendarinterval: 当想在在一天 * 定时间点或以日历为基础的时间间隔内执行时使用
三、使用举例
这里jiu就以非阻塞式BackgroundScheduler调度器为例展开
1 使用date类型的触发器
如下,使用了三种设置日期和时间的方法
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 通过date 设置指定日期执行
sched.add_job(do_func,trigger="date",run_date=date(2022,5,25),args=("张三丰",100))
# 通过datetime,设置指定日期额指定时刻执行
sched.add_job(do_func, trigger="date", run_date=datetime(2022, 5, 25,14,0,10), args=("张三丰", 100))
# 直接使用文本的方式指定日期和时刻表
sched.add_job(do_func, trigger="date", run_date="2022-05-25 14:0:20", args=("张三丰", 100))
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
执行结果如下,可以发现,第一个通过date指定日期的默认是0点执行,显然时间已经过了,不会执行,第二和第三个则在规定的时间点执行了,这里还需要注意的是,通过打印可以看出,main函数执行完成后,已经开始执行main函数下面的while循环打印语句了,而在执行循环的过程中,定时任务仍然生效,这就是非阻塞式调度器的原理,如果是阻塞式,则在此代码中,会一直卡在main函数中,main下面额while循环语句是不会执行的,因此在实际使用中,非阻塞式应用的是非常多的
2022-05-25 14:00:02
2022-05-25 14:00:02
C:\Users\hitre\.virtualenvs\zentaolinkgitlab-QCS5yxD9\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
return tzinfo.localize(dt)
Run time of job "do_func (trigger: date[2022-05-25 00:00:00 CST], next run at: 2022-05-25 00:00:00 CST)" was missed by 14:00:02.088260
2022-05-25 14:00:03
2022-05-25 14:00:04
2022-05-25 14:00:05
2022-05-25 14:00:06
2022-05-25 14:00:07
2022-05-25 14:00:08
2022-05-25 14:00:09
2022-05-25 14:00:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:10
2022-05-25 14:00:11
2022-05-25 14:00:12
2022-05-25 14:00:13
2022-05-25 14:00:14
2022-05-25 14:00:15
2022-05-25 14:00:16
2022-05-25 14:00:17
2022-05-25 14:00:18
2022-05-25 14:00:19
2022-05-25 14:00:20 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:20
2022-05-25 14:00:21
2022-05-25 14:00:22
2 使用interval类型的触发器
如下代码演示了时间间隔循环执行的使用例子
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 每3秒执行一次
sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)
# 每3分钟执行一次
sched.add_job(do_func, trigger="interval", args=("张三丰", 100), minutes=3)
# 每3小时执行一次
sched.add_job(do_func, trigger="interval", args=("张三丰", 100), hours=3)
# 每3天执行一次
sched.add_job(do_func, trigger="interval", args=("张三丰", 100), days=3)
# 每3周执行一次
sched.add_job(do_func, trigger="interval", args=("张三丰", 100), weeks=3)
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
上面的代码中因为时间跨度比较大,这里只演示妹3秒执行一次的代码
代码如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 每3秒执行一次
sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
执行结果如下:
2022-05-25 14:14:04
2022-05-25 14:14:04
2022-05-25 14:14:05
2022-05-25 14:14:06
2022-05-25 14:14:07 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:07
2022-05-25 14:14:08
2022-05-25 14:14:09
2022-05-25 14:14:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:10
2022-05-25 14:14:11
2022-05-25 14:14:12
2022-05-25 14:14:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:13
3 使用cron类型的触发器
cron的触发器有点类似linux上crontab定时器的使用,代码如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
sched.add_job(do_func,trigger="cron",month='6-8,11-12', day='3rd fri', hour='0-3',args=("张三丰",100))
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
这里需要注意的是,可以省略不需要的字段。当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为*
day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
此外,也可以直接使用crontab表达式,如下:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
sched.add_job(do_func,trigger=CronTrigger.from_crontab('48 10 1-15 sep-nov *'),args=("张三丰",100))
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
四、定时器使用装饰器的方法
以间隔时间循环执行的代码为例,如下为未使用装饰器的方式
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
def main():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched=BackgroundScheduler()
# 每3秒执行一次
sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)
sched.start()
if __name__=="__main__":
main()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
修改为使用装饰器的方式如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
sched=BackgroundScheduler()
@sched.scheduled_job(trigger="interval",args=("张三丰",100),seconds=3)
def do_func(name,age):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))
if __name__=="__main__":
sched.start()
while True:
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(1)
执行结果如下:
2022-05-25 14:34:10
2022-05-25 14:34:11
2022-05-25 14:34:12
2022-05-25 14:34:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:13
2022-05-25 14:34:14
2022-05-25 14:34:15
2022-05-25 14:34:16 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:16
2022-05-25 14:34:17
2022-05-25 14:34:18
2022-05-25 14:34:19 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:19
2022-05-25 14:34:20
来源:https://blog.csdn.net/redrose2100/article/details/124965778
猜你喜欢
- 本文导读:删除表中的数据的方法有delete,truncate, 其中TRUNCATE TABLE用于删除表中的所有行,而不记录单个行删除操
- 需求是, 同一个页面, 有多组(不固定), 每组区块数量不一定一样的小区块. 要求每次只展开一个区块. 实现原理其实很简单, 点击导航, 若
- 问题:Python2获取包含中文的文件名是如果不转码会出现乱码。这里假设要测试的文件夹名为test,文件夹下有5个文件名包含中文的文件分别为
- 引言随着ES6新语法的不断迭代更新,已经出现了许多常用的工具api。今天我将为大家推荐两款明星api,它们就是Object.freeze和O
- 发现问题在golang中,对文件进行写操作时出现上面的错误。首先复现下这个问题。package mainimport ( "os&
- 本节我们首先来尝试识别最简单的一种验证码,图形验证码,这种验证码出现的最早,现在也很常见,一般是四位字母或者数字组成的,例如中国知网的注册页
- torch中tensor数据类型转换转换为float32类型:points = points.type(torch.float32)读取to
- SQL Server是一种关系型数据库管理系统(RDBMS),由微软公司开发和维护。它支持结构化查询语言(SQL)和Transact-SQL
- 这方面我还是一个freshman,不过看了一些文章,经过一些实践后也算是有了一些想法。希望如果有这方面的前辈路过的话,能不吝指教。首先,作为
- 创建数据表的SQL语句如下: string tatlename = "T_useruid";//定义一个变量。用于自动创
- asp如何获知页面上的图象的实际尺寸大小?见下面的两个asp文件:<!--#include virtual="/i
- 本文实例讲述了Python正则替换字符串函数re.sub用法。分享给大家供大家参考,具体如下:python re.sub属于python正则
- MySQL 客户端连接成功后,通过 show [session|global]status 命令 可以提供服务器状态信息,也可以在操作系统上
- 在VsCode中ctrl+s后会在当前目录下自动生成dist目录解决办法:关闭compile-hero插件在设置中搜索compile-her
- XML是一个精简的SGML,它将SGML的丰富功能与HTML的易用性结合到Web的应用中。XML保留了SGML的可扩展功能,这使XML从根本
- 昨天公司的网络更换,然后在使用git操作代码的时候,遇见了标题所述问题。仅以此文,记录该问题的解决过程。首先第一步: 得到本机的I
- 配置数据库密码特殊字符报错一般的springboot项目会有application.yml或者application.properties文
- 发现报错对于Navicat连接数据库mysql 出现报错2003(10060),如下所示报错经过查询总结以上出现问题的主要解决方法三种:1、
- 今天在运行登录远程桌面的时候发现了这个提示关于 “ MySQL Installer is running in Community mode
- 做前端也有几年时间了,不敢说能把他看地多透,但是多多少少还是有些自己的东西。下面以 Tudou.com 的首页为例,总结总结。就制作而言,我