pyspark操作MongoDB的方法步骤
作者:小歪的博客 发布时间:2021-05-02 11:12:12
标签:pyspark,操作,MongoDB
如何导入数据
数据可能有各种格式,虽然常见的是HDFS,但是因为在Python爬虫中数据库用的比较多的是MongoDB,所以这里会重点说说如何用spark导入MongoDB中的数据。
当然,首先你需要在自己电脑上安装spark环境,简单说下,在这里下载spark,同时需要配置好JAVA,Scala环境。
这里建议使用Jupyter notebook,会比较方便,在环境变量中这样设置
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
如果你的环境中有多个Python版本,同样可以制定你想要使用的解释器,我这里是python36,根据需求修改。
PYSPARK_PYTHON=/usr/bin/python36
pyspark对mongo数据库的基本操作 (๑• . •๑)
有几点需要注意的:
不要安装最新的pyspark版本,请安装
pip3 install pyspark==2.3.2
spark-connector
与平常的MongoDB写法不同,格式是:mongodb://127.0.0.1:database.collection
如果计算数据量比较大,你的电脑可能会比较卡,^_^
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: zhangslob
@file: spark_count.py
@time: 2019/01/03
@desc:
不要安装最新的pyspark版本
`pip3 install pyspark==2.3.2`
更多pyspark操作MongoDB请看https://docs.mongodb.com/spark-connector/master/python-api/
"""
import os
from pyspark.sql import SparkSession
# set PYSPARK_PYTHON to python36
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'
# load mongodb data
# 格式是:"mongodb://127.0.0.1:database.collection"
input_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
output_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
# 创建spark,默认使用本地环境,或者"spark://master:7077"
spark = SparkSession \
.builder \
.master("local") \
.appName("MyApp") \
.config("spark.mongodb.input.uri", input_uri) \
.config("spark.mongodb.output.uri", output_uri) \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
.getOrCreate()
def except_id(collection_1, collection_2, output_collection, pipeline):
"""
计算表1与表2中不同的数据
:param collection_1: 导入表1
:param collection_2: 导入表2
:param output_collection: 保存的表
:param pipeline: MongoDB查询语句 str
:return:
"""
# 可以在这里指定想要导入的数据库,将会覆盖上面配置中的input_uri。下面保存数据也一样
# .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test")
# .option("database", "people").option("collection", "contacts")
df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \
.option("pipeline", pipeline).load()
df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \
.option("pipeline", pipeline).load()
# df_1有但是不在 df_2,同理可以计算df_2有,df_1没有
df = df_1.subtract(df_2)
df.show()
# mode 参数可选范围
# * `append`: Append contents of this :class:`DataFrame` to existing data.
# * `overwrite`: Overwrite existing data.
# * `error` or `errorifexists`: Throw an exception if data already exists.
# * `ignore`: Silently ignore this operation if data already exists.
df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save()
spark.stop()
if __name__ == '__main__':
# mongodb query, MongoDB查询语句,可以减少导入数据量
pipeline = "[{'$project': {'uid': 1, '_id': 0}}]"
collection_1 = "spark_1"
collection_2 = "spark_2"
output_collection = 'diff_uid'
except_id(collection_1, collection_2, output_collection, pipeline)
print('success')
完整代码地址: spark_count_diff_uid.py
来源:https://zhangslob.github.io/2019/01/03/pyspark操作MongoDB/


猜你喜欢
- 本文实例讲述了Python专用方法与迭代机制,分享给大家供大家参考之用。具体分析如下:众所周知,Python 设计哲学是“优雅”、“明确”、
- 一、简介简单记录一下存储过程的使用。存储过程是预编译SQL语句集合,也可以包含一些逻辑语句,而且当第一次调用存储过程时,被调用的存储过程会放
- HTTP上传的文件的原理HTTP协议的文件上传是通过HTTP POST请求实现的,使用multipart/form-data格式将待上传的文
- ISSET();——适合于检测是否存在这个参数。 定义和作用范围:用于测试一个变量是否具有值(包括0,FALSE,或者一个空字串,但不能是N
- 本章我们来实现一下折线图,有了画柱状图的经验,我们可以快速的分析出柱状图和折线图的区别主要是x轴比例尺和绘制数据图形,其余的画布,坐标轴等,
- Python的Collections模块提供了不少好用的数据容器类型,其中一个精品当属namedtuple。namedtuple能够用来创建
- 本文实例讲述了Python注释、分支结构、循环结构、伪“选择结构”用法。分享给大家供大家参考,具体如下:注释:python使用#作为行注释符
- 古巴比伦王颁布了汉摩拉比法典,刻在黑色的玄武岩,距今已经三千七百多年,你在橱窗前…熟悉吧?没错,这就是周董的爱在西元前歌词。前不久工作不是很
- 这篇文章主要介绍Python的numpy库中的一些函数,做备份,以便查找。(1)将矩阵转换为列表的函数:numpy.matrix.tolis
- 简介:格式:map(function,iterable,……)参数说明:function:是表示
- 语法使用 CAST:CAST ( expression AS data_type )使用 CONVERT:CONVERT (data_typ
- 最近写了一个网络验证登录的爬虫,需要发布为Rest服务,然后发现Flask是一个很好的Web框架,使用Python语言实现。1. 安装fla
- 可以使用Python的email模块来实现带有附件的邮件的发送。SMTP (Simple Mail Transfer Protocol)邮件
- 本文实例为大家分享了php实现ajax图片上传的具体代码,供大家参考,具体内容如下html页面代码<!DOCTYPE html>
- 如何保持数据库中原有格式不变:这些问题在论坛里面几乎天天有人问~!其实当在输入信息,然后提交信息的时候,所有内容的格式是没有变的。只是在当提
- <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN&
- 使用场景公司内部使用Django作为后端服务框架的Web服务,当需要使用公司内部搭建的Ldap 或者 Windows 的AD服务器作为Web
- 本文实例为大家分享了python获取地震信息微信实时推送的具体代码,供大家参考,具体内容如下import requests,timefrom
- 如下所示:#-*- coding:utf-8 -*-import numpy as npimport matplotlib.pyplot a
- 本文实例讲述了php tpl模板引擎定义与使用。分享给大家供大家参考,具体如下:tpl.php<?phpnamespace tpl;/