Python爬虫程序中使用生产者与消费者模式时进程过早退出的问题
作者:smart_cat 发布时间:2022-10-12 03:37:52
标签:Python,生产者,消费者,进程早退
之前写爬虫程序的时候,采用生产者和消费者的模式,利用Queue作为生产者进程和消费者进程之间的同步队列。
执行程序时,总是秒退,加了断点也无法中断,加打印也无法输出,我知道肯定是进程退出了,但还是百思不得解,为什么会这么快就退出。
一开始以为是我的进程代码写的有问题,在某个地方崩溃导致程序提前退出,排查了一遍又一遍,并没有发现什么明显的问题,后来走读代码,看到主模块中消费者和生产者进程的启动后,发现了问题,原因是我通过start()方法启动进程后,使用join()的方式有问题。消费者进程必须执行join()操作,否则消费者进程将在有时间完成所有工作之前被终止。
错误的示范:
queue = multiprocessing.JoinableQueue()
consumer = PageContentConsumer(queue)
consumer.start()
producer = PageContentProducer(queue)
producer.start()
# 想通过queue的join()方法确保queue中的元素都被处理完毕
# 但从实际运行看,消费者进程还没来得及处理就退出了
queue.join()
正确的示范:
queue = multiprocessing.JoinableQueue()
consumer = PageContentConsumer(queue)
consumer.start()
producer = PageContentProducer(queue)
producer.start()
# 需要执行producer.join(),确保生产者进程能够持续执行
producer.join()
# 需要执行consumer.join(),确保消费者进程有时间进行处理
consumer.join()
# 通过queue的join()方法确保queue中的元素都被处理完毕, 这一步可选,因为真实代码里放了队列完成标志
queue.join()
生产者进程示意代码:
class PageContentProducer(multiprocessing.Process):
def __init__(self, page_list:list, output_queue:multiprocessing.JoinableQueue):
multiprocessing.Process.__init__(self)
self.daemon = True
self.page_list = page_list
self.content_list = [] # 用于保存汇总信息,没有什么实际作用
self.output_queue = output_queue
def run(self):
'''
向队列中加入每一篇文章
'''
self.visit_all_page_to_get_content()
def visit_all_page_to_get_content(self):
'''
使用线程池处理所有的page, 并从每一页上提取所有的文章content
'''
...
消费者进程示意代码:
class PageContentConsumer(multiprocessing.Process):
def __init__(self, dir, input_queue:multiprocessing.JoinableQueue):
multiprocessing.Process.__init__(self)
self.daemon = True
self.input_queue = input_queue
self.dir = dir
def run(self):
while True:
try:
content = self.input_queue.get()
if content is None:
# 如果收到结束标志, 就退出当前任务
break
self.content_worker_func(self.dir, content)
print(f"已处理: {content['title']}")
# 发出信号通知任务完成
self.input_queue.task_done()
except Exception as e:
print(repr(e))
def content_worker_func(self, dir, content):
'''
主要工作函数
'''
...
主模块代码示意如下:
if __name__ == '__main__':
page_list = [xxxx]
queue = multiprocessing.JoinableQueue()
consumer_num = os.cpu_count()
consumers = []
for i in range(0, consumer_num):
consumers.append(PageContentConsumer(dir, queue))
for i in range(0, consumer_num):
consumers[i].start()
producer = PageContentProducer(page_list, queue)
producer.start()
producer.join()
# 在队列上放置标志,发出完成信号, 有几个消费者,就需要放置多少个标志
for i in range(0, consumer_num):
queue.put(None)
# 等待消费者进程关闭
for i in range(0, consumer_num):
consumers[i].join()
来源:https://blog.csdn.net/hubing_hust/article/details/128362635
0
投稿
猜你喜欢
- 前言在网页应用中,我们经常需要在处理完表单或其它类型的用户输入后,显示一个通知信息给用户。对于这个需求,Django提供了基于Cookie或
- 在深度学习中,模型的输入size通常是正方形尺寸的,比如300 x 300这样.直接resize的话,会把图像拉的变形.通常我们希望resi
- 所用拓展模块 xlrd: Python语言中,读取Excel的扩展工
- MySQL中删除数据表是非常容易操作的, 但是你再进行删除表操作时要非常小心,因为执行删除命令后所有数据都会消失。语法以下为删除MySQL数
- 本文实例为大家分享了Golang实现文件传输的具体代码,供大家参考,具体内容如下借助TCP完成文件的传输,基本思路如下:1、发送方(客户端)
- 工作中,经常会有用python访问各种数据库的需求,比如从oracle读点配置文件或者往mysql写点结果信息之类的。这里列一下可能用到的各
- Python图片处理模块PIL(pillow)pywin32的主要作用1.捕获窗口;2.模拟鼠标键盘动作;3.自动获取某路径下文件列表;4.
- 本文实例讲述了Django中使用group_by的方法。分享给大家供大家参考。具体分析如下:在Django中怎样使用group_by语句呢?
- operator模块是python中内置的操作符函数接口,它定义了一些算术和比较内置操作的函数。operator模块是用c实现的,所以执行速
- 1.TCP是一种面向连接的可靠地协议,在一方发送数据之前,必须在双方之间建立一个连接,建立的过程需要经过三次握手,通信完成后要拆除连接,需要
- 索引是什么?索引是帮助MySQL进行高效查询的一种数据结构。好比一本书的目录,能加快查询的速度索引的结构?索引可以有B-Tree索引,Has
- 1、函数实现# -*- coding: utf-8 -*-def tail(filename, n=10): with open
- jetbrains IDE的插件加载不出来场景Win10、IDEA 2020.2、电脑配置了HTTP/HTTPS/socks梯子代理。想要给
- 本文实例讲述了Python使用中文正则表达式匹配指定中文字符串的方法。分享给大家供大家参考,具体如下:业务场景:从中文字句中匹配出指定的中文
- 语言的内存管理是语言设计的一个重要方面。它是决定语言性能的重要因素。无论是C语言的手工管理,还是Java的垃圾回收,都成为语言最重要的特征。
- 在使用SQL Server存储过程或者触发器时,通常会使用自定义异常来处理一些特殊逻辑。例如游标的销毁,事务的回滚。接下来将会详细的介绍SQ
- 作为一门脚本语言,写脚本时执行系统命令可以说很常见了,python提供了相关的模块和方法。os模块提供了访问操作系统服务的功能,由于涉及到操
- 前言我最近都在写一些Python 3.8的新功能介绍的文章,在自己的项目中也在提前体验新的Python版本。为什么我对这个Python 3.
- //只能在firefox下运行 var test = { name: "puterjam", __noSuchMetho
- 方法一:直接右键,将文章路径复制下来点击Copy full Xpath使用selenium+lxml中的etree进行配合使用,使用etre