python中进程间通信及设置状态量控制另一个进程
作者:PT、小小马 发布时间:2022-01-28 08:02:57
一、python中进程间通信
业务场景:在当前遇到的业务场景中,我们需要启一个间隔任务,这个间隔任务跑一个算法,然后把算法的结果进行一些处理,并入库。任务目前间隔是一小时,算法运行时间要50多分钟,留给结果处理的时间并不多,所以有可能会出现超时。目前来说,优化方向在算法上会更为合理,因为结果处理本来就不用很多时间。但是在这个业务场景下,想要把结果处理的时间进行无限压缩,压缩到0,其实也是可以实现的,说是压缩为0,实际上就是在算法执行完成后,再启一个进程去处理,这样就不会由于需要进行数据处理而影响到算法的运行,将算法和结果处理分为两个独立的进程去处理。在最开始的程序中,是把算法运行和结果处理作为一个周期,而现在是把算法运行和结果处理分为两个周期去处理。
技术实现方案:
启动二个进程,其中一个运行算法,在算法运行结束后,发送一个状态值到另外一个进程,另外一个进程在收到状态量后启动数据处理即可。两个进程间互不影响即可。其实也相当于算法进程控制数据处理进程
测试场景构造代码:
from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
for i in range(1000):
print('send_message:%d'%i)
print(os.getpid())
conn.send(i)
time.sleep(3)
def send_message1(conn):
# for i in range(1000):
print(conn.recv())
while True:
if conn.recv() % 5 == 0:
print(' today is nice day')
time.sleep(1)
if __name__ == '__main__':
#创建一个进程通信管道
left,right = Pipe()
t1 = Process(target=send_message,args=(left,))
t2 = Process(target=send_message1,args=(right,))
t1.start()
t2.start()
在这个案例场景下有一些需要注意的点:
一、time.sleep()的问题,睡眠指定时间,总是会出错,具体的出错原因到现在也没有找到,这是原来出现的问题,在这里没有做长时间的测试,所以不一定会出现,但是还是要注意
二、代码实现中与上述的描述差异有一些,如未启用调度任务,只是启了一个间隔运行的任务。
三、数据处理进程一直处理空跑状态,会造成资源的浪费(更合理的应该是形成阻塞状态,但是对于阻塞状态的构造缺乏认知,所以先牺牲资源
四、在上述描述的需求中,在算法运行及数据处理的上一节点还有一个调度任务在控制,这里未做出体现,其实应该把定时任务和数据处理作为两个周期独立出来才更符合上述描述中的需求。
二、设置状态量控制另一个进程
业务场景:在当前遇到的业务场景中,我们需要启一个间隔任务,这个间隔任务跑一个算法,然后把算法的结果进行一些处理,并入库。任务目前间隔是一小时,算法运行时间要50多分钟,留给结果处理的时间并不多,所以有可能会出现超时。目前来说,优化方向在算法上会更为合理,因为结果处理本来就不用很多时间。但是在这个业务场景下,想要把结果处理的时间进行无限压缩,压缩到0,其实也是可以实现的,说是压缩为0,实际上就是在算法执行完成后,再启一个进程去处理,这样就不会由于需要进行数据处理而影响到算法的运行,将算法和结果处理分为两个独立的进程去处理。在最开始的程序中,是把算法运行和结果处理作为一个周期,而现在是把算法运行和结果处理分为两个周期去处理。
上面的解决方案中只涉及到了启用两个进程去运行两个任务,并未涉及到启用定时任务框架,所以可能会显得和上述的业务场景不一致,所以在这里重新解决一下。上面也是没有问题的,只是把定时任务框架也作为一个任务去处理即可。然后在定时任务运行完程后,向另外一个进程传入一个参数,作为启动另一个进程的状态量即可。当然,在这里,两个进程还是完全占满的,即处理阻塞状态。对于资源的利用还是没有完全达到最好。后续再考虑使用进程池的方式,看是否可以让其中的一个进程运行完后直接释放资源。
技术解决方案如下:
from multiprocessing import Process,Pipe
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# schedule = BackgroundScheduler()
schedule = BlockingScheduler(timezone="Asia/Shanghai")
# schedule = AsyncIOScheduler(timezone="Asia/Shanghai")
def algorithm(conn):
print('start_run')
conn.send('please run')
# time.sleep(5)
def worth_result(conn):
while True:
if conn.recv() == 'please run':
print(conn.recv() + ' very nice!')
def time_job(conns):
schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,))
schedule.start()
if __name__ == '__main__':
left,right = Pipe()
t1 = Process(target=time_job,args=(left,))
t2 = Process(target=worth_result,args=(right,))
t1.start()
t2.start()
在这里还有一些点需要说明,定时任务选择那一种类型其实都没有关系,阻塞和非阻塞其实没有关系,因为我们在这里是直接启了两个进程,每个进程间是相互独立的,并非是在定时任务下启用的两个进程,所以不会影响的。
关于这个解决方案还有的问题:
一、上述所说,两个进程是占满的,所以对于资源来说,两个进程的利用率一直很高
二、扩展性不足,如果在这个程序中还有其他需要处理的过程,就需要再添加进程,或者把他添加到当前的进程之下,代码重构会比较麻烦一些
三、整个任务的控制不足,需要加以完善。比如对于运行状态一些控制及查看,一般程序如果运行时间较长的话,我们应该添加这样的接口,否则启动后如果没有出结果,我们是不知道其运行状态,有一点被动
四、关于三,使用logging库,应该是可以直接去输出其日志,但是日志库作为第三方库,相当于是对整个运行状态进行监控,会不会再占用一个进程,这个需要去测试
五、完备性及容灾处理,如果程序由于资源等其他问题挂掉后,会有一些数据冗余下来,也就是一些算法未进行处理,这个时候需要考虑怎么样去补数据?原始文件如果没有保留下来呢?而且如果这些数据是极重要的数据该怎么处理?如果程序挂掉后,应该如何快速的去处理呢?直接重启吗?
六、如果数据处理的进程所用的时间比算法还多,那该怎么办?目前的业务来看,是远低于的,但是如果是远高于呢?可否将处理工作进行分配,利用多台机器来处理,然后再把结果合并起来?
分布式处理的思想越来越浓。
来源:https://blog.csdn.net/qq_44862918/article/details/124842372
猜你喜欢
- 本文研究的主要是python PIL实现图片合成的相关内容,具体介绍如下,分享实例代码。在项目中需要将两张图片合在一起。遇到两种情况,一种就
- 严格控制Session可以将不需要Session的内容(比如帮助画面,访问者区域,等等)移动到关闭Session的独立ASP应用程序中。在基
- 1. 前言相信参与使用Oracle数据库进行项目开发、运维的同学常常被Oracle JDBC驱动的Maven依赖折磨。现在这一情况在今年二月
- 本文实例讲述了Smarty实现页面静态化(生成HTML)的方法。分享给大家供大家参考,具体如下:为了减少数据库读取次数,某些内容不经常被更改
- suspect_pages 表位于 msdb 数据库中,是在 SQL Server 2005 中引入的。用于维护有关可疑页的信息的 susp
- 这篇文章是入门级别的应用Python + Selenium进行自动化测试,包括环境搭建及简单的实例。基本思想是用Firefox Seleni
- 在极坐标中,圆的表示方式为:x=x0+rcosθy=y0+rsinθ圆心为(x0,y0),r为半径,θ为旋转度数,值范围为0-359如果给定
- 之前一直没弄清jsonArray和jsonObject的区别,今天终于弄明白了。jsonArray是数组,以[ ]包含数据,jsonObje
- vue开发依赖的相关配置Vue SSR 指南今天先做客户端方面的配置,明天再做服务端的部分。那么马上开始吧~修改部分代码脚手架生成的代码肯定
- 前言k-means算法是无监督的聚类算法,实现起来较为简单,k-means++可以理解为k-means的增强版,在初始化中心点的方式上比k-
- PHP PDO 预处理语句与存储过程很多更成熟的数据库都支持预处理语句的概念。什么是预处理语句?可以把它看作是想要运行的 SQL 的一种编译
- python中迭代器和iter()函数迭代器为类序列对象提供了一个类序列的接口。python的迭代无缝地支持序列对象,而且它还允许程序员迭代
- 插件说明:插件根据提供的10位ISBN书号,在Amazon网站上查找该图书的详细信息。如果找到结果,则返回一个两元素的数组,其中第一个元素是
- 理论傅立叶变换用于分析各种滤波器的频率特性,对于图像,2D离散傅里叶变换(DFT)用于找到频域.快速傅里叶变换(FFT)的快速算法用于计算D
- JSONJSON 起源JSON 全称 JavaScript Object Notation 。是处理对象文字语法的 JavaScript 编
- 一开始用Firefox加Firebug/YSlow插件分析,但是firefox不能运行自定义的javascript,好像还要装什么插件。于是
- Matlab常用的输出命令1、disp方法(1)方法(2)方法(3)需要注意:直接加数字不会显示数字,num2str()使数值转换为字符串类
- 本文实例讲述了JavaScript实现父子dom同时绑定两个点击事件,一个用捕获,一个用冒泡时执行顺序的方法。分享给大家供大家参考,具体如下
- 前言matplotlib 是Python最著名的绘图库,它提供了一整套和matlab相似的命令API,十分适合交互式地进行制图。本文将以例子
- 公司安排了个任务,爬取ppt资源,我爬取后打开ppt发现,最后一页是站点的宣传,需要删除。仔细阅读了python-pptx的api和国内的教