解决python3 pika之连接断开的问题
作者:moxiaomomo 发布时间:2021-09-28 18:40:09
问题描述
在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。
问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。
源码示例
#!/usr/bin
#coding: utf-8
import pika
import time
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
def callback(ch, method, properties, body):
print(body)
time.sleep(600)
ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
chan.basic_consume(callback, queue=TEST_QUEUE)
chan.start_consuming()
if __name__ == "__main__":
test_main()
运行一段时间后, 就会报错:
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
问题排查
猜测:pika客户端没有及时发送心跳,连接被server断开
一开始修改了heartbeat_interval参数值, 示例如下:
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
# ....
修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;
于是又加了个心跳线程, 示例如下:
#!/usr/bin
#coding: utf-8
import pika
import time
import logging
import threading
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
class Heartbeat(threading.Thread):
def __init__(self, connection):
super(Heartbeat, self).__init__()
self.lock = threading.Lock()
self.connection = connection
self.quitflag = False
self.stopflag = True
self.setDaemon(True)
def run(self):
while not self.quitflag:
time.sleep(10)
self.lock.acquire()
if self.stopflag :
self.lock.release()
continue
try:
self.connection.process_data_events()
except Exception as ex:
logging.warn("Error format: %s"%(str(ex)))
self.lock.release()
return
self.lock.release()
def startHeartbeat(self):
self.lock.acquire()
if self.quitflag==True:
self.lock.release()
return
self.stopflag=False
self.lock.release()
def callback(ch, method, properties, body):
logging.info("recv_body:%s" % body)
time.sleep(600)
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_consume(callback,
queue=TEST_QUEUE)
heartbeat = Heartbeat(s_conn)
heartbeat.start() #开启心跳线程
heartbeat.startHeartbeat()
chan.start_consuming()
if __name__ == "__main__":
test_main()
尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: How often to send heartbeats.
Min between this value and server's proposal
will be used. Use 0 to deactivate heartbeats
and None to accept server's proposal.
按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。
如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。
如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。
来源:https://blog.csdn.net/moxiaomomo/article/details/77414831


猜你喜欢
- 文件的介绍什么是文件?如图展示:使用文件的目的:保存数据存放在磁盘,把一些存储存放起来,可以让程序下一次执行的时候直接使用,而不必重新制作一
- 简介在廖雪峰的python网站上,他是这么说的python是动态语言,它允许程序在执行过程中动态绑定属性或者方法(使用MethodTpye)
- 1.前言(闲话)最近在做电磁炮,发现题目需要用到颜色跟踪,于是花了一点时间学了一下OpenMV,只学习OpenMV是远远不够的,还需要实现与
- 在我的印象里面进制互相转换确实是很常见的问题,所以在Python中,自然也少不了把下面这些代码收为util。这是从网上搜索的一篇也的还可以的
- 版本:MySQL-5.7.32前言:对于业务繁忙的数据库来说,在运行了一定时间后,往往会产生一些数据量较大的表,特别是对于每天新增数据较多的
- 列表UL或是OL中都有一个预设标记,这个标记可能是实点圆点,也可能是数字。在实际的应用中,我们需要去掉这个预设标记,但我们不清楚这个预设标记
- Python GUI 库有很多,下面给大家罗列常用的几种 GUI 库。下面介绍的这些GUI框架,能满足大部分开发人员的需要,你可以根据自己的
- 使用router.meta.keepAlive对页面进行缓存需求:1. 从stockList页面到stockInfo页面,从stockInf
- 用法示例:import random# 1)随机小数print(random.random()) # 获取大于0且小于1 之间的小数 &nb
- 和数据库打交道要频繁地用到SQL语句,除非你是全部用控件绑定的方式,但采用控件绑定的方式存在着灵活性差、效率低、功能弱等等缺点。因此,大多数
- 一、pyinstaller简介Python是一个脚本语言,被解释器解释执行。它的发布方式:.py文件:对于开源项目或者源码没那么重要的,直接
- 如下所示:$preg= '/xue[\s\S]*?om/i';preg_match_all($preg,"学并思网
- 今天看到一个教程,是关于Python安装pygame模块的。觉得很好,拿来分享一下。安装Python额,这个小题貌似在这里很是多余啊。但是为
- 前几天玩了玩Google的Map API,感觉还不错,很简单。但凡有过任何编程经验的同学,看完以下的教程,都可以在10分钟内掌握它的主要功能
- 假设在python中有一字典如下:x={‘a':'1,2,3', ‘b':'2,3,4'}需
- 何为标准化:在数据分析之前,我们通常需要先将数据标准化(normalization),利用标准化后的数据进行数据分析。数据标准化也就是统计数
- 本文实例为大家分享了检测几何图形轮廓和检测花朵图形轮廓,供大家参考,具体内容如下OpenCV绘制图像轮廓绘制轮廓的一般步骤:1、读取图像im
- 前言InnoDB采用按表空间(tablespace)的方式进行存储数据, 默认配置情况下会有一个初始大小为10MB, 名字为ibdata1的
- OpenCV的全称是:Open Source Computer Vision Library。OpenCV是一个基于(开源)发行的跨平台计算
- 翻译说明:这是Solid State Group网站上的一篇很友好的文章,解决了我在设计中遇到的很多问题,故在此我翻译其文,并对原作者表示非