python操作kafka实践的示例代码
作者:Small_office 发布时间:2021-04-19 10:57:34
1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='xxxx:x')
msg_dict = {
"sleep_time": 10,
"db_config": {
"database": "test_1",
"host": "xxxx",
"user": "root",
"password": "root"
},
"table": "msg",
"msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()
下面是消费者的简单代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
下面是结果:
2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:
3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:
4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
print msg
time.sleep(2)
index += 1
print '--------poll index is %s----------' % index
结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:
来源:https://www.cnblogs.com/small-office/p/9399907.html


猜你喜欢
- 近来,随着XHTML(可扩展HTML)标准的出现,<script/>标签也经历了一些改变。该标签不再用language特性,而用
- 1 将文件保存到服务器本地upload.html<!DOCTYPE html><html lang="en&qu
- 前言:func DeepEqual(x, y interface{}) boolDeepEqual 函数的参数是两个 i
- TensorFlow版本更新太快 了,所以导致一些以前接口函数不一致,会报错。这里总结了一下自己犯的错,以防以后再碰到,也可以给别人参考。首
- 我记得在刚开始接触动态HTML及JavaScript时就接触过关于鼠标右键屏蔽的脚本代码,当时这些代码很多会用在防止浏览者未经允许的复制网页
- 本文实例讲述了Joomla开启SEF的方法。分享给大家供大家参考,具体如下:使用SEF(search engine friendly)网址的
- 1、引入模块import zipfilezip文件格式是通用的文档压缩标准,在ziplib模块中,使用ZipFile类来操作zip文件,下面
- 最近在研究python调度框架APScheduler使用的路上,那么今天也算个学习笔记吧!# coding=utf-8""
- 最近在学习仿站内容,需要用到phpstudy,在下载phpstudy后想要启动MySQL服务,但是总是无法启动。主要原因是之前已经在本地安装
- 如果你细心跟踪一下SQL Server数据库服务器的登录过程,你会发现口令计算其实是非常脆弱的,SQL Server数据
- 一、ts文件的由来ts文件,ts即"Transport Stream"的缩写,特点就是要求从视频流的任一片段开始都是可以
- 本文实例介绍了python实现井字棋游戏的方法,分享给大家,具体内容如下windows7下python3.4.0编译运行通过。由于采用了cm
- 前言ThinkPHP,是为了简化企业级应用开发和敏捷WEB应用开发而诞生的开源轻量级PHP框架。随着框架代码量的增加,一些潜在的威胁也逐渐暴
- python中的数字类型工具python中为更高级的工作提供很多高级数字编程支持和对象,其中数字类型的完整工具包括:1.整数与浮点型,2.复
- 1.建立连接字符串,里面包含数据库名称、用户名和密码2.建立操作字符串,里面是对数据操作的SQL语句3.建立Connection,用连接字符
- 方法一:需要在网络条件下安装win+R进入运行框输入命令cmd点击确定进入普通下载:pip install 模块名字例如:输入pi
- 一、什么是MD5校验和?MD5,是Message Digest Algorithm 5的缩写,即消息摘要算法版本5。消息摘要算法通过对所有数
- 参数数量及其作用该函数共有两个参数,分别是key和scope。def get_collection(key, scope=None) Wra
- 在JAVA WEB应用中,如何获取servlet请求中的参数,并传递给跳转的JSP页面?例如访问http://localhost:8088/
- Oracle游标分为显示游标和隐式游标。显示游标(Explicit Cursor):在PL/SQL程序中定义的、用于查询的游标称作显示游标。