Spark SQL 编程初级实践详解
作者:WHYBIGDATA 发布时间:2022-06-13 18:37:56
写在前面
Linux:
CentOS7.5
Spark:
spark-3.0.0-bin-hadoop3.2
IDE:
IntelliJ IDEA2020.2.3
第1题:Spark SQL 基本操作
将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
{ "id":1 , "name":" Ella" , "age":36 }; { "id":2, "name":"Bob","age":29 }; { "id":3 , "name":"Jack","age":29 }; { "id":4 , "name":"Jim","age":28 } ;{ "id":4 , "name":"Jim","age":28 }; { "id":5 , "name":"Damon" } ;{ "id":5 , "name":"Damon" }
为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:
第1小题:查询所有数据;
第2小题:查询所有数据,并去除重复的数据;
第3小题:查询所有数据,打印时去除 id 字段;
第4小题:筛选出 age>30 的记录;
第5小题:将数据按 age 分组;
第6小题:将数据按 name 升序排列;
第7小题:取出前 3 行数据;
第8小题:查询所有记录的 name 列,并为其取别名为 username;
第9小题:查询年龄 age 的平均值;
第10小题:查询年龄 age 的最小值。
主程序代码
import org.apache.spark.sql.{DataFrame, SparkSession}
object t1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("t1")
.master("local[2]")
.getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("dataset/ch05/employee.json")
// df.show()
// df.distinct().show()
// df.drop("id").show()
// df.filter(df("age") > 20).show()
// df.groupBy("name").count().show()
// df.sort(df("name").asc).show()
// val rows = df.take(3)
// rows.foreach(println)
// df.select(df("name").as("username")).show()
// df.agg("age" -> "avg").show()
df.agg("age" -> "min").show()
}
}
主程序执行结果
下图从上到下、从左到右以此为第一、二、三、…、十道题的执行结果
本题很简单,就是相关方法的调用。
第2题:编程实现将 RDD 转换为 DataFrame
题目
源文件内容如下(包含 id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29
请先将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。
主程序代码
import org.apache.spark.sql.{DataFrame, SparkSession}
object t2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("t1")
.master("local[2]")
.getOrCreate()
val employeeInfo = spark.sparkContext.textFile("/input/dataset/employee.txt")
import spark.implicits._
val employeeDF: DataFrame = employeeInfo.map(_.split(","))
.map(attributes =>
Employee(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt)
).toDF()
employeeDF.createTempView("employee")
val employeeRDD: DataFrame = spark.sql("select id, name, age from employee")
employeeRDD.map(e => {
"id:" + e(0) + ",name:" + e(1) + ",age:" + e(2)
}).show(10, false)
}
}
case class Employee(id: Long, name: String, age: Long) {
}
主程序执行结果
本题重在map算子的使用并创建视图执行sql查询,注意程序中要使用到import spark.implicits._
,
第3题:编程实现利用 DataFrame 读写 MySQL 的数据
题目
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的 两行数据。
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
主程序代码
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object t3 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("t3")
.master("local[2]")
.getOrCreate()
val employeeRDD: RDD[Array[String]] = spark.sparkContext.parallelize(
Array("3 Mary F 26", "4 Tom M 23")).map(_.split(" ")
)
val schema: StructType = StructType(List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val rowRDD: RDD[Row] = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
val df: DataFrame = spark.createDataFrame(rowRDD, schema)
val properties = new Properties()
properties.put("user", "root");
properties.put("password", "123456");
properties.put("driver", "com.mysql.jdbc.Driver");
// serverTimezone=UTC语句需要跟在数据库连接语句的第一个位置,否则会报错
df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bd01_spark?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false",
"bd01_spark.employee", properties)
val jdbcDF: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/bd01_spark")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "employee")
.option("user", "root")
.option("password", "123456")
.load()
jdbcDF.agg("age" -> "max", "age" -> "sum").show(10, false)
}
}
本题主要在于MySQL的JDBC连接创建。
主程序执行结果
来源:https://juejin.cn/post/7223218860500697145


猜你喜欢
- webflux介绍Spring Boot 2.0spring.io 官网有句醒目的话是:BUILD ANYTHING WITH SPRING
- 本文实例为大家分享了java根据网络地址保存图片的具体代码,供大家参考,具体内容如下import java.io.BufferedInput
- 本文主要介绍了隐式Intent匹配目标组件的规则,若有叙述不清晰或是不准确的地方希望大家指出,谢谢大家: )1. Intent简
- 工厂方法模式,往往是设计模式初学者入门的模式,的确,有人称之为最为典型最具启发效果的模式。android中用到了太多的工厂类,其中有用工厂方
- 一:简述我们很多时候为了实现数据在线程级别下的隔离,会使用到ThreadLocal,那么TheadLocal是如何实现数据隔离的呢?今天就和
- 1.阻塞I/O模型阻塞IO模型是常见的IO模型,在读写数据时客户端会发生阻塞。阻塞IO模型的工作流程为:1.1在用户线程发出IO请求之后,内
- 此处项目环境为简单的springboot+mybatis环境。可查看到上一篇文章搭建的简单springboot+mybatis的项目想要控制
- 方法一:1.在pom.xml文件下添加依赖包<dependency><groupId>com.alibaba<
- springboot http转https一、安全证书的生成可以使用jdk自带的证书生成工具,jdk自带一个叫keytool的证书管理工具,
- 先来了解一下什么是XMLType类型。XMLType是Oracle从9i开始特有的数据类型,是一个继承了Blob的强大存在,可以用来存储xm
- 本文实例讲述了Spring实战之清除缓存操作。分享给大家供大家参考,具体如下:一 配置文件<?xml version="1.
- 前言前一篇文章我们熟悉了HikariCP连接池,也了解到它的性能很高,今天我们讲一下另一款比较受欢迎的连接池:Druid,这是阿里开源的一款
- 使用simplecommand下载网络图片,并显示到ImageView控件上。1 在app module的build.gradle将simp
- ionic App 解决android端在真机上 tab处于顶部的Bug在app.js 页面中添加以下代码.config(function(
- 在我们开发过程中用 Mybatis 经常会用到下面的例子Mapper如下Map<String ,String > testArr
- 最近在做上传文件的服务,简单看了网上的教程。结合实践共享出代码。由于网上的大多数没有服务端的代码,这可不行呀,没服务端怎么调试呢。Ok,先上
- 本文实例讲述了C#设计模式之Facade外观模式解决天河城购物问题。分享给大家供大家参考,具体如下:一、理论定义外观模式 &nbs
- 本文实例讲述了Aspectj框架。分享给大家供大家参考,具体如下:一 环境变量配置CLASSPATH配置为:.;d:\aspectj1.8\
- 一、why(为什么要用Hibernate缓存?)Hibernate是一个持久层框架,经常访问物理数据库。为了降低应用程序对物理数据源访问的频
- Android studio4.1更新后出现的问题如下> Task : app : kaptDebugKotlin FAILEDFAI