Python如何把Spark数据写入ElasticSearch
作者:阿布gogo 发布时间:2021-06-28 20:07:46
这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。
实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。
如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。
下载完成后,放在本地目录,以下面命令方式启动pyspark:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,请设置环境变量:
export PYSPARK_PYTHON=/usr/bin/python3
理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式
{ "id: { the rest of your json}}
往下会展示如何转换成这种格式。
解析Apache日志文件
我们将Apache的日志文件读入,构建Spark RDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
def parse(str):
s=p.match(str)
d = {}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
return d
换句话说,我们刚开始从日志文件读入RDD的数据类似如下:
['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']
然后我们使用map函数转换每条记录:
rdd2 = rdd.map(parse)
rdd2.take(1)
[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。
我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”: “doc_id”告诉ES我们将这个字段作为ID。
这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
计算结果类似如下,可以看到ID是一个很长的SHA数值。
rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
现在我们需要制定ES配置,比较重要的两项是:
“es.resource” : ‘walker/apache': "walker"是索引,apache是类型,两者一般合称索引
“es.mapping.id”: “doc_id”: 告诉ES那个字段作为整个文档的ID,也就是查询结果中的_id
其他的配置自己去探索。
然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节
es_write_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : 'walker/apache',
"es.input.json": "yes",
"es.mapping.id": "doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
rdd3 = rdd2.map(addID)
def addId(data):
j=json.dumps(data).encode('ascii', 'ignore')
data['doc_id'] = hashlib.sha224(j).hexdigest()
return (data['doc_id'], json.dumps(data))
最后我们可以使用curl进行查询
curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*
{
"_index" : "walker",
"_type" : "apache",
"_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"_score" : 1.0,
"_source" : {
"date" : "17/May/2015:10:05:32 +0000",
"ip" : "91.177.205.119",
"operation" : "GET",
"doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"uri" : "/favicon.ico"
}
如下是所有代码:
import json
import hashlib
import re
def addId(data):
j=json.dumps(data).encode('ascii', 'ignore')
data['doc_id'] = hashlib.sha224(j).hexdigest()
return (data['doc_id'], json.dumps(data))
def parse(str):
s=p.match(str)
d = {}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
return d
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
rdd2 = rdd.map(parse)
rdd3 = rdd2.map(addID)
es_write_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : 'walker/apache',
"es.input.json": "yes",
"es.mapping.id": "doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
也可以这么封装,其实原理是一样的
import hashlib
import json
from pyspark import Sparkcontext
def make_md5(line):
md5_obj=hashlib.md5()
md5_obj.encode(line)
return md5_obj.hexdigest()
def parse(line):
dic={}
l = line.split('\t')
doc_id=make_md5(line)
dic['name']=l[1]
dic['age'] =l[2]
dic['doc_id']=doc_id
return dic #记得这边返回的是字典类型的,在写入es之前要记得dumps
def saveData2es(pdd, es_host, port,index, index_type, key):
"""
把saprk的运行结果写入es
:param pdd: 一个rdd类型的数据
:param es_host: 要写es的ip
:param index: 要写入数据的索引
:param index_type: 索引的类型
:param key: 指定文档的id,就是要以文档的那个字段作为_id
:return:
"""
#实例es客户端记得单例模式
if es.exist.index(index):
es.index.create(index, 'spo')
es_write_conf = {
"es.nodes": es_host,
"es.port": port,
"es.resource": index/index_type,
"es.input.json": "yes",
"es.mapping.id": key
}
(pdd.map(lambda _dic: ('', json.dumps(_dic)))) #这百年是为把这个数据构造成元组格式,如果传进来的_dic是字典则需要jdumps,如果传进来之前就已经dumps,这便就不需要dumps了
.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
)
if __name__ == '__main__':
#实例化sp对象
sc=Sparkcontext()
#文件中的呢内容一行一行用sc的读取出来
json_text=sc.textFile('./1.txt')
#进行转换
json_data=json_text.map(lambda line:parse(line))
saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')
sc.stop()
看到了把,面那个例子在写入es之前加了一个id,返回一个元组格式的,现在这个封装指定_id就会比较灵活了
来源:https://www.cnblogs.com/tjp40922/p/12716870.html


猜你喜欢
- 相信大家都做过九宫格的游戏,规则是要求填数字1-9在九个方格内,使横竖斜相加和相等。只填九个那可能有些简单,但是填25个,填49个,81个等
- 本文实例为大家分享了python实现KFC点餐收银系统的具体代码,供大家参考,具体内容如下这个kfc收银系统我实现了的以下功能:1.正常餐品
- 序列解包(Sequence Unpacking)是Python中非常重要和常用的一个功能,可以使用非常简洁的形式完成复杂的功能,大幅度提高了
- 什么是爬虫爬虫,即网络爬虫,大家可以理解为在网络上爬行的一直蜘蛛,互联网就比作一张大网,而爬虫便是在这张网上爬来爬去的蜘蛛咯,如果它遇到资源
- 1 事务的使用1.1 事务概念事务就是一组DML语句组成,这些语句在逻辑上存在相关性,这一组DML语句要么全部成功,要么全部失败,是一个整体
- 何为共线性:共线性问题指的是输入的自变量之间存在较高的线性相关度。共线性问题会导致回归模型的稳定性和准确性大大降低,另外,过多无关的维度计算
- 一、安装被认为是python官方图像处理库PIL非常适合于图像归档以及图像的批处理任务。可以使用PIL创建缩略图,转换图像格式,打印图像等等
- 报错信息最近闲来无事,用python的tkinter库开发了一款带日程提醒的万年历桌面程序。在程序开发结束开始打包时,却发现一直报错PyIn
- Python 内置的 zipfile 模块可以对文件(夹)进行ZIP格式的压缩和读取操作。要进行相关操作,首先需要实例化一个 ZipFile
- 目录01 安装02 剪辑01 安装对视频进行批量剪辑,需要三个库,分别是Moviepy库和Pathlib库,还有Tkinter库。首先我们对
- 很多时候,我们需要实时的绘制曲线,如实时的绘制串口接收到的数据。最先想到的解决策略是类似于Matlab种的drawnow函数。在python
- 序言本文所提及的VTD-XML并非本文作者原创,作者只是对它进行介绍。问题通常当我们提起XML的使用时,最头痛的部分便是XML的verbos
- 微信小程序之自定义底部弹出框动画,供大家参考,具体内容如下最近做小程序时,会经常用到各种弹框。直接做显示和隐藏虽然也能达到效果,但是体验性太
- 一、图像噪声图像噪声是图像在获取或者传输过程中受到随机信号干扰,妨碍人们对图像理解及分析处理的信号。很多时候将图像看作随机过程,因而描述噪声
- 一、前言在Python开发的过程中,为了实现某项功能,经常需要对某些字符串进行特殊的处理,如拼接字符串、截取字符串、格式化字符串等。下面将对
- 近日,小米正式宣布开源 SOAR。截至今日,该项目已经获得了 350 个「star」以及 44 个「fork」(GitHub项目地址:htt
- 本文实例讲述了python编程开发之textwrap文本样式处理技巧。分享给大家供大家参考,具体如下:在看python的API的时候,发现p
- 本文实例讲述了python通过pil模块获得图片exif信息的方法。分享给大家供大家参考。具体分析如下:python的pil模块功能超级强大
- 项目我采用了三层结构,我把LINQ的映射文件放到了DAL这个层中,映射文件自动由VS2008生成,在原来的机器上一直正常,一点问题都没有,当
- 一、python判断文件和文件夹是否存在、创建文件夹 >>> import os>>> os.