python每5分钟从kafka中提取数据的例子
作者:sxf_0123 发布时间:2022-05-15 16:35:52
标签:python,kafka,提取数据
我就废话不多说了,直接上代码吧!
import sys
sys.path.append("..")
from datetime import datetime
from utils.kafka2file import KafkaDownloader
import os
"""
实现取kafka数据,文件按照取数据的间隔命名
如每5分钟从kafka取数据写入文件中,文件名为当前时间加5
"""
TOPIC = "rtz_queue"
HOSTS = "ip:9092,ip:9092"
GROUP = "2001"
def get_end_time(hour,minute,time_step):
if (minute+time_step)%60<60:
if (minute+time_step)%60<10:
return str(hour+int((minute+time_step)/60))+":"+"0"+str((minute+time_step)%60)
else:
return str(hour+int((minute+time_step)/60))+":"+str((minute+time_step)%60)
else:
pass
def kafkawritefile(time_step,time_num):
start = datetime.now()
downloader = KafkaDownloader(HOSTS, TOPIC, GROUP)
i = 1
while(i<=time_num):
end_time = get_end_time(start.hour, start.minute,i*time_step)
end_time_file = end_time.replace(':', '_')
outfile_path = "/data/tmp/" + end_time_file + ".csv"
if os.path.exists(outfile_path):
os.remove(outfile_path)
writefile = open(outfile_path, 'a+', encoding='utf-8')
for msg in downloader.message():
curr_time = datetime.now()
curr_time = str(curr_time)
split_curr_time = curr_time.split(' ')
curr_time_str = split_curr_time[1][0:5]
if curr_time_str >= str(end_time):
break
i += 1
if __name__=='__main__':
time_step = 15
time_num = 1
kafkawritefile(time_step,time_num)
来源:https://blog.csdn.net/sxf_123456/article/details/80297959
0
投稿
猜你喜欢
- 问题怎样捕获代码中的所有异常?解决方案想要捕获所有的异常,可以直接捕获 Exception 即可:try: ...except
- async官方DOC介绍node安装npm install async --save使用var async = require('a
- 本文实例讲述了Python yield的用法。分享给大家供大家参考,具体如下:yield的英文单词意思是生产,刚接触Python的时候感到非
- 论坛有人问起如何获取读取CSS属性值,就写了下面这段兼容各浏览器的获取HTML元素的css属性值函数:function getSt
- CNN(Convolutional Neural Networks) 卷积神经网络简单讲就是把一个图片的数据传递给CNN,原涂层是由RGB组
- 美餐每天发一个用Excel汇总的就餐数据,我们把它导入到数据库后,行政办公服务用它和公司内的就餐数据进行比对查重。初始实现是单线程,和imp
- 以下是Django框架获取各种form表单数据的方法Django中获取text,password名字:<input type
- 滚动回归所谓滚动回归,通常用在时间序列上。记当前时刻为 t,回归时长为 s,则一直使用 当作自变量来预测 。使用滚动回归的目的通常是为了避免
- keras 中fit(self, x=None, y=None, batch_size=None, epochs=1, verbose=1,
- 描述print函数是Python的内置函数,它会将对象的__repr__特殊函数返回的字符串打印输出。默认情况下,print函数调用底层的s
- 关于跨域这个话题,很早就答应过要分享,但是因为懒,一直拖着,直到D2上有人谈起了“完美跨域”。“跨域”应该已经算不上什么难题了,只是提起“完
- 目录简介图形加载和说明图形的灰度灰度图像的压缩原始图像的压缩总结简介本文将会以图表的形式为大家讲解怎么在NumPy中进行多维数据的线性代数运
- 对于网站设计者而言,时常处理大批量的文件是难免的,特别是图片和一些文本文本文件,更是经常处理。而由于网站大量文件的关系,对于同类
- 前言在字典中查找某一个值的时候,若key不存在就会返回一个keyerror错误而不是一个默认值,如果想要返回一个默认值可以使用default
- 本篇没有考虑异步,多线程及SQL注入WebDatabase 规范中说这份规范不再维护了,原因是同质化(几乎实现者都选择了Sqlite),且不
- 场景对分页来说,我们最感兴趣的是下面几个信息总共有多少页当前是第几页是否可以上一页和下一页代码下面代码演示如何获取分页总数及当前页数、跳转到
- 这两天在用python的bottle框架开发后台管理系统,接口约定使用RESTful风格请求,前端使用jquery ajax与接口进行交互,
- QSS介绍前言QSS即Qt样式表,是用来自定义控件外观的一种机制,QSS大量参考了Css的内容,但QSS的功能要比Css弱得多,体现在选择器
- 本文为大家分享了python实现大音频文件语音识别功能的具体代码,供大家参考,具体内容如下实现思路:先用ffmpeg将其他非wav格式的音频
- 本文实例讲述了python通过get,post方式发送http请求和接收http响应的方法。分享给大家供大家参考。具体如下:测试用CGI,名