pyspark创建DataFrame的几种方法
作者:Nick_Spider 发布时间:2023-01-09 01:30:53
目录
pyspark创建DataFrame
RDD和DataFrame
使用二元组创建DataFrame
使用键值对创建DataFrame
使用rdd创建DataFrame
基于rdd和ROW创建DataFrame
基于rdd和StructType创建DataFrame
基于pandas DataFrame创建pyspark DataFrame
创建有序的DataFrame
配置DataFrame和临时表
创建DataFrame时指定列类型
注册DataFrame为临时表
获取和修改配置
注册自定义函数
查看临时表列表
从其他数据源创建DataFrame
MySQL
pyspark创建DataFrame
为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。
RDD和DataFrame
在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。
这里简单看一下RDD和DataFrame的类型。
print(type(rdd)) # <class 'pyspark.rdd.RDD'>
print(type(df)) # <class 'pyspark.sql.dataframe.DataFrame'>
翻阅了一下源码的定义,可以看到他们之间并没有继承关系。
class RDD(object):
"""
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be
operated on in parallel.
"""
class DataFrame(object):
"""A distributed collection of data grouped into named columns.
A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
and can be created using various functions in :class:`SparkSession`::
...
"""
RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。
但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。
实际工作中,我们用的更多的还是DataFrame。
使用二元组创建DataFrame
尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。
a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]
output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]
这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。
spark.createDataFrame(a).show()
# +-----+---+
# | _1| _2|
# +-----+---+
# |Alice| 1|
# +-----+---+
spark.createDataFrame(a, ['name', 'age']).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 1|
# +-----+---+
使用键值对创建DataFrame
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
使用rdd创建DataFrame
a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]
基于rdd和ROW创建DataFrame
from pyspark.sql import Row
a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
基于rdd和StructType创建DataFrame
from pyspark.sql.types import *
a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
[
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
基于pandas DataFrame创建pyspark DataFrame
df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df) # DataFrame[name: string, age: bigint]
print(type(df.toPandas())) # <class 'pandas.core.frame.DataFrame'>
# 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]
创建有序的DataFrame
output = spark.range(1, 7, 2).collect()
print(output)
# [Row(id=1), Row(id=3), Row(id=5)]
output = spark.range(3).collect()
print(output)
# [Row(id=0), Row(id=1), Row(id=2)]
通过临时表得到DataFrame
spark.registerDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True
配置DataFrame和临时表
创建DataFrame时指定列类型
在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。
a = [('Alice', 1)]
rdd = sc.parallelize(a)
# 指定类型于预期数据对应时,正常创建
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output) # [Row(a='Alice', b=1)]
rdd = rdd.map(lambda row: row[1])
print(rdd) # PythonRDD[7] at RDD at PythonRDD.scala:53
# 只有int类型对应上,过滤掉其他列。
output = spark.createDataFrame(rdd, "int").collect()
print(output) # [Row(value=1)]
# 没有列能对应上,会抛出错误。
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>
注册DataFrame为临时表
spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")
获取和修改配置
print(spark.getConf("spark.sql.shuffle.partitions")) # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50")) # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 50
注册自定义函数
spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]
spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]
查看临时表列表
可以查看所有临时表名称和对象。
spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames()) # ['table1']
print(spark.tables()) # DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames()) # True
print("table1" in spark.tableNames("default")) # True
spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2) # DataFrame[database: string, tableName: string, isTemporary: boolean]
从其他数据源创建DataFrame
MySQL
前提是需要下载jar包。
Mysql-connector-java.jar
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
"useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
"useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()
参考
RDD和DataFrame的区别
spark官方文档 翻译 之pyspark.sql.SQLContext
来源:https://blog.csdn.net/weixin_39198406/article/details/104916715


猜你喜欢
- 前言之前写过一个关于微信授权登陆的文章传送门最近在做小程序的项目,依旧是商城,又开始研究微信的登陆授权坑,第一次接触小程序,授权登陆也是一塌
- 视频才用流媒体,有后台实时返回数据, 要支持flash播放, 所以需安装对应的flash插件。当视频播放时,每间隔3秒向后台发送请求供检测心
- 最近疫情在家,空闲时间比较多,整理下之前写的Golang项目Weave,补充了一些功能,加了前端实现。作为一个Web应用模板,也算是功能比较
- 现在看小说已经有了听书这个功能了,但是有时候你想看的书的听书功能收费,这时候可能大家就只能老老实实选择看或者付费听。(还能拿来练英语听力欸嘿
- 我们知道深度神经网络的本质是输入端数据和输出端数据的一种高维非线性拟合,如何更好的理解它,下面尝试拟合一个正弦函数,本文可以通过简单设置节点
- 1. __init__ 初始化文件路径,关键字1,关键字2;2. key_match 使用with open 方法,以二进制方式(也可以改成
- “网页设计三剑客”可能很多新同学都没听说过,因为缔造神话的公司已经快销声匿迹。“网页设计三剑客”是Macromedia公司旗下Dreamwe
- 如下: Warning at /admin/assets/add/ Incorrect string value: '\xE5\x9
- 神经网络梯度下降法在详细了解梯度下降的算法之前,我们先看看相关的一些概念。1. 步长(Learning rate):步长决定了在梯度下降迭代
- Oracle 的正规表达式的实施是以各种 SQL 函数和一个 WHERE 子句操作符的形式出现的。如果您不熟悉正规表达式,那么这篇文章可以让
- 這兩天﹐對xml作為數據庫產生了興趣﹐找了一些資料﹐也搞出了一點眉目﹐在這里記錄一下。算是對自己學習x
- cookie并不陌生,与session一样,能够让http请求前后保持状态。与session不同之处,在于cookie数据仅保存于客户端。r
- 1、文件上传(input标签) (1)html代码(form表单用post方法提交)<input class="b
- 在做维护项目的时,我们经常会遇到索引维护的问题,通过语句,我们就可以判断某个表的索引是否需要重建。 执行一下语句:先分析表的索引 分析表的索
- 二叉树中和为某一值的路径:输入一颗二叉树的跟节点和一个整数,打印出二叉树中结点值的和为输入整数的所有路径。路径定义为从树的根结点开始往下一直
- 1.在搭建网络开始时,会调用到 keras.models的Sequential()方法,返回一个model参数表示模型2.model参数里面
- #!/usr/bin/env pythonimport sockets = socket.socket(socket.AF_PACKET,
- 学习目标根据原型设计编译自动化数据生成器,熟悉wxPython的基本用法。界面原型设计界面原型设计分析输入参数:最大长度最小长度组成规则多少
- 一、题目描述求解用户登陆信息表中,每个用户连续登陆平台的天数,连续登陆基础为汇总日期必须登陆,表中每天只有一条用户登陆数据(计算中不涉及天内
- 今天安装了几个模块,在cmd测试都正常,但是在pycharm就不行,后面试了以下方法:1、2、3、选择python.exe4、然后在回到se