详解Spark Sql在UDF中如何引用外部数据
作者:KYs_Daddy 发布时间:2021-08-17 14:51:17
前言
Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Context以外的资源或数据。比如从List或Map中取值,或是通过连接池从外部的数据源中读取数据,然后再参与Column的运算。
Excutor中每个task的工作线程都会对UDF的call进行调用,外部资源的使用发生在Excutor端,而资源加载既能发生在Driver端,也可以发生在Excutor端。如果外部资源对象能序列化,我们可以在Driver端进行初始化,然后广播(broadcast)到Excutor端参与运算。对于不能进行序列化的对象,如JedisPool(redis连接池),只能在Excutor端进行初始化。
因此,在UDF中引用外部资源有以下两类方法:
能序列化:在Driver端进行初始化,然后通过spark的broadcast方法广播到Excutor上进行使用;
不能序列化:在Excutor端进行初始化然后使用。
下面我们将用一个实际例子对上述两种方法进行详细介绍。
本文使用环境:Spark-2.3.0,Java 8。
场景介绍
我们以一个DataFrame(两个字段node_1、node_2)作为原始数据;一棵二叉搜索树(BST)作为Spark外部被引用数据;目标是定义一个UDF来判断:BST中是否刚好存在一个父节点,它的左右子节点值与node_1、node_2两个字段值相同。然后将判断结果输出到新列is_bro。其中DataFrame:
BST:
输出DataFrame:
二叉树的定义与判断是否为父节点的左右子节点的逻辑如下:
import java.io.Serializable;
/**
* @author wangjiahui
* @create 2021-03-14-10:57
*/
public class TreeNode implements Serializable{
private Integer val;
private TreeNode left;
private TreeNode right;
public TreeNode() {
}
public TreeNode(Integer val) {
this.val = val;
}
public TreeNode(Integer val, TreeNode left, TreeNode right) {
this.val = val;
this.left = left;
this.right = right;
}
public Integer getVal() {
return val;
}
public void setVal(Integer val) {
this.val = val;
}
public TreeNode getLeft() {
return left;
}
public void setLeft(TreeNode left) {
this.left = left;
}
public TreeNode getRight() {
return right;
}
public void setRight(TreeNode right) {
this.right = right;
}
/**
* 判断是否刚好有一个父节点的左、右子节点值与num1、num2相同
* @param num1
* @param num2
* @return
*/
public Boolean isBro( Integer num1, Integer num2) {
if (null == getLeft()||null == getRight()) {
return false;
}
if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
return true;
}
return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
}
}
生成上图所示BST的方法createTree()如下:
public static TreeNode createTree(){
TreeNode[] treeNodes = new TreeNode[8];
for(int i=1; i<=7; i++){
treeNodes[i] = new TreeNode(i);
}
treeNodes[2].setLeft(treeNodes[1]);
treeNodes[2].setRight(treeNodes[3]);
treeNodes[6].setLeft(treeNodes[5]);
treeNodes[6].setRight(treeNodes[7]);
treeNodes[4].setLeft(treeNodes[2]);
treeNodes[4].setRight(treeNodes[6]);
return treeNodes[4];
}
方法一 Driver端加载
在Driver端完成初始化并定义UDF
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
// 初始化树
TreeNode tree = createTree();
// broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
// lambda表达式定义udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
// 注册udf
spark.udf().register("isBro",udf);
// 使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
方法二 Excutor端加载
如果我们直接在call中进行初始化会存在问题:由于多个task的线程会在同一时刻对UDF中的call进行调用,导致资源对象在同一时刻被初始化多次,造成Excutor内存资源浪费。此外,如果外部资源为连接池对象,在同一时刻初始化多次会建立多个连接,增加外部数据源的访问压力。
为此,我们可以借助单例模式中的懒汉式实现,让资源在每个Excutor中只被初始化一次。懒汉式的实现需要新建一个类(命名为IsBroUDF2)并实现UDF2<Integer, Integer, Boolean>接口,重写UDF2的call方法:
import org.apache.spark.sql.api.java.UDF2;
/**
* @author wangjiahui
* @create 2021-03-14-14:25
*/
public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
// 定义静态的TreeNode成员变量
private static volatile TreeNode treeNode;
public IsBroUDF2() {
}
@Override
public Boolean call(Integer num1, Integer num2) throws Exception {
// 懒汉式 二次判定
if(null==treeNode){
synchronized (IsBroUDF2.class){
if(null==treeNode){
treeNode=createTree();
}
}
}
return treeNode.isBro(num1,num2);
}
// 辅助方法
public static TreeNode createTree(){
TreeNode[] treeNodes = new TreeNode[8];
for(int i=1; i<=7; i++){
treeNodes[i] = new TreeNode(i);
}
treeNodes[2].setLeft(treeNodes[1]);
treeNodes[2].setRight(treeNodes[3]);
treeNodes[6].setLeft(treeNodes[5]);
treeNodes[6].setRight(treeNodes[7]);
treeNodes[4].setLeft(treeNodes[2]);
treeNodes[4].setRight(treeNodes[6]);
return treeNodes[4];
}
}
然后注册和使用UDF
// 注册udf
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
// 使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
在call方法中通过加锁可以实现TreeNode资源在同一个Excutor中只被初始化一次。除了上面介绍的这种懒汉式的写法之外,还可以通过静态内部类懒加载、枚举等方式实现TreeNode资源在Excutor端只被初始化一次。
小结
想要在Spark Sql的UDF中使用Spark外的资源和数据进行运算,我们既可以在Driver端预先进行初始化然后广播到各Excutor上(要求对象能序列化),也可以直接在Excutor端进行加载;如果在Excutor端加载要保证外部资源对象只被初始化一次。
来源:https://juejin.cn/post/7194498526628282427


