python操作kafka实践的示例代码
作者:Small_office 发布时间:2021-04-19 10:57:34
1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='xxxx:x')
msg_dict = {
"sleep_time": 10,
"db_config": {
"database": "test_1",
"host": "xxxx",
"user": "root",
"password": "root"
},
"table": "msg",
"msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()
下面是消费者的简单代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
下面是结果:
2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:
3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:
4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
print msg
time.sleep(2)
index += 1
print '--------poll index is %s----------' % index
结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:
来源:https://www.cnblogs.com/small-office/p/9399907.html
猜你喜欢
- 内容摘要:asp使用最多的就是ACCESS数据库和ms sql server数据库,本文列出了asp连接这两个数据库的方
- 本文实例讲述了Python实现操纵控制windows注册表的方法。分享给大家供大家参考,具体如下:使用_winreg模块的话基本概念:KEY
- 本章将覆盖所有在Python中使用的基本I/O功能。有关更多函数,请参考标准Python文档。打印到屏幕上:产生输出最简单的方法
- 如下所示:import dateutildef before_month_lastday(ti): today=dateutil
- 本文实例讲述了django框架中间件原理与用法。分享给大家供大家参考,具体如下:中间件:轻量级,介于 request和response之间的
- 在做Django项目的过程中, 无法进入pycharm提供的Run manager.py Task交互环境出现这种问题是因为Pycharm无
- 1、运动速率上节中,实现了一辆汽车在马路上由下到上行驶,并使用了pygame.time.delay(200)来进行时间延迟。看了很多参考材料
- Python list列表添加元素方法实际开发中,经常需要对 Python 列表进行更新,包括向列表中添加元素、修改表中元素以及删除元素。&
- 功能描述目标完成多账号微信小程序每天自动签到输出签到成功则向微信群发送签到成功的信息否则提示用户签到失败,需手动签到包管理requestsi
- 你是一位交互设计师吗?告诉我,你具体做些什么?我是做网站设计的?听起来不够专业。我是做网页设计的,听起来……你们是做界面的……恩,好吧,我勉
- 我就废话不多说了,大家还是直接看代码吧~/** 二分幂法 求x^n */// 求整数幂package mainimport (
- 一、Tornado简介Tornado是一个使用Python编写的高效、非阻塞的web服务器软件和网络库。它被设计用于处理数以千计的并发连接,
- 前言需要注意,对实例化的文本组件的insert、delete等操作的index**都是浮点型而不是整型**,(1.0,2.0)表示的是对第一
- 1.视频分解图片我们使用cv2.VideoCapture来读取视频import cv2cap = cv2.VideoCapture('
- 1.首先自己直接在cmd中输入 pip3 install openCV是不可行的,即需要自己下载安装包本地安装2.openCV库 下载地址h
- 什么是XML?XML 指可扩展标记语言(eXtensibleMarkupLanguage)。 你可以通过本站学习XML教程XML 被设计用来
- 这篇文章主要介绍了Pandas数据离散化原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 如何用SQLMail建立一个电子刊物自动处理系统?下面我们用SQLMail来做一个电子刊物自动处理系统。在这个系统中,主要实现两个功能:1、
- scrapy.FormRequestlogin.pyclass LoginSpider(scrapy.Spider): name =
- python创建一个类很简单只需要定义它就可以了.class Cat: pass就像这样就可以了,通过创建子类我们可以继承他的父