Python使用signal定时结束AsyncIOScheduler任务的问题
作者:返回主页临渊(v:superz-han) 发布时间:2022-12-19 21:28:11
标签:Python,signal,AsyncIOScheduler,任务
在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题
在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)
原调试代码如下:
from datetime import datetime, timedelta
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
url = 'https://httpbin.org/get?a=1'
async with session.get(url) as res:
print('get', res.status)
return await res.text()
async def post(session):
url = 'https://httpbin.org/post?b=2'
async with session.post(url) as res:
print('post', res.status)
return await res.text()
async def main():
async with aiohttp.ClientSession() as session:
await get(session)
await post(session)
if __name__ == '__main__':
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = AsyncIOScheduler(jobstores=jobstores)
for i in range(10): # 添加10个任务
job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()
Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()
来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。
# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
url = 'https://httpbin.org/get?a=1'
async with session.get(url) as res:
print('get', res.status)
return await res.text()
async def post(session):
url = 'https://httpbin.org/post?b=2'
async with session.post(url) as res:
print('post', res.status)
return await res.text()
async def main():
async with aiohttp.ClientSession() as session:
await get(session)
await post(session)
if __name__ == '__main__':
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = AsyncIOScheduler(jobstores=jobstores)
for i in range(10): # 添加10个任务
job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()
signal.alarm(20) # 20秒后终止程序
asyncio.get_event_loop().run_forever() # 永远运行
来源:https://www.cnblogs.com/superhin/p/15060818.html


猜你喜欢
- 前言保留小数位是我们经常会碰到的问题,尤其是刷题过程中。那么在python中保留小数位的方法也非常多,但是笔者的原则就是什么简单用什么,因此
- 这个问题让我查了许多天才解决,为了避免后面的人重复走弯路,记录下来。问题描述:我在ubuntu 下编译安装了caffe ,在命令行模式下可以
- 一、背景近期项目即将开展,计划第一步就是实现数据的可视化,所以先学习一下数据展示相关Demo。选用Python2.7与Matplotlib来
- Go 处理 json数据主要就是使用 json 包下的 Marshal 和 UnMarshal 两个函数。定义结构体 Usertype Us
- import osimport sysimport MySQLdbdef getStatus(conn):  
- 初入深度学习,就遇到了困难,一直安装不了tensorflow和keras库!!!真是让人着急!!!在经过无数次尝试,看了无数篇博客之后,终于
- 这是一个获取字符串中两个子串之间的子串,如从字符串www.aspxhome.com中获取coderbolg子串,就让这个PHP函数来实现吧,
- 1,js取得IP地址的方法一<script src="http://pv.sohu.com/cityjson?ie=utf-
- 如下所示:# coding:utf-8import osfrom PIL import Image# bmp 转换为jpgdef bmpTo
- 1、将python打包好的exe解压为py文件,步骤如下: 下载pyinstxtractor.py文件2、下载地址:https://nchc
- 本文实例讲述了PHP获取二叉树镜像的方法。分享给大家供大家参考,具体如下:问题操作给定的二叉树,将其变换为源二叉树的镜像。解决思路翻转二叉树
- 为了让大家更好的对python中WSGI有更好的理解,我们先从最简单的认识WSGI着手,然后介绍一下WSGI几个经常使用到的接口,了解基本的
- requests上传excel数据流headers=self.headers #获取导
- 什么是 Socket?Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网
- 前言这篇文章通过实例给大家讲解Python3中print带颜色输出的格式,下面话不多说了,来一起看看详细的介绍吧\033[显示方式;前景色;
- 本文实例讲述了PHP实现数组根据某个字段进行水平合并,横向合并。分享给大家供大家参考,具体如下:PHP数组水平合并,横向合并,两条数据合并成
- 在Python中有时会碰到需要一个一维列向量(n*1)与另一个一维列向量(n*1)的转置(1*n)相乘,得到一个n*n的矩阵的情况。但是在p
- 一:Zmail的优势:1:自动填充大多数导致服务端拒信的头信息(From To LocalHost之类的)2:将一个字典映射为email,构
- 今天,发现了一个之前从未注意的角落,相信能够大大提高自己写JS的速度。能够迅速发现错误。例如,今天的加班中调试一个js错误发现的一个例子。1
- 类的参数定义将conda环境设置为ai,conda activate ai这个文件的由来:由于在yolov1的pytorch实现的损失函数中