猜你喜欢
- JAVA中反射机制(JavaBean的内省与BeanUtils库)内省(Introspector) 是Java 语言对JavaBean类属性
- /// <summary>/// 获取本机在局域网的IP地址/// </summary>/// <return
- 一、Canvas Canvas中的方法很多,这里我们只挑常用的进行讲解说明 Canvas可以绘制的对象有:弧线(arcs) canvas.填
- 整理文档,搜刮出一个spring boot实现过滤器和 * demo ,稍微整理精简一下做下分享。 * 定义:@WebServletpubl
- 在日常开发过程中,偶尔会出现一些极端问题。比如 网络重复请求,很难过滤 请求的问题。下面一段代码,可以解决这个重复请求的问题。下面上一段代码
- 本文介绍的仿IOS对话框的实现,先来看一下效果图具体代码如下:public class AlertDialog { private Cont
- 概述本文的编写初衷,是想了解一下Spring Boot2中,具体是怎么序列化和反序列化JSR 310日期时间体系的,Spring MVC应用
- 一、前言最近在加强 ITAEM 团队的一个 app 项目——学生教师学习交流平台人员组成:安卓 + 前端 + 后台后台 DAO 层借鉴了华工
- Android中ImageView的使用:点击按钮,改变图片透明度,切换图片布局是三个按钮组件和一个ImageView组件<?xml
- 目录Handler概要构造器sendMessageAtTimedispatchMessageThreadLocalLooperMessage
- 本文实例讲述了C# winform实现右下角弹出窗口结果的方法。分享给大家供大家参考,具体如下:using System.Runtime.I
- APP启动速度非常重要,APP启动速度慢,可能会造成用户体验不良好,尤其是在最近用Android studio之后,如果长时间不打开app,
- 前言在上篇文章《初识GraphQL》中我们大致的了解了GraphQL作用,并通过简单示例初步体验了GraphQL的使用。下面我们从Hello
- 完美地模仿了2048游戏,是根据网友的一个2048改的。Block.javaimport javax.swing.*;import java
- 本文实例讲述了Java中计算时间差的方法。分享给大家供大家参考。具体如下:假设现在是2004-03-26 13:31:40过去是:2004-
- 是否允许循环依赖和bean的命名重复取决于beanfactory的两大属性allowBeanDefinitionOverriding和all
- 0 实验环境在Android Studio中进行有关代码的编写和界面效果展示。SQLite数据库的图形化工具SQLiteStudio下载网址
- 前言之前在SpringBoot项目中简单使用定时任务,不过由于要借助cron表达式且都提前定义好放在配置文件里,不能在项目运行中动态修改任务
- 背景:在平时的开发中,我们时常会遇到下列场景公司的组织架构的数据存储与展示文件夹层级的数据存储与展示评论系统中,父评论与诸多子评论的数据存储
- 前言之所以会有这篇文章,是因为公司的开发环境比较老,寻找一些jar包的时候总是会纠结对应的编译版本,感觉很麻烦,所以写了一个工具类用于读取c