Java 数据流之Broadcast State
作者:Vicky_Tang 发布时间:2022-05-21 15:17:19
标签:Java,Broadcast,State,数据流
一、BroadcastState 的介绍
广播状态(Broadcast State)是 Operator State 的一种特殊类型。如果我们需要将配置 、规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState。下游的 Task 接收这些配置、规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中。
简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估。
场景:动态更新计算规则。
广播状态与其他操作符状态的区别在于:
它有一个 map 格式,用于定义存储结构
它仅对具有广播流和非广播流输入的特定操作符可用
这样的操作符可以具有不同名称的多个广播状态
二、BroadcastState 操作流程
三、案例实现
从端口读取Json数据作为事件流
从Mysql读取数据作为广播流
关联广播流和事件流
匹配对应的用户信息
package cn.kgc.broadcast
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// (001,'tom',18,'北京',15830010002)
// 定义样例类 接受 MySQL的用户数据
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)
// user_id、user_name、user_addrss、behaviour、url
// 输出数据类型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)
// 实现广播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{
lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
// 处理的是日志流中的每条数据
override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
// {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
val user_id = JSON.parseObject(value).getLong("user_id")
val behaviour = JSON.parseObject(value).getString("behaviour")
val url = JSON.parseObject(value).getString("url")
val mapState = ctx.getBroadcastState(mapStateDes)
val userInfo = mapState.get(user_id)
out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))
}
// 处理的是广播流的每个值
override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
mapState.put(value._1,value._2)
}
}
class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{
var conn:Connection = _
var statement: PreparedStatement = _
var flag:Boolean = true
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
statement = conn.prepareStatement("select * from base_user")
}
override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
while (flag){
Thread.sleep(5000)
val resultSet = statement.executeQuery()
while (resultSet.next()){
val id = resultSet.getLong(1)
val name = resultSet.getString(2)
val age = resultSet.getInt(3)
val city = resultSet.getString(4)
val phone = resultSet.getLong(5)
ctx.collect(BaseUserInfo(id,name,age,city,phone))
}
}
}
override def cancel(): Unit = {
flag = false
}
override def close(): Unit = {
if (statement != null) statement.close()
if (conn != null) conn.close()
}
}
object BroadcastDemo01 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 定义为KV,一方面是为了广播的时候定义为map,另一方面是为了做关联操作
val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
.map(user => (user.id, user))
val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)
// 日志JSON数据
val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)
dataInfoDS.connect(broadCastStream)
.process(new MyBroadcastFunc)
.print()
env.execute()
}
}
来源:https://blog.csdn.net/sweet19920711/article/details/120027690


猜你喜欢
- 曾经遇到过这样的问题,在我的代码中使用了通知栏,一切都正常,但是就是正在进行的通知栏中属于我的程序的那一条总是上下跳来跳去,一闪一闪的。感觉
- 为帮助开发者更方便、更安全地开发和调试基于微信的网页,微信推出了 web 开发者工具。它是一个桌面应用,通过模拟微信客户端的表现,使得开发者
- Jfreechart本身不能生成SVG图形,但是可以借助另外一个东西,辅助生成.好像是这个:batik ,具体代码请看下文一:Java生成s
- 1.引入依赖 <!--mybatisplus依赖--> <dependency> &nbs
- 在使用Android Studio开发的时候,如遇到多个项目引用同一个library的情况时,会遇到在每个项目中都要有一套library的代
- 在C程序代码中我们可以利用操作系统提供的互斥锁来实现同步块的互斥访问及线程的阻塞及唤醒等工作。然而在Java中除了提供LockAPI外还在语
- mybatis if test判断BigDecimal遇到的坑<update id="test" paramete
- 了解过spring-Boot这个技术的,应该知道Spring-Boot的核心配置文件application.properties,当然也可以
- 本文实例为大家分享了Android实现秒表功能的具体代码,供大家参考,具体内容如下设计完成一个秒表,具备启停功能,正确使用工作线程完成界面刷
- 源代码:http://github.com/lovewenyo/HttpDemo1. HttpURLConnection使用JDK原生提供的
- 项目中遇到springBoot+docker需要配置不同环境变量的问题,做个简单的总结:1.开发环境ide中启动项目可以通过ide的环境变量
- 话说RecyclerView已经面市很久,也在很多应用中得到广泛的使用,在整个开发者圈子里面也拥有很不错的口碑,那说明RecyclerVie
- 一、Maven生命周期、阶段、目标 &nbs
- 本文实例讲述了Android开发使用Messenger及Handler进行通信的方法。分享给大家供大家参考,具体如下:1. 客户端servi
- Java中获取整点时间戳在实际的开发过程中,前端给后端传时间的时候,有时候传的是整点数值,比如:timeList=[00,01,02,03,
- wait(), notify(), notifyAll()等方法介绍在Object.java中,定义了wait(), notify()和no
- 前言需要对一个List中的对象进行唯一值属性去重,属性求和,对象假设为BillsNums,有id、nums、sums三个属性,其中id表示唯
- 将自然语言编写的测试用例转换为可执行的测试,可以大大降低需求与开发之间的沟通成本,这是BDD(行为驱动开发)希望达到的效果。SpecFlow
- 现有的热修复框架很多,尤以AndFix 和Tinker比较多具体的实现方式和项目引用可以参考网络上的文章,今天就不谈,也不是主要目的今天就来
- using System.IO;using System.IO.Compression;using System.Web;using Sys