Flink开发IDEA环境搭建与测试的方法
作者:Frankdeng 发布时间:2023-02-21 21:20:46
一.IDEA开发环境
1.pom文件设置
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.7.6</hadoop.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!-- <arg>-make:transitive</arg> -->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.flink开发流程
Flink具有特殊类DataSet
并DataStream
在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet
数据有限的情况下,对于一个DataStream
元素的数量可以是 * 的。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。
集合最初通过在弗林克程序添加源创建和新的集合从这些通过将它们使用API方法如衍生map
,filter
等等。
Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:
1.获取execution environment,
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.加载/创建初始化数据
DataStream<String> text = env.readTextFile(file:///path/to/file);
3.指定此数据的转换
val mapped = input.map { x => x.toInt }
4.指定放置计算结果的位置
writeAsText(String path)
print()
5.触发程序执行
在local模式下执行程序
execute()
将程序达成jar运行在线上
./bin/flink run \
-m node21:8081 \
./examples/batch/WordCount.jar \
--input hdfs:///user/admin/input/wc.txt\
--outputhdfs:///user/admin/output2\
二.Wordcount案例
1.Scala代码
package com.xyg.streaming
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc:
*/
object SocketWindowWordCountScala {
def main(args: Array[String]) : Unit = {
// 定义一个数据类型保存单词出现的次数
case class WordWithCount(word: String, count: Long)
// port 表示需要连接的端口
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// 获取运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 连接此socket获取输入数据
val text = env.socketTextStream("node21", port, '\n')
//需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
import org.apache.flink.api.scala._
// 解析数据, 分组, 窗口化, 并且聚合求SUM
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// 打印输出并设置使用一个并行度
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
}
2.Java代码
package com.xyg.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
* 先在node21机器上执行nc -l 9000
*/
public class StreamingWindowWordCountJava {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("没有指定port参数,使用默认值9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");
//计算数据
DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word:splits) {
out.collect(new WordWithCount(word,1L));
}
}
})//打平操作,把每行的单词转为<word,count>类型的数据
//针对相同的word数据进行分组
.keyBy("word")
//指定计算数据的窗口大小和滑动窗口大小
.timeWindow(Time.seconds(2),Time.seconds(1))
.sum("count");
//把数据打印到控制台,使用一个并行度
windowCount.print().setParallelism(1);
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");
}
/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
3.运行测试
首先,使用nc命令启动一个本地监听,命令是:
[admin@node21 ~]$ nc -l 9000
通过netstat命令观察9000端口。netstat -anlp | grep 9000,启动监听
如果报错:-bash: nc: command not found,请先安装nc,在线安装命令:yum -y install nc
。
然后,IDEA上运行flink官方案例程序
node21上输入
IDEA控制台输出如下
4.集群测试
这里单机测试官方案例
[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
StandaloneSessionClusterEntrypoint
TaskManagerRunner
Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:
单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到stdout
。监视TaskManager的输出文件并写入一些文本nc
(输入在点击后逐行发送到Flink):
三.使用IDEA开发离线程序
Dataset是flink的常用程序,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过sink进行存储,既可以写入hdfs这种分布式文件系统,也可以打印控制台,flink可以有很多种运行方式,如local、flink集群、yarn等.
1. scala程序
package com.xyg.batch
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/**
* Author: Mr.Deng
* Date: 2018/10/19
* Desc:
*/
object WordCountScala{
def main(args: Array[String]) {
//初始化环境
val env = ExecutionEnvironment.getExecutionEnvironment
//从字符串中加载数据
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
//分割字符串、汇总tuple、按照key进行分组、统计分组后word个数
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//打印
counts.print()
}
}
2. java程序
package com.xyg.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/19
* Desc:
*/
public class WordCountJava {
public static void main(String[] args) throws Exception {
//构建环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//通过字符串构建数据集
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
//分割字符串、按照key进行分组、统计相同的key个数
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
//打印
wordCounts.print();
}
//分割字符串的方法
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
3.运行
来源:https://www.cnblogs.com/frankdeng/p/9760015.html
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 本文实例讲述了C#获取字符串后几位数的方法。分享给大家供大家参考。具体实现方法如下:#region 获取后几位数 public string
- 上一篇JavaMail入门第三篇 发送邮件中,我们学会了如何用JavaMail API提供的Transport类发送邮件,同样,JavaMa
- 问题现象:HTTP Status 403-Invalid CSRF Token 'null' was found on th
- android 实现拨打电话的app,代码非常简单,功能也很实用,分享给大家。MainActivity.javapackage com.bb
- 在需要线程同步的时候如何选择合适的线程锁?例:选择可以存入到常量池当中的对象,String对象等public class SyncTest{
- 插件安装方式:新版本IDE安装方式略有不同,不一一赘述 1、Background Image Plus
-   考虑到直接讲实现一个类Task库思维有点跳跃,所以本节主要讲解Async/Await的本质作用(解决
- 前言java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也不能做出来非常好用,
- 概述Spring针对Java Transaction API (JTA)、JDBC、Hibernate和Java Persistence A
- 一、背景今天心血来潮,准备测试一下项目中 logback 的自动刷新功能,但是测试时发现并不生效。logback 的配置如下:<con
- 一、项目运行环境配置:Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ Eclispe
- 在Java 字符终端上获取输入有三种方式:1、java.lang.System.in (目前JDK版本均支持)2、java.util.Sca
- 鼠标事件的事件源往往与容器相关,当鼠标进入容器、离开容器,或者在容器中单击鼠标、拖动鼠标时都会发生鼠标事件。java语言为处理鼠标事件提供两
- Java是面向对象的编程语言,在我们开发Java应用的程序员的专业术语里,Java这个单词其实指的是Java开发工具,也就是JDK(Java
- 1、介绍官网地址:https://www.yuque.com/easyexcel特点:1、Java领域解析、生成Excel比较有名的框架有A
- 当遇到以下场景:其他人写的单元测试影响统计结果一些需要调用外部接口的测试暂不运行需要在非本机环境上运行一些不回滚的单元测试则有必要选择以下方
- 上篇文章中我们介绍了浅谈Spring的两种配置容器,接下来我们就了解下spring中的FactoryBean的相关内容,具体如下。从Sess
- 一、查看线程的运行状态题目线程有以下6种状态:新建、运行、阻塞、等待、计时等待和终止。new新线程时,线程处于新建 状态。调用start()
- Java定义Long数据类型Long lg=10L;只需要在定义的的整型后面加个L;就和定义float数据类型一样Float ft=5.20
- 本文实例讲述了Java实现的双向匹配分词算法。分享给大家供大家参考,具体如下:目前比较流行的几大分词算法有:基于字符串匹配的分词方法、基于理