Flink支持哪些数据类型?
作者:蓝天elasticsearch 发布时间:2023-01-15 06:55:43
一、支持的数据类型
Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制。这样做的原因是系统会分析类型以确定有效的执行策略。
1.Java Tuple 和 Scala Case类;
2.Java POJO;
3.基本类型;
4.通用类;
5.值;
6.Hadoop Writables;
7.特殊类型
二、Flink之Tuple类型
Tuple类型 Tuple
是flink
一个很特殊的类型 (元组类型),是一个抽象类,共26个Tuple
子类继承Tuple
他们是 Tuple0
一直到Tuple25
package org.apache.flink.api.java.tuple;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;
@Public
public abstract class Tuple implements Serializable {
private static final long serialVersionUID = 1L;
public static final int MAX_ARITY = 25;
private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};
public Tuple() {
}
public abstract <T> T getField(int var1);
public <T> T getFieldNotNull(int pos) {
T field = this.getField(pos);
if (field != null) {
return field;
} else {
throw new NullFieldException(pos);
}
}
public abstract <T> void setField(T var1, int var2);
public abstract int getArity();
public abstract <T extends Tuple> T copy();
public static Class<? extends Tuple> getTupleClass(int arity) {
if (arity >= 0 && arity <= 25) {
return CLASSES[arity];
} else {
throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
}
}
public static Tuple newInstance(int arity) {
switch(arity) {
case 0:
return Tuple0.INSTANCE;
case 1:
return new Tuple1();
case 2:
return new Tuple2();
case 3:
return new Tuple3();
case 4:
return new Tuple4();
case 5:
return new Tuple5();
case 6:
return new Tuple6();
case 7:
return new Tuple7();
case 8:
return new Tuple8();
case 9:
return new Tuple9();
case 10:
return new Tuple10();
case 11:
return new Tuple11();
case 12:
return new Tuple12();
case 13:
return new Tuple13();
case 14:
return new Tuple14();
case 15:
return new Tuple15();
case 16:
return new Tuple16();
case 17:
return new Tuple17();
case 18:
return new Tuple18();
case 19:
return new Tuple19();
case 20:
return new Tuple20();
case 21:
return new Tuple21();
case 22:
return new Tuple22();
case 23:
return new Tuple23();
case 24:
return new Tuple24();
case 25:
return new Tuple25();
default:
throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
}
}
}
查看源码我们看到Tuple0
一直到Tuple25
我们看flink为我们为我们构造好了0-25个字段的模板类
ackage org.apache.flink.api.java.tuple;
import java.io.ObjectStreamException;
import org.apache.flink.annotation.Public;
@Public
public class Tuple0 extends Tuple {
private static final long serialVersionUID = 1L;
public static final Tuple0 INSTANCE = new Tuple0();
public Tuple0() {
}
public int getArity() {
return 0;
}
public <T> T getField(int pos) {
throw new IndexOutOfBoundsException(String.valueOf(pos));
}
public <T> void setField(T value, int pos) {
throw new IndexOutOfBoundsException(String.valueOf(pos));
}
public Tuple0 copy() {
return new Tuple0();
}
public String toString() {
return "()";
}
public boolean equals(Object o) {
return this == o || o instanceof Tuple0;
}
public int hashCode() {
return 0;
}
private Object readResolve() throws ObjectStreamException {
return INSTANCE;
}
}
三、Tuple的使用
方式一:初始化元组
可使用静态方法 newInstance进行元组构造 指定元组空间大小;
ex: 1 则元组只有一个空间,则实际使用的Tuple1 字段只有f0
ex: 12 则元组只有两个空间,则实际使用的Tuple2 字段只有f0,f1
指定 Tuple元组空间大小 (可理解为字段个数)
Tuple tuple = Tuple.newInstance(1);
方式一:构造元组
使用Tuple.newInstance(xx),指定元组空间大小的话,这样存取虽然能够实现,但会存在存储索引位置使用不正确的情况,可能由于失误操作编写出索引越界异常,而且使用不太方便,使用Tuplex.of(数据)方法构造Tuple元组
Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2");
System.out.println(tuple3.f0); // test0
System.out.println(tuple3.f1); // test1
System.out.println(tuple3.f2); // test2
四、Flink之POJO类型
Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:
1.是公共类;
2.无参构造是公共的;
3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);
4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。
Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");
五、Flink之基本类型
Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double。
六、Flink之通用类型
Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。
所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。
七、Flink之值类型Values
通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。
Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。
八、Flink之Hadoop的Writable类
它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。
九、Flink之特殊类型
Flink比较特殊的类型有以下两种:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either实现。
Java Api 与 Scala 的 类似Either
,它表示两种可能类型的值,Left或Right。Either
对于错误处理或需要输出两种不同类型的记录的运算符很有用。
类型擦除和类型推理
Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。
例如,在JVM中,DataStream<String>和DataStream<Long>的实例看起来是相同的。
List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());
泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。Flink 的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。
来源:https://blog.csdn.net/test_test111/article/details/117674481
猜你喜欢
- 本文将通过阅读spring源码,分析@ComponentScan注解扫描组件的原理。和@Bean注解一样,@ComponentScan注解也
- 上一节,简单讲述了 Mybatis-Plus 搭建与使用入门,这一节,简单讲一下如何使用 MP 实现多表分页。分析使用的工程,依旧是 spr
- 基于 springboot+vue的测试平台开发一、前端环境搭建在前端框架vue-element-admin这个项目中,有一个简洁轻量型的项
- 1.前言在Mybatis中需要创建的配置文件有sqlMapconfig.xml,映射文件xxxMapper.xml,而这些文件在idea中并
- 一、需求分析:1、输入一个数组-----------------------------------------》程序要接收一组输入的数组,
- 一次正常的请求最近别人需要调用我们系统的某一个功能,对方希望提供一个api让其能够更新数据。由于该同学是客户端开发,于是有了类似以下代码。@
- 当使用spring-Boot时,嵌入式Servlet容器通过扫描注解的方式注册Servlet、Filter和Servlet规范的所有 * (
- 前情提要本文中提供了九种方式获取resources目录下文件的方式。其中打印文件的方法如下: /**
- Jenkins是一个开源软件项目,旨在提供一个开放易用的软件平台,使软件的持续集成变成可能Jenkins是基于Java开发的一种持续集成工具
- 现假设某个公司采用公用电话来传递数据,数据是四位的整数,在传递过程中是加密的。加密规则是每位数字都加上5,然后再用除以10的余数代替该数字,
- 前言使用SpringBoot来开发项目相对于传统模式,要快速优雅许多,相信目前国内绝大部分web项目的开发还没有使用SpringBoot来做
- 一、问题描述有时候,我们会遇到在遍历List集合的过程中删除数据的情况。看着自己写的代码,感觉完全没有问题,但就是达不到预期的效果,这是为什
- 本文通过优化买票的重复流程来说明享元模式,为了加深对该模式的理解,会以String和基本数据类型的包装类对该模式的设计进一步说明。读者可以拉
- 前言Word中可以针对不同文档排版设计要求来设置背景设置颜色。常见的可设置单一颜色、渐变色或加载图片来设置成背景。下面通过Java来设置以上
- Mybatis Plus select 查询部分字段Mybatis Plus select语句默认查询所有字段,如需要指定字段查询,则需使用
- 介绍:上篇给大家介绍了ssm多模块项目的搭建,在搭建过程中spring整合springmvc和mybatis时会有很多的东西需要我们进行配置
- 1.更新同步方式:/** * 三个参数 * the path of the node
- springboot扩展MVC自定义 config -> SpringMvcConfig.java下边就是扩展springMVC的模板
- package airthmatic;public class demo10 { public static void main(
- 题目一链表题——链表合并根据给定的两个升序链表合并为一个新的升序链表具体题目如下解法/** * Definition for singly-