网络编程
位置:首页>> 网络编程>> Python编程>> pyspark创建DataFrame的几种方法

pyspark创建DataFrame的几种方法

作者:Nick_Spider  发布时间:2023-01-09 01:30:53 

标签:pyspark,创建,DataFrame
目录
  • pyspark创建DataFrame

    • RDD和DataFrame

    • 使用二元组创建DataFrame

    • 使用键值对创建DataFrame

    • 使用rdd创建DataFrame

    • 基于rdd和ROW创建DataFrame

    • 基于rdd和StructType创建DataFrame

    • 基于pandas DataFrame创建pyspark DataFrame

    • 创建有序的DataFrame

  • 配置DataFrame和临时表

    • 创建DataFrame时指定列类型

    • 注册DataFrame为临时表

    • 获取和修改配置

    • 注册自定义函数

    • 查看临时表列表

  • 从其他数据源创建DataFrame

    • MySQL

pyspark创建DataFrame

为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。

RDD和DataFrame

在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。

这里简单看一下RDD和DataFrame的类型。


print(type(rdd))  # <class 'pyspark.rdd.RDD'>
print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>

翻阅了一下源码的定义,可以看到他们之间并没有继承关系。


class RDD(object):

"""
   A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
   Represents an immutable, partitioned collection of elements that can be
   operated on in parallel.
   """


class DataFrame(object):
   """A distributed collection of data grouped into named columns.

A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
   and can be created using various functions in :class:`SparkSession`::
...
   """

RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。

但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。

pyspark创建DataFrame的几种方法

实际工作中,我们用的更多的还是DataFrame。

使用二元组创建DataFrame

尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。


a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]

output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]

这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。


spark.createDataFrame(a).show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# |Alice|  1|
# +-----+---+

spark.createDataFrame(a, ['name', 'age']).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice|  1|
# +-----+---+

使用键值对创建DataFrame


d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)

# [Row(age=1, name='Alice')]

使用rdd创建DataFrame


a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)

# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]

基于rdd和ROW创建DataFrame


from pyspark.sql import Row

a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)

# [Row(name='Alice', age=1)]

基于rdd和StructType创建DataFrame


from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
   [
       StructField("name", StringType(), True),
       StructField("age", IntegerType(), True)
   ]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)

# [Row(name='Alice', age=1)]

基于pandas DataFrame创建pyspark DataFrame

df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。


df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)  # DataFrame[name: string, age: bigint]

print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>

# 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)

# [Row(name='Alice', age=1)]

创建有序的DataFrame


output = spark.range(1, 7, 2).collect()
print(output)
# [Row(id=1), Row(id=3), Row(id=5)]

output = spark.range(3).collect()
print(output)
# [Row(id=0), Row(id=1), Row(id=2)]

通过临时表得到DataFrame


spark.registerDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True

配置DataFrame和临时表

创建DataFrame时指定列类型

在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。


a = [('Alice', 1)]
rdd = sc.parallelize(a)

# 指定类型于预期数据对应时,正常创建
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output)  # [Row(a='Alice', b=1)]
rdd = rdd.map(lambda row: row[1])
print(rdd)  # PythonRDD[7] at RDD at PythonRDD.scala:53

# 只有int类型对应上,过滤掉其他列。
output = spark.createDataFrame(rdd, "int").collect()
print(output)   # [Row(value=1)]

# 没有列能对应上,会抛出错误。
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

注册DataFrame为临时表


spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")

获取和修改配置


print(spark.getConf("spark.sql.shuffle.partitions"))  # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

注册自定义函数


spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]

spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]

spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]

查看临时表列表

可以查看所有临时表名称和对象。


spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames())  # ['table1']
print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames())  # True
print("table1" in spark.tableNames("default"))  # True

spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]

从其他数据源创建DataFrame

MySQL

前提是需要下载jar包。
Mysql-connector-java.jar


from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
   url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
       "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
       "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()

参考

RDD和DataFrame的区别
spark官方文档 翻译 之pyspark.sql.SQLContext

来源:https://blog.csdn.net/weixin_39198406/article/details/104916715

0
投稿

猜你喜欢

  • 1 自动微分我们在《数值分析》课程中已经学过许多经典的数值微分方法。许多经典的数值微分算法非常快,因为它们只需要计算差商。然而,他们的主要缺
  • 之前介绍过python开发工具Jupyter的使用,今天继续讲解python的数据类型,python中有整型、浮点型、字符串、布尔类型,我们
  • 最最简单的操作import numpy as npimport matplotlib.pyplot as pltfig = plt.figu
  • 在定义类的过程中,无论是显式创建类的构造方法,还是向类中添加实例方法,都要求将 self 参数作为方法的第一个参数。例如,定义一个 Pers
  • 多线程类似于同时执行多个不同程序,多线程运行有如下优点:使用线程可以把占据长时间的程序中的任务放到后台去处理。用户界面可以更加吸引人,比如用
  • 一个更易读的网站意味着网站使用性的改良以及提供愉悦的阅读体验。我们希望浏览者们能或者这些好处不是吗?这篇文章我们将介绍5个简单的方法让你能提
  • 一、Tkinter什么是GUI图形用户界面(Graphical User Interface,简称 GUI,又称图形用户接口)是指采用图形方
  • Python版本 实现了比之前的xxftp更多更完善的功能1、继续支持多用户2、继续支持虚拟目录3、增加支持用户根目录以及映射虚拟目录的权限
  • Web技术的发展速度太快了,如果你不与时俱进,就会被淘汰。因此,为了应对即将到来的HTML5,本文总结了22个HTML5的初级技巧,希望能对
  • 本文实例讲述了python socket多线程通讯方法。分享给大家供大家参考,具体如下:#!/usr/bin/evn python"
  • 前言本文主要给大家介绍了关于Django快速分页的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。分页在web开发
  • 在MySQL数据库中导出整个数据库:1.导出整个数据库mysqldump -u 用户名 -p 数据库名 > 导出的文件名mysqldu
  • 做设计类网址导航的初衷是为了资源整合,也是在尝试解决问题。假定访问用户都是行业人士,或者目地性很强的有一定了解的用户,应该如何考虑这个组织系
  • 本文实例为大家分享了Django文件上传与下载的具体代码,供大家参考,具体内容如下Django1.4首先是上传:#settings.pyME
  • 其实网上有很多关于python2.6.6 升级到python2.7的文章,但是我对比这些类似的文章升级之后,发现其中有错误的地方,于是决定还
  • 最近经常需要出一些临时性的报表,于是就用python 的smtplib 和email 两模块写了个小程序,当数据处理完后通过邮箱把报表文件从
  • 按照惯例,年底的淘宝的确是到了“需要改版的时候”。这次新版的淘宝首页上线,乍看并没有多少夺人眼球的地方,但仔细揣摩其中的细节,还是发现了不少
  • 本文实例讲述了Python 面向对象之封装、继承、多态操作。分享给大家供大家参考,具体如下:封装、继承、多态 是面向对象的3大特性为啥要封装
  • Python:获取&ldquo; 3年前的今天&rdquo;的日期时间Python: get datetime for &#
  • 本文实例讲述了python使用wxPython打开并播放wav文件的方法。分享给大家供大家参考。具体实现方法如下:''
手机版 网络编程 asp之家 www.aspxhome.com