python消费kafka数据批量插入到es的方法
作者:亮亮爱吃虾 发布时间:2023-04-30 18:10:47
标签:python,kafka,es
1、es的批量插入
这是为了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2
from elasticsearch import Elasticsearch
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量处理
# es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消费kafka
1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现
2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition
3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。
4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime
class KafkaPython:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
logger_data = logging.getLogger("data")
def __init__(self):
self.server = ConfigUtil().get("kafka","kafka_server")
self.topic = ConfigUtil().get("kafka","topic")
self.group = ConfigUtil().get("kafka","group")
self.partition_id = int(ConfigUtil().get("kafka","partition"))
self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
self.consumer = None
self.hosts = ConfigUtil().get("es","hosts")
self.index_name = ConfigUtil().get("es","index_name")
self.type_name = ConfigUtil().get("es","type_name")
def getConnect(self):
client = KafkaClient(self.server)
topic = client.topics[self.topic]
p = topic.partitions
ps={p.get(self.partition_id)}
self.consumer = topic.get_simple_consumer(
consumer_group=self.group,
auto_commit_enable=True,
consumer_timeout_ms=self.consumer_timeout_ms,
# num_consumer_fetchers=1,
# consumer_id='test1',
partitions=ps
)
self.starttime = datetime.datetime.now()
def beginConsumer(self):
print("beginConsumer kafka-python")
imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
#创建ACTIONS
count = 0
ACTIONS = []
while True:
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
for message in self.consumer:
if message is not None:
try:
count = count + 1
# print(str(message.partition.id)+","+str(message.offset)+","+str(count))
# self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
action = {
"_index": self.index_name,
"_type": self.type_name,
"_source": message.value
}
ACTIONS.append(action)
if len(ACTIONS) >= 10000:
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
#break
except (Exception) as e:
# self.consumer.commit_offsets()
print(e)
self.logger.error(e)
self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
# self.logger_data.error(message.value+"\n")
# self.consumer.commit_offsets()
if len(ACTIONS) > 0:
self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
def disConnect(self):
self.consumer.close()
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量处理
success = bulk(self.es, data, index=self.index, raise_on_error=True)
self.logger.info(success)
3、运行
if __name__ == '__main__':
kp = KafkaPython()
kp.getConnect()
kp.beginConsumer()
# kp.disConnect()
注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件
现在还在批量的压测中。。。
来源:https://blog.csdn.net/liagliang/article/details/78712475


猜你喜欢
- 故事还得从下面的图说起:what? 两条sql执行结果的id列居然不一致。。。。。。一、LIMIT 处理过程为了故事的顺利发展,我们得先创建
- 一、概述单机Mysql8数据库服务器运行过程中突然断电,导致数据库崩溃,无法重启。二、查找原因查看mysql运行错误日志:WIN-SOTMI
- 迭代器跟生成器,与上篇文章讲的装饰器一样,都是属于我的一个老大难问题。通常就是遇到的时候就去搜一下,结果在一大坨各种介绍博客中看了看,回头又
- 背景当前很多文章尝试过最优分箱,python上也有cut等方法进行等宽分箱。为了方便日后输出结果以及分箱要求。做一个简单的轮子以供大家日后使
- 我想让一片文章,每到3000字就分到下一条插入到数据库,求高手 <%Dim Content Conte
- 这次我使用ADO.NET来插入一条数据,到数据库中。主用到存储过程。我不想每次都是用SQL文本的形式了,那样始终没有进步~~~下面首先,我把
- Jaslabs的Justin Silverton列出了十条有关优化MySQL查询的语句,我不得不对此发表言论,因为这个清单非常非常糟糕。另外
- 示例可以附件中下载 1.加载xml文件 加载XML文件共有2种方法,一是加载指定字符串,二是加载指定文件 2.获取element的方法 a)
- FTP服务器FTP服务器是在互联网上提供文件存储和访问服务的计算机,它们依照FTP协议提供服务。FTP是File Transfer Prot
- 在python2.x中 ,异常是这样的处理的,异常基类后面加一个逗号“ ,” 然后跟着异常类型import tracebackt
- 前段时间看到letcode上的元音字母字符串反转的题目,今天来研究一下字符串反转的内容。主要有三种方法:1.切片法(最简洁的一种)#切片法d
- 异常处理是日常操作了,但是有时候不能只能打印我们处理的结果,还需要将我们的异常打印出来,这样更直观的显示错误下面来介绍traceback模块
- 前些天有位网友建议我在博客中添加RSS订阅功能,觉得挺好,所以自己抽空看了一下如何在Django中添加RSS功能,发现使用Django中的s
- 一、背景Python 是一门易于学习、功能强大的编程语言。它提供了高效的高级数据结构,还能简单有效地面向对象编程。Python 优雅的语法和
- 简介pandas按条件筛选数据时,除了使用query()方法,还可以使用isin和对isin取反进行条件筛选.代码 import panda
- 以下示例显示如何在 XPath 查询中指定轴。这些示例中的 XPath 查询都在 SampleSchema1.xml 中所包含的映射架构上指
- 1.requests库简介requests 是 Python 中比较常用的网页请求库,主要用来发送 HTTP 请求,在使用爬虫或测试服务器响
- 总的来说,提高应用程序性能的最好的方法是发现应用的瓶径之所在,和数据库进行交互的性能无疑是决定应用程序性能的重要环节之一。因为ADO是当前最
- 函数带括号和不带括号的区别1、不带括号时,调用的是这个函数本身 ,是整个函数体,是一个函数对象,不需等该函数执行完成2、带括号(此
- 1、fit和fit_generator的区别首先Keras中的fit()函数传入的x_train和y_train是被完整的加载进内存的,当然