Spark JDBC操作MySQL方式详细讲解
作者:CarveStone 发布时间:2021-05-24 12:41:51
JDBC操作MySQL
在实际的企业级开发环境中,如果数据规模特S别大,此时采用传统的SQL语句去处理的话一般需要分成很多批次处理,而且很容易造成数据库服务宕机,且实际的处理过程可能会非常复杂,通过传统的Java EE等技术可能很难或者不方便实现处理算法,此时采用SparkSQL进行分布式分析处理就可以非常好的解决该问题,在生产环境下,一般会在Spark SQL和具体要操作的DB之间加上一个缓冲层次,例如中间使用Redis或者Kafka。
Spark SQL可以通过JDBC从传统的关系型数据库中读写数据,读取数据后直接生成的是DataFrame,然后再加上借助于Spark SQL丰富的API来进行各种操作。从计算数据规模的角度去讲,集群并行访问数据库数据,调用Data Frame Reader的Format(“JDBC”)的方式说明Spark SQL操作的数据来源是通过JDBC获得,JDBC后端一般都是数据库,例如MySQL、Oracle等。
JDBC读取数据方式
单Partition(无并发)
调用函数格式:def jdbc(url: String, table: String, properties: Properties): DataFrame
url:代表数据库的JDBC链接地址;
table:具体要链接的数据库;
这种方法是将所有的数据放在一个Partition中进行操作(即并发度为1),意味着无论给的资源有多少,只有一个Task会执行任务,执行效率比较慢,并且容易出现OOM。使用如下,在spark-shell中执行:
/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://localhost:/database"
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username") //实际使用中替换username为相应的用户名
prop.setProperty("password","pwd") //实际使用中替换pwd为相应的密码
根据Long类型字段分区
/*此为代码格式,实际中使用应替换相应字段中的内容*/
def jdbc(
url: String,
table: String,
columnName: String, // 根据该字段分区,需要为整型,比如 id 等
lowerBound: Long, // 分区的下界
upperBound: Long, // 分区的上界
numPartitions: Int, //分区的个数
connectionProperties: Properties): DataFrame
根据字段将数据进行分区,放进不同的Partition中,执行效率较快,但是只能根据数据字段作为分区关键字。使用如下:
/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
将字段 colName 中发 1~10000000 条数据分区到 10 个 Partition 中。
根据任意类型字段分区
/*此为代码格式,实际中使用应替换相应字段中的内容*/
jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
以下使用时间字段进行分区:
/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 将 9 月 16-12 月 15 三个月的数据取出,按时间分为 6 个 partition
* 为了减少事例代码,这里的时间都是写死的
* modified_time 为时间字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time
as date) <= date '$end'"
}
这种方法可以使用任意字段进行分区,比较灵活,适用于各种场景。以MySQL 3000W数据量为例,如果单分区count,若干分钟就会报OOM;如果分成5~20个分区后,count操作只需要2s,效率会明显提高,这里就凸显出JDBC高并发的优势。Spark高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的Partition同时读取)也可能会将数据源数据库宕掉。
JDBC读取MySQL数据
下面来进行实际操作,首先需要配置MySQL
免密登陆:
mysql -uroot
查看数据库:
show databases;
使用MySQL数据库:
use mysql;
修改表格的权限,目的是为了使其他主机可以远程连接 MySQL,通过此命令可以查看访问用户允许的主机名。
查看所有用户及其host:
select host, user from user;
将相应用户数据表中的host字段改成’%':
update user set host="%" where user="root";
刷新修改权限
flush privileges;
通过命令修改host为%,表示任意IP地址都可以登录。出现ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY'
,是因为 user+host 是主键,不能重复,可以不用理会。也可通过以下命令删除user 为空的内容来解决:delete from user where user='';
。
在MySQL创建数据库和表格,插入数据,查看:
create database test; //创建数据库test
use test; //进入数据库test
create table people( name varchar(12), age int); //创建表格people并构建结构
insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12); //向people表中插入数据
select * from people; //输出people表中全部数据
编写代码读取MySQL表中数据:
//导入依赖环境
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import java.util.Properties
val url = "jdbc:mysql://localhost/test" //MySQL地址及数据库
val username = "root" //用户名
val sqlContext = new SQLContext(sc)
sc.setLogLevel("WARN")
val uri = url + "?user=" + username + "&useUnicode=true&characterEncoding=UTF-8" //设置读取路径及用户名
val properties = new Properties() //创建JDBC连接信息
properties.put("user","root")
properties.put("driver", "com.mysql.jdbc.Driver")
val df_test: DataFrame = spark.sqlContext.read.jdbc(uri, "people", properties) //读取数据
df_test.select("name","age").collect().foreach(row => { //输出数据
println("name " + row(0) + ", age" + row(1))
})
df_test.write.mode("append").jdbc(uri,"people",properties) //向people表中写入读出的数据,相当于people表中有两份一样的数据
来源:https://blog.csdn.net/weixin_44018458/article/details/128800342


