利用pandas进行大文件计数处理的方法
作者:CaspianR 发布时间:2021-05-20 02:36:55
Pandas读取大文件
要处理的是由探测器读出的脉冲信号,一组数据为两列,一列为时间,一列为脉冲能量,数据量在千万级,为了有一个直接的认识,先使用Pandas读取一些
import pandas as pd
data = pd.read_table('filename.txt', iterator=True)
chunk = data.get_chunk(5)
而输出是这样的:
Out[4]:
332.977889999979 -0.0164794921875
0 332.97790 -0.022278
1 332.97791 -0.026855
2 332.97792 -0.030518
3 332.97793 -0.045776
4 332.97794 -0.032654
DataFram基本用法
这里,data只是个容器,pandas.io.parsers.TextFileReader。
使用astype可以实现dataframe字段类型转换
输出数据中,每组数据会多处一行,因为get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在读取过程中并没有指定DataFrame的columns,因此在get_chunk过程中,默认将第一组数据作为columns。因此需要在读取过程中指定names即DataFrame的columns。
import pandas as pd
data = pd.read_table('filename.txt', iterator=True, names=['time', 'energe'])
chunk = data.get_chunk(5)
data['energe'] = df['energe'].astype('int')
输出为
Out[6]:
index | time | energe |
---|---|---|
0 | 332.97789 | -0.016479 |
1 | 332.97790 | -0.022278 |
2 | 332.97791 | -0.026855 |
3 | 332.97792 | -0.030518 |
4 | 332.97793 | -0.045776 |
DataFram存储和索引
这里讲一下DataFrame这个格式,与一般二维数据不同(二维列表等),DataFrame既有行索引又有列索引,因此在建立一个DataFrame数据是
DataFrame(data, columns=[‘year', ‘month', ‘day'],
index=[‘one', ‘two', ‘three'])
year | month | day | |
---|---|---|---|
0 | 2010 | 4 | 1 |
1 | 2011 | 5 | 2 |
2 | 2012 | 6 | 3 |
3 | 2013 | 7 | 5 |
4 | 2014 | 8 | 9 |
而pd.read_table中的names就是指定DataFrame的columns,而index自动设置。 而DataFrame的索引格式有很多
类型 | 说明 | 例子 |
---|---|---|
obj[val] | 选取单列或者一组列 | |
obj.ix[val] | 选取单个行或者一组行 | |
obj.ix[:,val] | 选取单个列或列子集 | |
obj.ix[val1, val2] | 同时选取行和列 | |
reindex方法 | 将一个或多个轴匹配到新索引 | |
xs方法 | 根据标签选取单行或单列,返回一个Series | |
icol,lrow方法 | 根据整数位置选取单列或单行,返回一个Series | |
get_value,set_value | 根据行标签列标签选取单个值 |
exp: In[1]:data[:2]
Out[2]:
year | month | day | |
---|---|---|---|
0 | 2010 | 4 | 1 |
1 | 2011 | 5 | 2 |
In[2]:data[data[‘month']>5]
Out[2]:
year | month | day | |
---|---|---|---|
2 | 2012 | 6 | 3 |
4 | 2014 | 8 | 9 |
如果我们直接把data拿来比较的话,相当于data中所有的标量元素
In[3]:data[data<6]=0
Out[3]:
year | month | day | |
---|---|---|---|
0 | 2010 | 0 | 0 |
1 | 2011 | 0 | 0 |
2 | 2012 | 6 | 0 |
3 | 2013 | 7 | 0 |
4 | 2014 | 8 | 9 |
Pandas运算
series = data.ix[0]
data - series
Out:
year | month | day | |
---|---|---|---|
0 | 0 | 0 | 0 |
1 | 1 | 1 | 1 |
2 | 2 | 2 | 2 |
3 | 3 | 3 | 4 |
4 | 4 | 4 | 8 |
DataFrame与Series之间运算会将Series索引匹配到DataFrame的列,然后沿行一直向下广播
如果令series1 = data[‘year']
data.sub(series1,axis=0)
则每一列都减去该series1,axis为希望匹配的轴,=0行索引,即匹配列,=1列索引,则按行匹配。
DataFrame的一些函数方法
这个就有很多了,比如排序和排名;求和、平均数以及方差、协方差等数学方法;还有就是唯一值(类似于集合)、值计数和成员资格等方法。
当然还有一些更高级的属性,用的时候再看吧
数据处理
在得到数据样式后我们先一次性读取数据
start = time.time()
data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe'])
end = time.time()
data.index
print("The time is %f s" % (end - start))
plus = data['energe']
plus[plus < 0] = 0
The time is 29.403917 s
RangeIndex(start=0, stop=68319232, step=1)
对于一个2G大小,千万级的数据,这个读取速度还是挺快的。之前使用matlab load用时160多s,但是不知道这个是否把数据完全读取了。然后只抽取脉冲信号,将负值归0,因为会出现一定的电子噪声从而产生一定负值。
然后就需要定位脉冲信号中的能峰了,也就是findpeaks
这里用到了scipy.signal中的find_peaks_cwt,具体用法可以参见官方文档
peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,输入第一个为数据,第二个为窗函数,也就是在这个宽度的能窗内寻找峰,我是这样理解的。刚开始以为是数据的另一维坐标,结果找了半天没结果。不过事实上这个找的确定也挺慢的。
50w条的数据,找了足足7分钟,我这一个数据3000w条不得找半个多小时,而各种数据有好几十,恩。。这样是不行的,于是想到了并行的方法。这个下篇文章会讲到,也就是把数据按照chunksize读取,然后同时交给(map)几个进程同时寻峰,寻完后返回(reduce)一起计数,计数的同时,子进程再此寻峰。
在处理的时候碰到我自己的破 笔记本由于内存原因不能load这个数据,并且想着每次copy这么大数据好麻烦,就把一个整体数据文件分割成了几个部分,先对方法进行一定的实验,时间快,比较方便。
import pandas as pd
def split_file(filename, size):
name = filename.split('.')[0]
data = pd.read_table(filename, chunksize=size, names=['time', 'intension'])
i = 1
for piece in data:
outname = name + str(i) + '.csv'
piece.to_csv(outname, index=False, names = ['time', 'intension'])
i += 1
def split_csvfile(filename, size):
name = filename.split('.')[0]
data = pd.read_csv(filename, chunksize=size, names=['time', 'intension'])
i = 1
for piece in data:
outname = name + str(i) + '.csv'
piece = piece['intension']
piece.to_csv(outname, index=False)
i += 1
额..使用并行寻峰通过map/reduce的思想来解决提升效率这个想法,很早就实现了,但是,由于效果不是特别理想,所以放那也就忘了,今天整理代码来看了下当时记的些笔记,然后竟然发现有个评论…..我唯一收到的评论竟然是“催稿”=。=。想一想还是把下面的工作记录下来,免得自己后来完全忘记了。
rom scipy import signal
import os
import time
import pandas as pd
import numpy as np
from multiprocessing import Pool
import matplotlib.pylab as plt
from functools import partial
def findpeak(pluse):
pluse[pluse < 0.05] = 0
print('Sub process %s.' % os.getpid())
start = time.time()
peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) # 返回一个列表
end = time.time()
print("The time is %f s" % (end - start))
pks = [pluse[x] for x in peaks]
return pks
def histcnt(pks, edge=None, channel=None):
cnt = plt.hist(pks, edge)
res = pd.DataFrame(cnt[0], index=channel, columns=['cnt'])
return res
if __name__ == '__main__':
with Pool(processes=8) as p:
start = time.time()
print('Parent process %s.' % os.getpid())
pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe'])
channel = pd.read_csv('data/channels.txt', names=['value'])
edges = channel * 2
edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True)
specal = []
for data in pluse:
total = p.apply_async(findpeak, (data['energe'],),
callback=partial(histcnt, edge=edges['value'], channel=channel['value']))
specal.append(total)
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
spec = sum(specal)
plt.figure()
plt.plot(spec['cnt'])
spec.to_csv('data/spec1.csv', header=False)
print('every is OK')
end = time.time()
print("The time is %f s" % (end - start))
由于对对进程线程的编程不是很了解,其中走了很多弯路,尝试了很多方法也,这个是最终效果相对较好的。
首先,通过 pd.readtable以chunksize=50000分块读取,edges为hist过程中的下统计box。
然后,apply_async为非阻塞调用findpeak,然后将结果返回给回调函数histcnt,但是由于回调函数除了进程返回结果还有额外的参数,因此使用partial,对特定的参数赋予固定的值(edge和channel)并返回了一个全新的可调用对象,这个新的可调用对象仍然需要通过制定那些未被赋值的参数(findpeak返回的值)来调用。这个新的课调用对象将传递给partial()的固定参数结合起来,同一将所有参数传递给原始函数(histcnt)。(至于为啥不在histcnt中确定那两个参数,主要是为了避免一直打开文件。。当然,有更好的办法只是懒得思考=。=),还有个原因就是,apply_async返回的是一个对象,需要通过该对象的get方法才能获取值。。
对于 apply_async官方上是这样解释的
Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一个变体,返回一个结果对象
如果指定回调,那么它应该是一个可调用的接受一个参数。结果准备好回调时,除非调用失败,在这种情况下,应用error_callback代替。
如果error_callback被指定,那么它应该是一个可调用的接受一个参数。如果目标函数失败,那么error_callback叫做除了实例。
回调应立即完成以来,否则线程处理结果将被封锁。
不使用回调函数的版本如下,即先将所有子进程得到的数据都存入peaks列表中,然后所有进程完毕后在进行统计计数。
import pandas as pd
import time
import scipy.signal as signal
import numpy as np
from multiprocessing import Pool
import os
import matplotlib.pyplot as plt
def findpeak(pluse):
pluse[pluse < 0] = 0
pluse[pluse > 100] = 0
print('Sub process %s.' % os.getpid())
start = time.time()
peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10))
end = time.time()
print("The time is %f s" % (end - start))
res = [pluse[x] for x in peaks]
return res
if __name__ == '__main__':
with Pool(processes=8) as p:
start = time.time()
print('Parent process %s.' % os.getpid())
pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe'])
pks = []
for data in pluse:
pks.append(p.apply_async(findpeak, (data['energe'],)))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
peaks = []
for i, ele in enumerate(pks):
peaks.extend(ele.get())
peaks = pd.DataFrame(peaks, columns=['energe'])
peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000)
channel = pd.read_csv('data/channels.txt', names=['value'])
channel *= 2
channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True)
plt.figure()
spec = plt.hist(peaks['energe'], channel['value'])
# out.plot.hist(bins=1024)
# print(out)
# cnt = peaks.value_counts(bins=1024)
# cnt.to_csv('data/cnt.csv', index=False, header=False)
print('every is OK')
end = time.time()
print("The time is %f s" % (end - start))
来源:https://blog.csdn.net/renjunsong0/article/details/53107091
猜你喜欢
- SQL Server 2005的新功能为动态管理对象,它们是在指定时间返回某个数据库实例的特殊状态信息的数据库视图或函数。这些对象允许数据库
- 如何用OdbcRegTool组件来创建一个数据源?OdbcRegTool是一个免费组件,在服务器上安装后,就可以来创建一个数据源:<h
- 一、队列基本操作from queue import Queueq = Queue(5) # 创建一个容量为5的队列。如果给一个小
- 废话不多说了,直接把我写的timeit函数分享给大家,具体内容如下:/** * Compute the delay to execute a
- 本文研究的主要是django在接受post请求时显示403forbidden时的处理方法,具体代码如下。最近在做一个项目需要用到Django
- 看了群主最后成像的图片,应该是循环了36次画方框,每次有10度的偏移。当然不能提前看答案,自己试着写代码。之前有用过海龟画图来画过五角星、奥
- 我们有时候看到一些文章因为一行的字符超长而把一些表格或DIV撑开或字符写到溢出了.如何防止表格防止撑开,表格不被内容撑开,单行字符超长div
- 本文实例讲述了python config文件的读写操作。分享给大家供大家参考,具体如下:1、设置配置文件[mysql]host = 1234
- 创建小程序全局函数1:在微信开发工具中增加一个JS文档, 放入全局全局函数代码说明1:全局函数只能放var定义的变量下,本例的var 变量为
- 1, 创建pytorch 的Tensor张量:torch.rand((3,224,224)) #创建随机值的三维张量,大小为(3,224,2
- # -*- coding: cp936 -*-import socketfrom threading import Thread,activ
- 前言最近几天,研究了一下一直很好奇的爬虫算法。这里写一下最近几天的点点心得。下面进入正文:你可能需要的工作环境:Python 3.6官网下载
- 一直以来都有这样一个困惑,那就是打开页面间的链接时是在原窗口转换还在新窗口打开呢?如果是在原窗口里转换页面的话,那我还想使用原页面的信息呢?
- Python 实现删除某路径下文件及文件夹的脚本#!/usr/bin/env pythonimport osimport shutildel
- 本文实例讲述了Python切片工具pillow用法。分享给大家供大家参考,具体如下:切片:使用切片将源图像分成许多的功能区域因为要对图片进行
- 本文实例讲述了Python装饰器。分享给大家供大家参考。具体分析如下:这是在Python学习小组上介绍的内容,现学现卖、多练习是好的学习方式
- 获取一组href元素属性的值lst = driver.find_elements_by_class_name("ib-it-tex
- 本文介绍了目前6种比较常用的进度条,让大家都能直观地看到脚本运行最新的进展情况1.普通进度条在代码迭代运行中可以自己进行统计计算,并使用格式
- 本文介绍了关于redux-saga中take使用方法详解,分享给大家,具体如下:带来一个自己研究好久的API使用方法.redux-saga中
- 下面的各种屏蔽网页鼠标或键盘的代码都是我以前收集的,挺实用的,防一般的访客还是很有用的。1.禁止鼠标选中捕捉网页文字图片等元素在<bo