SpringBoot使用Spark过程详解
作者:我有一只肥螳螂 发布时间:2021-07-30 06:19:21
前提: 可以参考文章 SpringBoot 接入 Spark
SpringBoot 已经接入 Spark
已配置 JavaSparkContext
已配置 SparkSession
@Resource
private SparkSession sparkSession;
@Resource
private JavaSparkContext javaSparkContext;
读取 txt 文件
测试文件 word.txt
java 代码
textFile:获取文件内容,返回 JavaRDD
flatMap:过滤数据
mapToPair:把每个元素都转换成一个<K,V>类型的对象,如 <123,1>,<456,1>
reduceByKey:对相同key的数据集进行预聚合
public void testSparkText() {
String file = "D:\\TEMP\\word.txt";
JavaRDD<String> fileRDD = javaSparkContext.textFile(file);
JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
//输出结果
List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
result.forEach(System.out::println);
}
结果得出,123 有 3 个,456 有 2 个,789 有 1 个
读取 csv 文件
测试文件 testcsv.csv
java 代码
public void testSparkCsv() {
String file = "D:\\TEMP\\testcsv.csv";
JavaRDD<String> fileRDD = javaSparkContext.textFile(file);
JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
//输出结果
System.out.println(wordsRDD.collect());
}
输出结果
读取 MySQL 数据库表
format:获取数据库建议是 jdbc
option.url:添加 MySQL 连接 url
option.user:MySQL 用户名
option.password:MySQL 用户密码
option.dbtable:sql 语句
option.driver:数据库 driver,MySQL 使用 com.mysql.cj.jdbc.Driver
public void testSparkMysql() throws IOException {
Dataset<Row> jdbcDF = sparkSession.read()
.format("jdbc")
.option("url", "jdbc:mysql://192.168.140.1:3306/user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
.option("dbtable", "(SELECT * FROM xxxtable) tmp")
.option("user", "root")
.option("password", "xxxxxxxxxx*k")
.option("driver", "com.mysql.cj.jdbc.Driver")
.load();
jdbcDF.printSchema();
jdbcDF.show();
//转化为RDD
JavaRDD<Row> rowJavaRDD = jdbcDF.javaRDD();
System.out.println(rowJavaRDD.collect());
}
也可以把表内容输出到文件,添加以下代码
List<Row> list = rowJavaRDD.collect();
BufferedWriter bw;
bw = new BufferedWriter(new FileWriter("d:/test.txt"));
for (int j = 0; j < list.size(); j++) {
bw.write(list.get(j).toString());
bw.newLine();
bw.flush();
}
bw.close();
结果输出
读取 Json 文件
测试文件 testjson.json,内容如下
[{
"name": "name1",
"age": "1"
}, {
"name": "name2",
"age": "2"
}, {
"name": "name3",
"age": "3"
}, {
"name": "name4",
"age": "4"
}]
注意:testjson.json 文件的内容不能带格式,需要进行压缩
java 代码
createOrReplaceTempView:读取 json 数据后,创建数据表 t
sparkSession.sql:使用 sql 对 t 进行查询,输出 age 大于 3 的数据
public void testSparkJson() {
Dataset<Row> df = sparkSession.read().json("D:\\TEMP\\testjson.json");
df.printSchema();
df.createOrReplaceTempView("t");
Dataset<Row> row = sparkSession.sql("select age,name from t where age > 3");
JavaRDD<Row> rowJavaRDD = row.javaRDD();
System.out.println(rowJavaRDD.collect());
}
输出结果
中文输出乱码
测试文件 testcsv.csv
public void testSparkCsv() {
String file = "D:\\TEMP\\testcsv.csv";
JavaRDD<String> fileRDD = javaSparkContext.textFile(file);
JavaRDD<String> wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
//输出结果
System.out.println(wordsRDD.collect());
}
输出结果,发现中文乱码,可恶
原因:textFile 读取文件没有解决乱码问题,但 sparkSession.read() 却不会乱码
解决办法:获取文件方式由 textFile 改成 hadoopFile,由 hadoopFile 指定具体编码
public void testSparkCsv() {
String file = "D:\\TEMP\\testcsv.csv";
String code = "gbk";
JavaRDD<String> gbkRDD = javaSparkContext.hadoopFile(file, TextInputFormat.class, LongWritable.class, Text.class).map(p -> new String(p._2.getBytes(), 0, p._2.getLength(), code));
JavaRDD<String> gbkWordsRDD = gbkRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
//输出结果
System.out.println(gbkWordsRDD.collect());
}
输出结果
来源:https://blog.csdn.net/weixin_42555971/article/details/129008012


猜你喜欢
- 1.监听(Listener)<!-- 配置监听 --><listener><listener-class>
- 一. 概念简介在开始学习今天的知识之前,有必要先给大家讲解一下与今天内容相关的一些概念,否则可能会让一些小白产生迷惑。1. 日期和时间的区别
- 一 问题描述小明为位置1,求他到其他各顶点的距离。二 实现package graph.dijkstra;import j
- 尽管Java提供了一个可以处理文件的IO操作类。 但是没有一个复制文件的方法。 复制文件是一个重要的操作,当你的程序必须处理很多文件相关的时
- 一个让人赏心悦目的界面对软件来说非常重要,因此图形图像资源也显得非常重要。本讲就要谈一谈Android中处理图形图像的最重要的一个类Draw
- 一、题目描述题目实现:不同的客户端之间需要进行通信,一个客户端与指定的另一客户端进行通信,实现一对一聊天功能。实现一个客户端与指定的另一客户
- 使用enum进行定义/*枚举类型演示*/#include <stdio.h>int main() { enum /*
- 项目介绍:Android上最让人头疼的莫过于从网络获取图片、显示、回收,任何一个环节有问题都可能直接OOM,这个项目或许能帮到你。Unive
- 前言关于android的volley封装之前写过一篇文章,见链接(https://www.jb51.net/article/155875.h
- 我用的是Eclipse打包,但在CMD窗口执行的时候报“ActiveMQ.jar中没有主清单属性”错误。在网上搜了下,这个与MANIFEST
- 写在前面 众所周知,kafka是现代流行的消息队列,它使用经典的消息订阅发布模式实现消息的流转,大部分代码结合kaf
- 使用udp实现消息的接收和发送代码比较简单,但是别忘记关闭防火墙进行测试。首先便是服务端,使用Socket进行实现,参考代码如下: 
- 本文将介绍使用Spring Boot集成Mybatis并实现主从库分离的实现(同样适用于多数据源)。延续之前的Spring Boot 集成M
- 引导语Socket 面试最终题一般都是让你写一个简单的客户端和服务端通信的例子,本文就带大家一起来写这个 demo。1、要求可以使用 Soc
- 从现在开始,大家可以跟随着我的脚步来自定义一个属于自己的Spring框架。但是,在学习自定义Spring框架之前,我们得先来回顾一下Spri
- 接着上篇java验证码制作(上篇)给大家介绍有关java验证码的相关知识!方法三:用开源组件Jcaptcha实现,与Spring组合使用可产
- 这篇文章主要介绍了Java如何把int类型转换成byte,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的
- 一、基本介绍 1、介绍学习很多算法知识,力争做到最优解的学习过程中,很多时候都会遇到PriorityQueue(优先队列)。一个基
- C# WPF ListView控件的实例详解C#的WPF作为现在微软主流的桌面程序开发平台,相比过去的MFC时代,有了非常多的不同。本人刚从
- 简介本文用示例介绍使用MyBatis-Plus进行多表查询的方法,包括静态查询和动态查询。代码controllerpackage com.e