python每5分钟从kafka中提取数据的例子
作者:sxf_0123 发布时间:2022-05-15 16:35:52
标签:python,kafka,提取数据
我就废话不多说了,直接上代码吧!
import sys
sys.path.append("..")
from datetime import datetime
from utils.kafka2file import KafkaDownloader
import os
"""
实现取kafka数据,文件按照取数据的间隔命名
如每5分钟从kafka取数据写入文件中,文件名为当前时间加5
"""
TOPIC = "rtz_queue"
HOSTS = "ip:9092,ip:9092"
GROUP = "2001"
def get_end_time(hour,minute,time_step):
if (minute+time_step)%60<60:
if (minute+time_step)%60<10:
return str(hour+int((minute+time_step)/60))+":"+"0"+str((minute+time_step)%60)
else:
return str(hour+int((minute+time_step)/60))+":"+str((minute+time_step)%60)
else:
pass
def kafkawritefile(time_step,time_num):
start = datetime.now()
downloader = KafkaDownloader(HOSTS, TOPIC, GROUP)
i = 1
while(i<=time_num):
end_time = get_end_time(start.hour, start.minute,i*time_step)
end_time_file = end_time.replace(':', '_')
outfile_path = "/data/tmp/" + end_time_file + ".csv"
if os.path.exists(outfile_path):
os.remove(outfile_path)
writefile = open(outfile_path, 'a+', encoding='utf-8')
for msg in downloader.message():
curr_time = datetime.now()
curr_time = str(curr_time)
split_curr_time = curr_time.split(' ')
curr_time_str = split_curr_time[1][0:5]
if curr_time_str >= str(end_time):
break
i += 1
if __name__=='__main__':
time_step = 15
time_num = 1
kafkawritefile(time_step,time_num)
来源:https://blog.csdn.net/sxf_123456/article/details/80297959


猜你喜欢
- 一、安装第三方模块首先要下载名为"pymssql"的模块,然后import该模块安装方法 :1.第一种方法:按win+r
- 查看数据库状态:service mysqld status 启动数据库:service mysqld start&
- 1 Support Vector Machines1.1 Example Dataset 1%matplotlib inlineimport
- 记录一些select的技巧: 1、select语句可以用回车分隔 $sql="select * from&nb
- 前言:《flappy bird》是一款由来自越南的独立游戏开发者Dong Nguyen所开发的作品,游戏于2013年5月24日上线,并在20
- 目录一、前言二、基础命令三、正常显示中文:四、设置图样或子图五、设置x轴或y轴相关属性:六、设置标题:七、设置图例:八、进行标注:九、保存图
- 1. 新建项目在命令行窗口下输入scrapy startproject scrapytest, 如下然后就自动创建了相应的文件,如下2. 修
- 效果图如下所示:废话不多说了,直接给大家贴js代码了.<!DOCTYPE html><html lang="en
- 看代码吧~import torchimport numpy as npfrom torchvision.transforms import
- 这里分享一些轨迹聚类的基本方法,涉及轨迹距离的定义、kmeans聚类应用。需要使用的python库如下import pandas as pd
- 装饰器通用模型def wrapper(fn): def inner(*args, **kwargs):  
- 1、环境搭建需安装:python3.6Pycharm专业版django 2.2.6mysqlclientpip install django
- 菜鸟一枚,写着试了试,虽说有点杂乱,但还是能用,我是在linux下运行的大致说下过程:1、把需要ping的网段中所有ip存到数组中(我是放到
- 本教程将分步讲解如何使用JQuery和CSS打造一个炫酷动感菜单。jQuery的"write less, do more"
- icon可以用多个软件制作,也可以通过一些网站把普通图片转换为.ico文件,但通常存在的问题是图片本该透明的地方经转换后变为了黑色或者白色,
- 前言在Windows上编写python程序时,有时候需要对输出的文字颜色进行设置,特别是日志显示,不同级别的日志设置不同的颜色进行展示可以直
- Django框架中的URL分发采用正则表达式匹配来进行,以下是正则表达式的基本规则:官方演示代码:from django.conf.urls
- 1. 读取CSV文件 csv.reader()该方法的作用相当于就是通过 ',' 分割csv格式的数据,并将分割好的每行数据
- 近来有客户要求用table显示一大串数据,由于滚动后就看不到表头,很不方便,所以想到这个效果。上次做table排序对table有了一些了解,
- 一、简介从Python2.6开始,新增了str.format(),它增强了字符串格式化的功能。基本语法是通过 {} 和 : 来代替以前的 %