猜你喜欢
- 本文实例为大家分享了Java流布局图形界面编写代码,供大家参考,具体内容如下package jisuanqi;import java.awt
- 什么是WebSocket?WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信—
- 本文实例讲述了C#自定义繁体和简体字库实现中文繁体和简体之间转换的方法。分享给大家供大家参考。具体分析如下:这里使用C#自定义繁体和简体字库
- Redis 简介Redis 是完全开源的,遵守 BSD 协议,是一个高性能的 key-value 数据库。Redis 与其他 key - v
- 在spring上传文件中,一般都使用了MultipartFile来接收,但是有需要用到File的地方,这里只介绍两种转为File的方法,当然
- 在对类访问使用时,常用到的有访问类的成员、方法。实例化在对类进行访问时,需要将类进行实例化。并产生一个对象。可以使用关键字new来实现。由于
- ⛳️ 基本类型做形式参数(零散参数的数据接收)1、基本数据类型要求前台页面的表单输入框的name属性值与对应控制器方法中的形式参数名称与类型
- 前言taptap-developer是一个spring boot框架驱动的纯Grpc服务,所以,只用了四步,移除了web和spring cl
- 1、问题引入我们已经完成了后台系统的登录功能开发,但是目前还存在一个问题,就是用户如果不登录,直接访问系统首页面,照样可以正常访问。很明显,
- WPF实现 Gitee泡泡菜单框架使用大于等于.NET40;Visual Studio 2022;项目使用 MIT 开源
- 目录一、首先导入生成二维码和微信支付环境二、在application.yml文件配置微信所有需的基本配置1.导入2.创建MyWXPayCon
- 本文实例讲述了Android7.0开发实现Launcher3去掉应用抽屉的方法。分享给大家供大家参考,具体如下:年初做过一个项目,有一个需求
- DATAXDataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postg
- 之前看过一句话,说的特别好。有人问阅读源码有什么用?学习别人实现某个功能的设计思路,提高自己的编程水平。是的,大家都实现一个功能,不同的人有
- 使用Palette API选择颜色 良好的视觉设计是app成功所必不可少的, 而色彩设计体系是设计的基础构成. Palette包是
- 本文实例讲述了C#基于QRCode实现动态生成自定义二维码图片功能。分享给大家供大家参考,具体如下:二维码早就传遍大江南北了,总以为它是个神
- 前言在Java中,Range方法在IntStream和LongStream类中都可用。在IntStream类中,它有助于返回函数参数范围内I
- 一、身份证结构和形式在通用的身份证号码有15位的和18位的;15位身份证号码各位的含义:1、1-2位省、自治区、直辖市代码;2、3-4位地级
- 一、将已经编译后的java中Class文件进行打包;打包命令JAR 如:将某目录下的所有class文件夹全部进行打包处理;使用的命令:jar
- 一、引言我们都知道,数据库连接是很珍贵的资源,频繁的开关数据库连接是非常浪费服务器的CPU资源以及内存的,所以我们一般都是使用数据库连接池来