解决python3 pika之连接断开的问题
作者:moxiaomomo 发布时间:2021-09-28 18:40:09
问题描述
在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。
问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。
源码示例
#!/usr/bin
#coding: utf-8
import pika
import time
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
def callback(ch, method, properties, body):
print(body)
time.sleep(600)
ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
chan.basic_consume(callback, queue=TEST_QUEUE)
chan.start_consuming()
if __name__ == "__main__":
test_main()
运行一段时间后, 就会报错:
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
问题排查
猜测:pika客户端没有及时发送心跳,连接被server断开
一开始修改了heartbeat_interval参数值, 示例如下:
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
# ....
修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;
于是又加了个心跳线程, 示例如下:
#!/usr/bin
#coding: utf-8
import pika
import time
import logging
import threading
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
class Heartbeat(threading.Thread):
def __init__(self, connection):
super(Heartbeat, self).__init__()
self.lock = threading.Lock()
self.connection = connection
self.quitflag = False
self.stopflag = True
self.setDaemon(True)
def run(self):
while not self.quitflag:
time.sleep(10)
self.lock.acquire()
if self.stopflag :
self.lock.release()
continue
try:
self.connection.process_data_events()
except Exception as ex:
logging.warn("Error format: %s"%(str(ex)))
self.lock.release()
return
self.lock.release()
def startHeartbeat(self):
self.lock.acquire()
if self.quitflag==True:
self.lock.release()
return
self.stopflag=False
self.lock.release()
def callback(ch, method, properties, body):
logging.info("recv_body:%s" % body)
time.sleep(600)
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_consume(callback,
queue=TEST_QUEUE)
heartbeat = Heartbeat(s_conn)
heartbeat.start() #开启心跳线程
heartbeat.startHeartbeat()
chan.start_consuming()
if __name__ == "__main__":
test_main()
尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: How often to send heartbeats.
Min between this value and server's proposal
will be used. Use 0 to deactivate heartbeats
and None to accept server's proposal.
按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。
如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。
如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。
来源:https://blog.csdn.net/moxiaomomo/article/details/77414831
猜你喜欢
- <html><head>//搜索暂时没做,数据是出来了,但是却没法显示<link rel="styl
- Array(数组)内部机制在 Go 语言中数组是固定长度的数据类型,它包含相同类型的连续的元素,这些元素可以是内建类型,像数字和字符串,也可
- 表单的验证一直是网页设计者头痛的问题,表单验证类 Validator就是为解决这个问题而写的,旨在使设计者从纷繁复杂的表单验证中解放出来,把
- 分区表的概念 分区致力于解决支持极大表和索引的关键问题。它采用他们分解成较小和易于管理的称为分区的片(piece)的方法。一旦分区被定义,S
- 今天交流会上,分享前端的开发经验,有一条虽然很快带过,但是我倒是印象蛮深刻的,就写点小结来分享一下吧。不知道是标准害了大家还是大家害了标准,
- 今天使用shuffleNetV2+,使用自己的数据集,遇到了loss是nan的情况,而且top1精确率出现断崖式上升,这显示是不正常的。在网
- 1)用这个方法,把虚拟主机的默认页只设置成一个比如index.asp 在index.asp的最上面加入以下代码:<%if R
- 1.客户端的主页面:<%@LANGUAGE="VBSCRIPT" CODEPAGE="936"
- 利用字典dict来完成统计举例:a = [1, 2, 3, 1, 1, 2]dict = {}for key in a: dic
- 代码#!/usr/bin/env python# -*- coding: utf-8 -*-# @File : HtmlParser.py#
- 1. 用户必须在几秒钟知道网站是做什么的。注意力是因特网上最有价值的货币。 如果访问者无法在几秒钟之内得知你的网站的方向,他很有可能转而访问
- 1. logging日志的介绍在现实生活中,记录日志非常重要,比如:银行转账时会有转账记录;飞机飞行过程中,会有个黑盒子(飞行数据记录器)记
- 1.1.1 摘要 相信大家对于SQL Transcation再熟悉不过,它确保了数据库的数据一致性和安全性,尤其在对数据执行增删时,如果发生
- 编写思路:把本地文件在客户端通过base64编码以后发送目的地.测试过程中,上传文件过大,导致超时不成功.后来经过改善.把编码分段发送.测试
- 从信息组织角度来看,段落内行之间的关系要比段落之间的关系低一个级别,所以在呈现上段落之间的“段距”应该大于段落之内的“行距”,如此才能一目了
- 本文实例讲述了python获取当前计算机cpu数量的方法。分享给大家供大家参考。具体分析如下:这里实际上返回的是计算机的cpu核心数,比如c
- 你有没有觉得你的CSS样式表文件过于臃肿?其实如果你注意并培养一些比较好的CSS书写习惯,我想你的CSS样式表过于”肥胖”的问题会得到很好的
- 关于list的insert函数list#insert(ind,value)在ind元素前面插入value首先对ind进行预处理:如果ind&
- 显示图像是 Opencv最基本的操作之一, imshow()函数可以实现该操作。如果使用过其他GUI框架背景,就会很自然地调用 imshow
- 正在学习javascript 的朋友可以把它当作小练习动手做一做。加强自己的动手编码能力。参考代码:<!DOCTYPE HTML PU