windowns使用PySpark环境配置和基本操作
作者:Nick_Spider 发布时间:2021-04-12 06:43:06
下载依赖
首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载
HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark
安装pyspark。
pip install pyspark
基本使用
可以在shell终端,输入pyspark,有如下回显:
输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。
>>> from pyspark import SparkContext
>>> sc = SparkContext("local", "First App")
如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。
>>> sc.stop()
下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。
from pyspark import SparkContext
sc = SparkContext("local", "First App")
logFile = "abc.txt"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,line with b:%i" % (numAs, numBs))
运行结果如下:
20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1
这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程
RDD
RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。
创建RDD
parallelize是从列表创建RDD,先看一个例子:
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
print(words)
结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
Count
count方法返回RDD中的元素个数。
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
print(words)
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
返回结果:
Number of elements in RDD -> 8
Collect
collect返回RDD中的所有元素。
from pyspark import SparkContext
sc = SparkContext("local", "collect app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
coll = words.collect()
print("Elements in RDD -> %s" % coll)
返回结果:
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
foreach
每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
accum = sc.accumulator(0)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def increment_counter(x):
print(x)
accum.add(x)
return 0
s = rdd.foreach(increment_counter)
print(s) # None
print("Counter value: ", accum)
返回结果:
None
Counter value: 15
filter
返回一个包含元素的新RDD,满足过滤器的条件。
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))
Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
也可以改写成这样:
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def g(x):
for i in x:
if "spark" in x:
return i
words_filter = words.filter(g)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))
map
将函数应用于RDD中的每个元素并返回新的RDD。
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1, "_{}".format(x)))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
返回结果:
Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]
Reduce
执行指定的可交换和关联二元操作后,然后返回RDD中的元素。
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
这里的add是python内置的函数,可以使用ide查看:
def add(a, b):
"Same as a + b."
return a + b
reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。
Adding all the elements -> 15
Join
返回RDD,包含两者同时匹配的键,键包含对应的所有元素。
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
print("x =>", x.collect())
print("y =>", y.collect())
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))
返回结果:
x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]
来源:https://blog.csdn.net/weixin_39198406/article/details/104798681
猜你喜欢
- 此文章主要向大家描述的是MySQL高级查询方法之记录查询的实际操作步骤,以及对其实际操作过程中要用到的代码的详细描述,以下就是文章的主要内容
- 即使在urlencode之前str.decode(“cp936″).encode(“utf-8″)做了编码转换也是没用的。后来查询手册查到一
- 首先创建一个csv文件,创建方式为新建一个文本文档,然后将这个文本文档重命名为test.csv再用Excel打开,添加内容内容如下:先来添加
- 如下所示:def ref_txt_demo(): f = open('1.txt', 'r') data =
- 前言都说抖音有毒,一刷就停不下来了。看来抖音这款产品紧紧抓住了人们内心深处的某些需求。当然今天不是来探讨抖音这款产品的啊。今天我们来学习如何
- 本文实例讲述了Python tkinter事件高级用法。分享给大家供大家参考,具体如下:先来看看运行效果:完整实例代码:# -*- codi
- 背景:作为一个python小白,今天从菜鸟教程上看了一些python的教程,看到了python的一些语法,对比起来(有其他语言功底),感觉还
- 如何生成任意n阶的三对角矩阵数学作业要求实现共轭梯度法的算法。题目中的矩阵A是n=400/500/600的三对角矩阵。在网上查阅资料未果后,
- 一、基本概述目前电脑上已经下载了MongoDB数据库、navicat for mongodb作为mongoDB的可视化工具,形如navica
- 本文实例为大家分享了python七夕浪漫表白的具体代码,供大家参考,具体内容如下from turtle import *from time
- 前言文接上回,我们已经使用gojs实现了一个最最最基本的树形布局。这次我们开始对图形的骨架进行一个内容展示上的丰富和显示风格上的美化。可以说
- PDO::getAttributePDO::getAttribute — 取回一个数据库连接的属性(PHP 5 >= 5.1.0, P
- 问题描述当前使用的PyCharm社区版版本号2022.1.2,配置镜像源时,没有manage repositories解决方案:镜像源:清华
- 在业务稳定性要求比较高的情况下,运维为能及时发现问题,有时需要对应用程序的日志进行实时分析,当符合某个条件时就立刻报警,而不是被动等待出问题
- 安装Apache1.安装yum -y install httpd2.开启apache服务systemctl start httpd.serv
- 本文为大家分享了Python2.7与Python3.6环境切换的具体方法,供大家参考,具体内容如下系统支持为:Ubuntu18.04系统默认
- 本文为大家分享了opencv基于Haar人脸检测和眼睛检测的具体代码,供大家参考,具体内容如下在这里,我们将进行人脸检测。最初,该算法需要大
- 背景 background css 说明 background-image:url(&q
- 我的同事Fara给大家介绍了戴尔网站首页的改版设计,这里我还想和大家介绍一下戴尔是如何从网站用户使用体验的角度进行设计,让大家进一步了解戴尔
- 可以在Mac OS X 10.2.x(“Jaguar”)和以上版本上Mac OS X使用二进制安装软