python 解决动态的定义变量名,并给其赋值的方法(大数据处理)
作者:威震四海 发布时间:2021-10-09 10:44:58
最近消费kafka数据到磁盘的时候遇到了这样的问题:
需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。
问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,
尝试:中间过程想到了动态定义变量名,即
定义第七个字段:seven = data.split('|')[7]
定义文件名:filename = time_stamp + '_' + seven+'.tmp',
定义文件计数器:seven + ‘_num' = 0
定义文件时间戳:seven + '_stamp' = time.time( )
想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了
解决:最后使用三个字典将这个问题完美解决,
定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;
定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;
定义一个字典用来存大类,键对应代号,值对应分类;
局部功能代码如下:
def kafka_to_disk():
print('启动前检测上次运行时是否存在意外中断的数据文件......')
print('搜索最近一次执行脚本产生的时间目录......')
# 待处理临时文件列表
tmp_list = []
try:
for category_dir in os.listdir(local_file_path):
if len(os.listdir(local_file_path+os.sep+category_dir)) > 0:
for file in os.listdir(local_file_path+os.sep+category_dir):
if suffix in file:
tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)
# print('上次运行程序产生的临时文件有---{}'.format(tmp_list))
except Exception as e:
pass
if len(tmp_list) == 0:
print('未扫描任何残留临时文件')
else:
print('开始修复残留临时文件......')
tmp_num = 0
for tmp in tmp_list:
os.rename(tmp, tmp.split('.')[0]+'.out')
tmp_num += 1
print('本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★'.format(tmp_num))
category_poor = {
'1': 'news', '2': 'weibo', '3': 'weixin', '4': 'app', '5': 'newspaper', '6': 'luntan',
'7': 'blog', '8': 'video', '9': 'shangji', '10': 'shangjia', '11': 'gtzy', '12': 'zfztb',
'13': 'gyfp', '14': 'gjz', '15': 'zfxx', '16': 'ptztb', '17': 'company', '18': 'house',
'19': 'hospital', '20': 'bank', '21': 'zone', '22': 'express', '23': 'zpgw', '24': 'zscq',
'25': 'hotel', '26': 'cpws', '27': 'gxqy', '28': 'gpjj', '29': 'dtyy', '30': 'bdbk'}
time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 : 20180509103015125
consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers))
print('连接kafka成功,数据筛选中......')
file_poor = {} # 子类池用于文件计数器
time_stamp_poor = {} # 子类时间戳池,用于触发文件切换
time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 :20180509103015125
for message in consumer:
# 提取第8个字段自动匹配目录进行创建
if message.value.decode().split('|')[1] in category_poor:
category = category_poor[message.value.decode().split('|')[1]]
else:
print(message.value.decode())
continue
category_dir = local_file_path + os.sep + category
if not os.path.exists(category_dir):
os.makedirs(category_dir)
# 提取第2个字段,用于生成文件名
if message.value.decode().split('|')[7] in time_stamp_poor:
shot_file_name = time_stamp_poor[message.value.decode().split('|')[7]] + '_' + message.value.decode().split('|')[7]
else:
shot_file_name = time_stamp + '_' + message.value.decode().split('|')[7]
file_name = category_dir + os.sep + shot_file_name + '.tmp'
# 给每一个文件设定一个计数器
if message.value.decode().split('|')[7] not in file_poor:
file_poor[message.value.decode().split('|')[7]] = 0
with open(file_name, 'a', encoding='utf-8')as f1:
f1.write(message.value.decode())
file_poor[message.value.decode().split('|')[7]] += 1
# 触发切换文件的操作,用时间戳生成第二文件名
if file_poor[message.value.decode().split('|')[7]] == strip_number:
time_stamp_poor[message.value.decode().split('|')[7]] = utils.get_time_stamp()
file_poor[message.value.decode().split('|')[7]] = 0
来源:https://blog.csdn.net/Beyond_F4/article/details/80590082


猜你喜欢
- 本文实例讲述了PHP字典树(Trie树)定义与实现方法。分享给大家供大家参考,具体如下:Trie树的概念(百度的解释):字典树又称单词查找树
- 问题你想将几个小的字符串合并为一个大的字符串解决方案如果你想要合并的字符串是在一个序列或者 iterable 中,那么最快的方式就是使用 j
- 本文实例讲述了Python切片操作。分享给大家供大家参考,具体如下:在很多编程语言中,针对字符串提供了截取函数,其实目的就是对字符串切片。P
- 1 问题描述最近与诸位聊起,在用户体验网站产品、完成任务的过程中,页面的切换到底是新开窗口,还是当前页面跳转,哪一种是最佳的用户体验。这一讨
- blur事件在元素失去焦点时触发。在一些jquery的教程、api手册等上面对blur事件,提供了一个错误的例子,就是关于p标签失去焦点的问
- 注:本文是应Alan邀请为《CSS布局实录》写的一个web标准入门指导。书已经上市近一年了,现在摘选出来,给初学者一个参考。希望了解更多实现
- 本文实例讲述了php隐藏IP地址后两位显示为星号的方法。分享给大家供大家参考。具体实现方法如下:我们在很多的公共网站中都会有碰到显示用户的I
- 导语:除夕除夕,就是除去烦脑,迎接新的希望!在这里小编先祝大家除夕快乐,岁岁常欢笑,事事皆如意!正文:创建画布setup和draw是p5.j
- 总结调试网站获取cookies时请查看,r.header和r.request.header这两个属性,因为cookie说不准出现在他们俩谁里
- 本文实例讲述了PHP自定义函数格式化json数据的方法。分享给大家供大家参考,具体如下:<?php /**
- 在平时的需求开发中涉及到将多列值合并为一列值的操作,通过查阅相关资料特此记录以下方法,方便日后学习复盘 import pandas
- 一、问题描述使用vscode,在markdown的预览模式下无法预览网络图片二、本机环境该问题与电脑硬件以及操作系统环境无关。本机markd
- 其实也算不上教程,也就是自己没事儿的时候做点东西然后发上来大家交流交流,希望大家不吝赐教^!^因为刚看过亚东的教程和这个有点相似,所以就自己
- ---- ORACLE数据库由数据文件,控制文件和联机日志文件三种文件组成。由于磁盘空间的变化,或者基于数据库磁盘I/O性能的调
- SQLServer错误5120:先用widows用户登录附加再分离重新用sa附加就行了 不行的话,绝招:你先用SQLServer
- 我就废话不多说了,还是直接看代码吧!import numpyworld_alchol=numpy.genfromtxt("worl
- 说完了理论,我们来做点实事。这篇文章将介绍使用 Javascript 实现的动画组件。下面记录下当时编写这个组件的考虑的些问题,对技术细节感
- (一)什么是json:JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。易于人阅读和编写。同时也
- 前言大多数人只知道github是开源社区,可以用来做项目的版本管理,但是其实他还有一些其他功能和小彩蛋。有没有和我一样不想花钱去购置服务器的
- 在使用python编程过程中,我们往往需要借助字典来提高编程效率。同时为了调试方便,我们希望将某些变量保存为中间文件。例如,在协同过滤算法中