锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

flink流处理

时间:2022-08-14 15:30:00 abs轮速传感器各自的优缺点

-

-

flink_warehouse

com.kaikeba.flink

1.0-SNAPSHOT

4.0.0

flink_study

-

-

cloudera

https://repository.cloudera.com/artifactory/cloudera-repos/

-

-

org.apache.flink

flink-streaming-scala_2.11

1.8.1

-

org.apache.flink

flink-scala_2.11

1.8.1

-

org.apache.hadoop

hadoop-client

2.6.0-mr1-cdh5.14.2

-

org.apache.hadoop

hadoop-common

2.6.0-cdh5.14.2

-

org.apache.hadoop

hadoop-hdfs

2.6.0-cdh5.14.2

-

org.apache.hadoop

hadoop-mapreduce-client-core

2.6.0-cdh5.14.2

-

-

-

org.apache.maven.plugins

maven-compiler-plugin

3.0

-

1.8

1.8

UTF-8

-

net.alchim31.maven

scala-maven-plugin

3.2.2

-

-

-

compile

testCompile

-

maven-assembly-plugin

-

-

jar-with-dependencies

-

-

-

-

make-assembly

package

-

single

import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchOperate {
///实现单词计数统计
def main(args: Array[String]): Unit = {
//获取程序入口类
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

////需要导入隐式转换的包 import org.apache.flink.api.scala._  ///读取数据 val sourceDatas: DataSet[String] = environment.readTextFile("D:\\开课吧课程资料\\Flink实时数仓\\数据\\datas\\count.txt","UTF-8") ///实现单词计数统计 val resultCount: AggregateDataSet[(String, Int)] = sourceDatas.flatMap(x => x.split(" "))   .map(x => (x, 1)) ///记录每个单词1   .groupBy(0) ///根据下表分组   .sum(1)///统计我们的单词 

// resultCount.print()
resultCount.writeAsText(“D:\开课吧课程资料\Flink实时数仓\数据\datas\count_result.txt”,WriteMode.OVERWRITE)
environment.execute() //提交任务
}
}

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**

  • 通过flink的程序,从socket接收数据,然后统计单词数据
    */
    object StreamSocket {
    def main(args: Array[String]): Unit = {
    ///如果需要实现流式计算,则需要获得流式计算flink程序入口类 StreamExecutionEnvironment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //注意:如果需要流式处理,需要引入隐式转换
    import org.apache.flink.api.scala._
    //接受socket里面的数据
    val sockeLine: DataStream[String] = environment.socketTextStream(“node01”,9000)
    ///实现单词计数统计
    val result: DataStream[(String, Int)] = sockeLine.flatMap(x => x.split(" ")) ///按空间切割我们的单词,然后压平
    .map(x => (x, 1)) ///记录每个单词出现的次数1 次
    .keyBy(0) ///将我们的数据按下表0的位置进行分组
    .sum(1)//分组后统计我们的数据
    result.print() //最终通过print触发整个任务执行的方法

    //提交任务
    environment.execute()

}

}

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {
/**
* 从hdfs读取文件数据,统计单词出现的次数,并将结果写入hdfs文件里面去
*/
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

import  org.apache.flink.api.scala._ //通过输入路径,在此路径下读取所有文件 val sourceFile: DataStream[String] = environment.readTextFile("hdfs://node01:8020/flink_input") //统计我们的结果 val result: DataStream[(String, Int)] = sourceFile.flatMap(x =>x.split(" ")).map((_,1)).keyBy(0).sum(1)   result.writeAsText("hdfs://node01:8020/out_file").setParallelism(1)  environment.execute() 

}

}

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object MyOwnSource {

///通过自定义数据源接收数据
def main(args: Array[String]): Unit = {

val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ////导入隐式转换的包 import  org.apache.flink.api.scala._ val sourceStream: DataStream[String] = environment.addSource(new MySource)  val result: DataStream[(String, Int)] = sourceSteam.flatMap(x =>x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)
result.print()
environment.execute("myOwnSource")

}
}

class MySource extends ParallelSourceFunction[String]{
//定义全局的变量
var isRunning:Boolean = true

/**
* 最核心的方法,这个方法主要是用于获取数据
* @param sourceContext
*/
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
//表示程序一直在运行
while(isRunning){
//通过soruceContext调用collect来进行发送数据
sourceContext.collect(“hello spark”)
Thread.sleep(1000)
}
}

/**
* 判断如果flink的程序停止了,那么就不用再继续发送数据了
*/
override def cancel(): Unit = {
isRunning=false

}
}

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/**

  • 统计最近5S钟出现的单词的次数
    */
    object SocektSource {

def main(args: Array[String]): Unit = {
//第一步:获取程序入口类
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

//导入隐式转换的包
import org.apache.flink.api.scala._

//第二步:读取socket的数据
val socketStream: DataStream[String] = environment.socketTextStream("node01",9000)


//第三步:统计最近5S钟出现单词的次数
val result: DataStream[(String, Int)] = socketStream.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(0)
  .timeWindow(Time.seconds(5), Time.seconds(5))
  .sum(1)
result.print().setParallelism(1)  //设置打印的并行度为1

//第四步:提交任务
environment.execute("socketStream")

}

}
Flink实战
1、Flink基本介绍
1、Flink介绍
Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。
Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。比较典型的有:实现批处理的开源方案有MapReduce、Spark;实现流处理的开源方案有Storm;Spark的Streaming 其实本质上也是微批处理。
Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
2、Flink特性
1:有状态计算的Exactly-once语义。状态是指flink能够维护数据在时序上的聚类和聚合,同时它的checkpoint机制
2:支持带有事件时间(event time)语义的流处理和窗口处理。事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下。
3:支持高度灵活的窗口(window)操作。支持基于time、count、session,以及data-driven的窗口操作,能很好的对现实环境中的创建的数据进行建模。
4:轻量的容错处理( fault tolerance)。 它使得系统既能保持高的吞吐率又能保证exactly-once的一致性。通过轻量的state snapshots实现
5:支持高吞吐、低延迟、高性能的流处理
6:支持savepoints 机制(一般手动触发)。即可以将应用的运行状态保存下来;在升级应用或者处理历史数据是能够做到无状态丢失和最小停机时间。
7:支持大规模的集群模式,支持yarn、Mesos。可运行在成千上万的节点上
8:支持具有Backpressure功能的持续流模型
9:Flink在JVM内部实现了自己的内存管理
10:支持迭代计算
11:支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果进行缓存

3、Flink功能模块

 Deployment层: 该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN),(GCE/EC2)。
 Runtime层:Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
 API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
 Libraries层:该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)

4、Flink编程模型

 有状态的数据流处理层。最底层的抽象仅仅提供有状态的数据流,它通过处理函数(Process Function)嵌入到数据流api(DataStream API). 用户可以通过它自由的处理单流或者多流,并保持一致性和容错。同时用户可以注册事件时间和处理时间的回调处理,以实现复杂的计算逻辑。
 核心API层。 它提供了数据处理的基础模块,像各种transformation, join,aggregations,windows,stat 以及数据类型等等
 Table API层。 定了围绕关系表的DSL(领域描述语言)。Table API遵循了关系模型的标准:Table类型关系型数据库中的表,API也提供了相应的操作,像select,project,join,group-by,aggregate等。Table API声明式的定义了逻辑上的操作(logical operation)不是code for the operation;Flink会对Table API逻辑在执行前进行优化。同时代码上,Flink允许混合使用Table API和DataStram/DataSet API
 SQL层。 它很类似Table API的语法和表达,也是定义与Table API层次之上的,但是提供的是纯SQL的查询表达式。
2、Flink重新编译
由于实际生产环境当中,我们一般都是使用基于CDH的大数据软件组件,因此我们Flink也会选择基于CDH的软件组件,但是由于CDH版本的软件并没有对应的Flink这个软件安装包,所以我们可以对开源的Flink进行重新编译,然后用于适配我们对应的CDH版本的hadoop
第一步:准备工作
安装maven3版本及以上:省略
安装jdk1.8:省略
第二步:下载flink源码包
cd /kkb/soft
wget http://archive.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz
tar -zxf flink-1.8.1-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.8.1/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2

编译成功之后的文件夹目录位于
/kkb/install/flink-1.8.1/flink-dist/target

3、Flink架构模型图


Client:Flink 作业在哪台机器上面提交,那么当前机器称之为Client。用户开发的Program 代码,它会构建出DataFlow graph,然后通过Client提交给JobManager。
JobManager:是主(master)节点,相当于YARN里面的ResourceManager,生成环境中一般可以做HA 高可用。JobManager会将任务进行拆分,调度到TaskManager上面执行。
TaskManager:是从节点(slave),TaskManager才是真正实现task的部分。
Client提交作业到JobManager,就需要跟JobManager进行通信,它使用Akka框架或者库进行通信,另外Client与JobManager进行数据交互,使用的是Netty框架。Akka通信基于Actor System,Client可以向JobManager发送指令,比如Submit job或者Cancel /update job。JobManager也可以反馈信息给Client,比如status updates,Statistics和results。
Client提交给JobManager的是一个Job,然后JobManager将Job拆分成task,提交给TaskManager(worker)。JobManager与TaskManager也是基于Akka进行通信,JobManager发送指令,比如Deploy/Stop/Cancel Tasks或者触发Checkpoint,反过来TaskManager也会跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之间的数据通过网络进行传输,比如Data Stream做一些算子的操作,数据往往需要在TaskManager之间做数据传输。
当Flink系统启动时,首先启动JobManager和一至多个TaskManager。JobManager负责协调Flink系统,TaskManager则是执行并行程序的worker。当系统以本地形式启动时,一个JobManager和一个TaskManager会启动在同一个JVM中。当一个程序被提交后,系统会创建一个Client来进行预处理,将程序转变成一个并行数据流的形式,交给JobManager和TaskManager执行。

4、Flink部署运行模式
类似于spark一样,flink也有各种运行模式,其中flink主要支持三大运行模式
第一种运行模式:local模式,适用于测试调试
Flink 可以运行在 Linux、Mac OS X 和 Windows 上。本地模式的安装唯一需要的只是Java 1.7.x或更高版本,本地运行会启动Single JVM,主要用于测试调试代码。一台服务器即可运行
第二种运行模式:standAlone模式,适用于flink自主管理资源
Flink自带了集群模式Standalone,主要是将资源调度管理交给flink集群自己来处理,standAlone是一种集群模式,可以有一个或者多个主节点JobManager(HA模式),用于资源管理调度,任务管理,任务划分等工作,多个从节点taskManager,主要用于执行JobManager分解出来的任务
第三种模式:flink on yarn模式,适用于使用yarn来统一调度管理资源

Flink ON YARN工作流程如下所示:
首先提交job给YARN,就需要有一个Flink YARN Client。
第一步:Client将Flink 应用jar包和配置文件上传到HDFS。
第二步:Client向ResourceManager注册resources和请求APPMaster Container。
第三步:REsourceManager就会给某一个Worker节点分配一个Container来启动APPMaster,JobManager会在APPMaster中启动。
第四步:APPMaster为Flink的TaskManagers分配容器并启动TaskManager,TaskManager内部会划分很多个Slot,它会自动从HDFS下载jar文件和修改后的配置,然后运行相应的task。TaskManager也会与APPMaster中的JobManager进行交互,维持心跳等。
Flink的支持以上这三种部署模式,一般在学习研究环节,资源不充足的情况下,采用Local模式就行,生产环境中Flink ON YARN比较常见。

5、flink的部署安装
部署安装准备工作:关闭防火墙,关闭selinux,安装jdk,更改主机名,更改主机名与IP地址的映射关系,ssh免密码登录等
1、Flink的local模式部署安装
在local模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟flink的进程,适用于测试开发调试等,这种模式下,不用更改任何配置,只需要保证jdk8安装正常即可
第一步:上传安装包并解压
将我们编译之后的压缩包,上传到node01服务器的/kkb/soft路径下,然后进行解压
cd /kkb/soft/
tar -zxf flink-1.8.1.tar.gz -C /kkb/install/
第二步:直接使用脚本启动
flink在处于local模式下,不需要更改任何配置,直接解压之后启动即可
执行以下命令直接启动local模式
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh
启动成功之后,执行jps就能查看到启动了两个进程
18180 StandaloneSessionClusterEntrypoint
18614 TaskManagerRunner
第三步:webUI界面访问
启动两个进程成功之后,访问8081端口号即可访问到flink的web管理界面
http://node01:8081/#/overview
第四步:运行flink自带的测试
node01使用linux的nc命令来向socket当中发送一些单词
sudo yum -y install nc
nc -lk 9000
node01启动flink的自带的单词统计程序,接受输入的socket数据并进行统计
cd /kkb/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 9000
查看统计结果:
flink自带的测试用例统计结果在log文件夹下面
node01执行以下命令查看统计结果
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out
local模式运行成功之后,关闭local模式,我们接下来运行standAlone模式
cd /kkb/install/flink-1.8.1
bin/stop-cluster.sh
2、Flink的standAlone模式环境安装
使用standalone模式,需要启动flink的主节点JobManager以及从节点taskManager
服务以及ip 192.168.52.100 192.168.52.110 192.168.52.120
JobManager 是 否 否
TaskManager 是 是 是
第一步:更改配置文件
停止node01服务器上面local模式下的两个进程,然后修改node01服务器配置文件
node01服务器更改flink-conf.yaml配置文件文件
node01服务器执行以下命令更改flink配置文件
cd /kkb/install/flink-1.8.1/conf/
vim flink-conf.yaml
更改这个配置,指定jobmanager所在的服务器为node01
jobmanager.rpc.address: node01
node01服务器更改slaves配置文件
node01执行以下命令更改从节点slaves配置文件
cd /kkb/install/flink-1.8.1/conf
vim slaves

node01
node02
node03
第二步:安装包分发
将node01服务器的flink安装包分发到其他机器上面去
node01服务器执行以下命令分发安装包
cd /kkb/install
scp -r flink-1.8.1/ node02: P W D s c p − r f l i n k − 1.8.1 / n o d e 03 : PWD scp -r flink-1.8.1/ node03: PWDscprflink1.8.1/node03:PWD
第三步:启动flink集群
node01执行以下命令启动flink集群
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh
第四步:页面访问
http://node01:8081/#/overview
第五步:运行flink自带的测试用例
node01执行以下命令启动socket服务,输入单词
nc -lk 9000
node01启动flink的自带的单词统计程序,接受输入的socket数据并进行统计
cd /kkb/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname node01 --port 9000
node01服务器执行以下命令查看统计结果
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out
3、Flink的standAlone模式的HA环境
在上一节当中,我们实现了flink的standAlone模式的环境安装,并且能够正常提交任务到集群上面去,我们的主节点是jobManager,但是唯一的问题是jobmanager是单节点的,必然会有单节点故障问题的产生,所以我们也可以在standAlone模式下,借助于zk,将我们的jobManager实现成为高可用的模式
首先停止Flink的standAlone模式,并启动zk和hadoop集群服务
第一步:修改配置文件
node01执行以下命令修改Flink的配置文件
node01修改flink-conf.yaml配置文件
cd /kkb/install/flink-1.8.1/conf
vim flink-conf.yaml

jobmanager.rpc.address: node01
high-availability: zookeeper
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:218
node01修改masters配置文件
node01执行以下命令修改master配置文件
cd /kkb/install/flink-1.8.1/conf
vim masters

node01:8081
node02:8081
node01修改slaves配置文件
node01执行以下命令修改slaves配置文件
cd /kkb/install/flink-1.8.1/conf
vim slaves

node01
node02
node03
第二步:hdfs上面创建flink对应的文件夹
node01执行以下命令,在hdfs上面创建文件夹
hdfs dfs -mkdir -p /flink
第三步:拷贝配置文件
将node01服务器修改后的配置文件拷贝到其他服务器上面去
node01执行以下命令拷贝配置文件
cd /kkb/install/flink-1.8.1/conf
scp flink-conf.yaml masters slaves node02: P W D s c p f l i n k − c o n f . y a m l m a s t e r s s l a v e s n o d e 03 : PWD scp flink-conf.yaml masters slaves node03: PWDscpflinkconf.yamlmastersslavesnode03:PWD
第四步:启动flink集群
node01执行以下命令启动flink集群
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh
第五步:页面访问
访问node01服务器的web界面
http://node01:8081/#/overview
访问node02服务器的web界面
http://node02:8081/#/overview
注意:一旦访问node02的web界面,会发现我们的web界面会自动跳转到node01的web界面上,因为此时,我们的node01服务器才是真正的active状态的节点
第六步:模拟故障宕机实现自动切换
将node01服务器的jobManager进程杀死,然后过一段时间之后查看node02的jobManager是否能够访问
注意: JobManager发生切换时,TaskManager也会跟着发生重启,这其实是一个隐患问题
第七步:flink的standAlone模式在HA下提交任务
在HA这种模式下,提交任务与standAlone单节点模式提交任务是一样的,即使JobManager服务器宕机了也没有关系,会自动进行切换
node01执行以下命令启动socket服务,输入单词
nc -lk 9000
node01启动flink的自带的单词统计程序,接受输入的socket数据并进行统计
cd /kkb/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname node01 --port 9000
node01服务器执行以下命令查看统计结果
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out

4、flink on yarn模式
flink的任务也可以运行在yarn上面,将flnk的任务提交到yarn平台,通过yarn平台来实现我们的任务统一的资源调度管理,方便我们管理集群当中的CPU和内存等资源
依赖环境说明:
至少hadoop2.2版本及以上
hdfs以及yarn服务正常启动
flink on yarn又分为两种模式:

1、第一种模式:单个yarn session模式
这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式
这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可
启动单个Yarn Session模式
第一步:修改yarn-site.xml配置为文件
node01执行以下命令修改yarn-site.xml,添加以下配置属性
cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop
vim yarn-site.xml

yarn.resourcemanager.am.max-attempts 4 The maximum number of application master execution attempts. 然后将修改后的配置文件拷贝到node02与node03服务器 node01执行以下命令进行拷贝配置文件 cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop scp yarn-site.xml node02:$PWD scp yarn-site.xml node03:$PWD 然后重新启动yarn集群即可

第二步:修改flink配置文件
node01执行以下命令更改flink配置文件
cd /kkb/install/flink-1.8.1/conf
vim flink-conf.yaml

high-availability: zookeeper
high-availability.storageDir: hdfs://node01:8020/flink_yarn_ha
high-availability.zookeeper.path.root: /flink-yarn
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
yarn.application-attempts: 10

hdfs上面创建文件夹
node01执行以下命令创建hdfs文件夹
hdfs dfs -mkdir -p /flink_yarn_ha
第三步:在yarn当中启动flink集群
直接在node01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动
cd /kkb/install/flink-1.8.1/
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
我们也可以使用 --help 来查看更多参数设置
bin/yarn-session.sh –help

Usage:
Required
-n,–container Number of YARN container to allocate (=Number of Task Managers)
Optional
-D use value for given property
-d,–detached If present, runs the job in detached mode
-h,–help Help for the Yarn session CLI.
-id,–applicationId Attach to running YARN session
-j,–jar Path to Flink jar file
-jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-m,–jobmanager Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,–container Number of YARN container to allocate (=Number of Task Managers)
-nl,–nodeLabel Specify YARN node label for the YARN application
-nm,–name Set a custom name for the application on YARN
-q,–query Display available YARN resources (memory, cores)
-qu,–queue Specify YARN queue.
-s,–slots Number of slots per TaskManager
-sae,–shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
as typing Ctrl + C.
-st,–streaming Start Flink in streaming mode
-t,–ship Ship files in the specified directory (t for transfer)
-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-yd,–yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,–zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode
注意:如果在启动的时候,yarn的内存太小,可能会报以下错误
Diagnostics: Container [] is running beyond virtual memory limits. Current usage: 250.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing containerpid=6386,containerID=container_1521277661809_0006_01_000001
我们需要修改yarn-site.xml添加以下配置,然后重启yarn即可

yarn.nodemanager.vmem-check-enabled
false

第二步:查看yarn管理界面8088
访问yarn的8088管理界面,发现yarn当中有一个应用
http://node01:8088/cluster
yarn当中会存在一个常驻的application,就是为我们flink单独启动的一个session
第三步:提交任务
使用flink自带的jar包,实现单词计数统计功能
node01准备文件并上传hdfs
cd /kkb
vim wordcount.txt
内容如下
hello world
flink hadoop
hive spark

hdfs上面创建文件夹并上传文件
hdfs dfs -mkdir -p /flink_input
hdfs dfs -put wordcount.txt /flink_input

node01执行以下命令,提交任务到flink集群
cd /kkb/install/flink-1.8.1
bin/flink run ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/flink_output/wordcount-result.txt

第四步:验证Yarn Session的高可用
通过node01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面
找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程
[hadoop@node02 ~]$ jps
10065 QuorumPeerMain
10547 YarnSessionClusterEntrypoint
10134 DataNode
10234 NodeManager
10652 Jps
[hadoop@node02 ~]$ kill -9 10547
杀死YarnSessionClusterEntrypoint进程之后,会发现,yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面

2、第二种模式:多个yarn session模式
这种方式的好处是一个任务会对应一个job,即每提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败
不需要在yarn当中启动任何集群,直接提交任务即可
第一步:直接执行命令提交任务
cd /kkb/install/flink-1.8.1/
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/out_result/out_count.txt
第二步:查看输出结果
hdfs执行以下命令查看输出结果
hdfs dfs -text hdfs://node01:8020/out_result/out_count.txt
第三步:查看flink run帮助文档
我们可以使用–help 来查看帮助文档可以添加哪些参数
cd /kkb/install/flink-1.8.1/
bin/flink run --help
得到结果内容如下
Action “run” compiles and runs a program.

Syntax: run [OPTIONS]
“run” action options:
-c,–class Class with the program entry point
(“main” method or “getPlan()” method.
Only needed if the JAR file does not
specify the class in its manifest.
-C,–classpath Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,–detached If present, runs the job in detached
mode
-n,–allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,–parallelism The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-q,–sysoutLogging If present, suppress logging output to
standard out.
-s,–fromSavepoint Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
-sae,–shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
Options for yarn-cluster mode:
-d,–detached If present, runs the job in detached
mode
-m,–jobmanager Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-sae,–shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
-yD use value for given property
-yd,–yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,–yarnhelp Help for the Yarn session CLI.
-yid,–yarnapplicationId Attach to running YARN session
-yj,–yarnjar Path to Flink jar file
-yjm,–yarnjobManagerMemory Memory for JobManager Container with
optional unit (default: MB)
-yn,–yarncontainer Number of YARN container to allocate
(=Number of Task Managers)
-ynl,–yarnnodeLabel Specify YARN node label for the YARN
application
-ynm,–yarnname Set a custom name for the application
on YARN
-yq,–yarnquery Display available YARN resources
(memory, cores)
-yqu,–yarnqueue Specify YARN queue.
-ys,–yarnslots Number of slots per TaskManager
-yst,–yarnstreaming Start Flink in streaming mode
-yt,–yarnship Ship files in the specified directory
(t for transfer)
-ytm,–yarntaskManagerMemory Memory per TaskManager Container with
optional unit (default: MB)
-yz,–yarnzookeeperNamespace Namespace to create the Zookeeper
sub-paths for high availability mode
-z,–zookeeperNamespace Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,–jobmanager Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,–zookeeperNamespace Namespace to create the Zookeeper sub-paths
for high availability mode
3、flink run脚本分析
我们提交flink任务的时候,可以加以下这些参数
1、默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
2、连接指定host和port的jobmanager:
bin/flink run -m node01:8081 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
3、启动一个新的yarn-session:
bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

6、Flink编程入门案例

实时处理代码开发
开发flink代码,实现统计socket当中的单词数量
第一步:创建maven工程,导入jar包



org.apache.flink
flink-streaming-scala_2.11
1.8.1


    org.apache.flink
    flink-scala_2.11
    1.8.1

org.apache.maven.plugins maven-compiler-plugin 3.0 1.8 1.8 UTF-8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile maven-assembly-plugin jar-with-dependencies make-assembly package single

第二步:开发flink代码统计socket当中的单词数量
开发flink代码实现接受socket单词数据,然后对数据进行统计
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word:String,count:Long)

object FlinkCount {

def main(args: Array[String]): Unit = {
//获取程序入口类
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//从socket当中获取数据
val result: DataStream[String] = environment.socketTextStream(“node01”,9000)
//导入隐式转换的包,否则时间不能使用
import org.apache.flink.api.scala._
//将数据进行切割,封装到样例类当中,然后进行统计
val resultValue: DataStream[CountWord] = result
.flatMap(x => x.split(" "))
.map(x => CountWord(x,1))
.keyBy(“word”)
// .timeWindow(Time.seconds(1),Time.milliseconds(1)) 按照每秒钟时间窗口,以及每秒钟滑动间隔来进行数据统计
.sum(“count”)
//打印最终输出结果
resultValue.print().setParallelism(1)
//启动服务
environment.execute()
}
}

第三步:打包上传到服务器运行
将我们的程序打包,然后上传到服务器进行运行,将我们打包好的程序上传到node01服务器,然后体验在各种模式下进行运行我们的程序
1、standAlone模式运行程序
第一步:启动flink集群
node01执行以下命令启动flink集群
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh

第二步:启动node01的socket服务,并提交flink任务
node01执行以下命令启动node01的socket服务
nc -lk 9000
提交任务
将我们打包好的jar包上传到node01服务器的/kkb路径下,然后提交任务,注意,在pom.xml当中需要添加我们的打包插件,然后将任务代码进行打包,且集群已有的代码需要将打包scope设置为provided,在pom.xml将我们关于flink的jar包scope设置为provided

打包,并将我们的jar-with-dependencies的jar包上传到node01服务器的/kkb路径下

node01执行以下命令提交任务
cd /kkb/install/flink-1.8.1/
bin/flink run --class com.kkb.flink.demo1.FlinkCount /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar
第三步:查询运行结果
node01查看运行结果
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-1-node01.kaikeba.com.out

注意:结果保存在以.out结尾的文件当中,哪个文件当中有数据,就查看哪个文件即可

离线批量处理代码开发
flink也可以通过批量处理代码来实现批量数据处理
需求:处理附件中的count.txt文件,实现单词计数统计

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object BatchOperate {
def main(args: Array[String]): Unit = {

val inputPath = "D:\\count.txt"
val outPut = "D:\\data\\result2"

//获取程序入口类ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(inputPath)

//引入隐式转换
import org.apache.flink.api.scala._

val value: AggregateDataSet[(String, Int)] = text.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1)
value.writeAsText("d:\\datas\\result.txt").setParallelism(1)

env.execute("batch word count")

}

}

7、Flink的shell命令行代码调试
为了方便我们的开发调试,Flink支持通过shell命令行的方式来对我们的代码进行开发运行,类似于Spark的shell命令行对代码的调试是一样的,可以方便的对我们的代码执行结果进行跟踪调试,查验代码的问题所在
Flink shell方式支持流处理和批处理。当启动shell命令行之后,两个不同的ExecutionEnvironments会被自动创建。使用senv(Stream)和benv(Batch)分别去处理流处理和批处理程序。(类似于spark-shell中sc变量)
批量处理代码调试
第一步:进入flink的scala-shell
node01执行以下命令进入scala-shell
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local
或者我们也可以启动flink的集群,然后进入flink的shell客户端,将任务提交到flink集群上面去
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh remote node01 8081

第二步:使用benv变量执行批量处理
在scala-shell下,使用批处理来调试代码
val line =benv.fromElements(“hello world”,“spark flink”)
line.flatMap(x => x.split(" “)).map(x =>(x,1)).groupBy(0).sum(1).print
实时处理代码调试
通过senv变量实现代码
第一步:node01启动nc -lk 服务端
node01执行以下命令启动服务端
[hadoop@node01 ~]$ nc -lk 9000
第二步:进入scala-shell客户端
node01执行以下命令进入scala-shell
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local
第三步:使用senv来统计单词出现次数
node01使用senv变量来实时统计单词出现的次数
senv.socketTextStream(“node01”,9000).flatMap(x => x.split(” ")).map(x =>(x,1)).keyBy(0).sum(1).print
senv.execute
第四步:node01发送单词
node01服务器发送单词

8、Flink实时处理之DataStream
Flink的API概览

1、dataStream的数据源
1、socket数据源
从socket当中接收数据,并统计最近5秒钟每个单词出现的次数
第一步:node01开发socket服务
node01执行以下命令开启socket服务
nc -lk 9000
第二步:开发代码实现
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkSource1 {
def main(args: Array[String]): Unit = {
//获取程序入口类
val streamExecution: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val socketText: DataStream[String] = streamExecution.socketTextStream(“node01”,9000)
//注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
import org.apache.flink.api.scala._
val result: DataStream[(String, Int)] = socketText.flatMap(x => x.split(" "))
.map(x => (x, 1))
.keyBy(0)
.timeWindow(Time.seconds(5), Time.seconds(5)) //统计最近5秒钟的数据
.sum(1)

//打印结果数据
result.print().setParallelism(1)
//执行程序
streamExecution.execute()

}
}

2、文件数据源
读取hdfs路径下面所有的文件数据进行处理
第一步:添加maven依赖


cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/

org.apache.hadoop hadoop-client 2.6.0-mr1-cdh5.14.2 org.apache.hadoop hadoop-common 2.6.0-cdh5.14.2 org.apache.hadoop hadoop-hdfs 2.6.0-cdh5.14.2 org.apache.hadoop hadoop-mapreduce-client-core 2.6.0-cdh5.14.2

第二步:代码实现
object FlinkSource2 {
def main(args: Array[String]): Unit = {
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//从文本读取数据
val hdfStream: DataStream[String] = executionEnvironment.readTextFile(“hdfs://node01:8020/flink_input/”)
val result: DataStream[(String, Int)] = hdfStream.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)

result.print().setParallelism(1)

executionEnvironment.execute("hdfsSource")

}
}

3、从一个已经存在的集合当中获取数据
代码实现
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource3 {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val value: DataStream[String] = environment.fromElements[String](“hello world”,“spark flink”)
val result2: DataStream[(String, Int)] = value.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)
result2.print().setParallelism(1)
environment.execute()
}
}

4、自定义数据源
如果flink自带的一些数据源还不够的工作使用的话,我们还可以自定义数据源
flink提供了大量的已经实现好的source方法,你也可以自定义source
通过实现sourceFunction接口来自定义source,
或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义source。
1、通过ParallelSourceFunction 来实现自定义数据源
如果需要实现一个多并行度的数据源,那么我们可以通过实现ParallelSourceFunction 接口或者继承RichParallelSourceFunction 来自定义有并行度的source。
第一步:使用scala代码实现ParallelSourceFunction接口
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class MyParalleSource extends ParallelSourceFunction[String] {
var isRunning:Boolean = true

override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (true){
sourceContext.collect(“hello world”)
}
}
override def cancel(): Unit = {
isRunning = false
}
}

第二步:使用自定义数据源
object FlinkSource5 {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val sourceStream: DataStream[String] = environment.addSource(new MyParalleSource)
val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).map(x => (x, 1))
.keyBy(0)
.sum(1)
result.print().setParallelism(2)
environment.execute(“paralleSource”)
}
}

2、dataStream的算子介绍
官网算子介绍:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html

flink当中对于实时处理,有很多的算子,我们可以来看看常用的算子主要有哪些,dataStream当中的算子主要分为三大类,
Transformations:转换的算子,都是懒执行的,只有真正碰到sink的算子才会真正加载执行
partition:对数据进行重新分区等操作
Sink:数据下沉目的地

DataStream的Transformations算子
map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
flatmap:输入一个元素,可以返回零个,一个或者多个元素
filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区【典型用法见备注】
reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
aggregations:sum(),min(),max()等
window:在后面单独详解
Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。
Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap
Split:根据规则把一个数据流切分为多个流
Select:和split配合使用,选择切分后的流
案例一:使用union算子来合并多个DataStream
获取两个dataStream,然后使用union将两个dataStream进行合并
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkUnion {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//获取第一个dataStream
val firstStream: DataStream[String] = environment.fromElements(“hello world”,“test scala”)
//获取第二个dataStream
val secondStream: DataStream[String] = environment.fromElements(“second test”,“spark flink”)
//将两个流进行合并起来
val unionAll: DataStream[String] = firstStream.union(secondStream)
//结果不做任何处理
val unionResult: DataStream[String] = unionAll.map(x => {
// println(x)
x
})
//调用sink算子,打印输出结果
unionResult.print().setParallelism(1)
//开始运行
environment.execute()
}
}
案例二:使用connect实现不同类型的DataStream进行连接
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}

object FlinkConnect {

def main(args: Array[String]): Unit = {
//获取程序入口类
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//导入隐式转换的包
import org.apache.flink.api.scala._
//定义string类型的dataStream
val strStream: DataStream[String] = environment.fromElements(“hello world”,“abc test”)
//定义int类型的dataStream
val intStream: DataStream[Int] = environment.fromElements(1,2,3,4,5)
//两个流进行connect操作
val connectedStream: ConnectedStreams[String, Int] = strStream.connect(intStream)
//通过map对数据进行处理,传入两个函数
val connectResult: DataStream[Any] = connectedStream.map(x =>{ x + “abc”},y =>{ y * 2 })
connectResult.print().setParallelism(1)
environment.execute(“connect stream”)
}
}

案例三:使用split将一个DataStream切成多个DataStream
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

object FlinkSplit {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//获取第一个dataStream
val resultDataStream: DataStream[String] = environment.fromElements(“hello world”,“test spark”,“spark flink”)
//通过split来对我们的流进行切分操作
val splitStream: SplitStream[String] = resultDataStream.split(new OutputSelector[String] {
override def select(out: String): lang.Iterable[String] = {
val strings = new util.ArrayListString
if (out.contains(“hello”)) {
//如果包含hello,那么我们就给这个流起名字叫做hello
strings.add(“hello”)
} else {
strings.add(“other”)
}
strings
}
})
//对我么的stream进行选择
val helloStream: DataStream[String] = splitStream.select(“hello”)
//打印包含hello的所有的字符串
helloStream.print().setParallelism(1)
environment.execute()
}
}

DataStream的Partition算子
https://blog.csdn.net/lmalds/article/details/60575205 flink的各种算子介绍

partition算子允许我们对数据进行重新分区,或者解决数据倾斜等问题
Random partitioning:随机分区
•dataStream.shuffle()
Rebalancing:对数据集进行再平衡,重分区,消除数据倾斜
•dataStream.rebalance()
Rescaling:Rescaling是通过执行oepration算子来实现的。由于这种方式仅发生在一个单一的节点,因此没有跨网络的数据传输。
•dataStream.rescale()

Custom partitioning:自定义分区
•自定义分区需要实现Partitioner接口
•dataStream.partitionCustom(partitioner, “someKey”)
•或者dataStream.partitionCustom(partitioner, 0);
Broadcasting:广播变量,后面详细讲解

需求:对我们filter过后的数据进行重新分区
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkPartition {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

import org.apache.flink.api.scala._
val dataStream: DataStream[String] = environment.fromElements("hello world","test spark","abc hello","hello flink")

val resultStream: DataStream[(String, Int)] = dataStream.filter(x => x.contains("hello"))
  // .shuffle  //随机的重新分发数据,上游的数据,随机的发送到下游的分区里面去
 // .rescale
  .rebalance //对数据重新进行分区,涉及到shuffle的过程
  .flatMap(x => x.split(" "))
  .map(x => (x, 1))
  .keyBy(0)
  .sum(1)

resultStream.print().setParallelism(1)
environment.execute()

}
}
案例实战:自定义分区策略
如果以上的几种分区方式还没法满足我们的需求,我们还可以自定义分区策略来实现数据的分区
需求:自定义分区策略,实现不同分区的数据发送到不同分区里面去进行处理,将包含hello的字符串发送到一个分区里面去,其他的发送到另外一个分区里面去
第一步:自定义分区类
import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner extends Partitioner[String]{
override def partition(word: String, num: Int): Int = {
println(“分区个数为” + num)
if(word.contains(“hello”)){
0
}else{
1
}
}
}

第二步:代码实现进行分区
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkCustomerPartition {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置我们的分区数,如果不设置,默认使用CPU核数作为分区个数

environment.setParallelism(2)
import  org.apache.flink.api.scala._
//获取dataStream
val sourceStream: DataStream[String] = environment.fromElements("hello world","spark flink","hello world","hive hadoop")
val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner,x => x +"")
rePartition.map(x =>{
  println("数据的key为" +  x + "线程为" + Thread.currentThread().getId)
  x
})
rePartition.print()
environment.execute()

}
}

DataStream的sink算子
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
自定义输出addSink【kafka、redis】
我们可以通过sink算子,将我们的数据发送到指定的地方去,例如kafka或者redis或者hbase等等,前面我们已经使用过将数据打印出来调用print()方法,接下来我们来实现自定义sink将我们的数据发送到redis里面去
第一步:导入flink整合redis的jar包

org.apache.bahir flink-connector-redis_2.11 1.0

第二步:代码开发
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Stream2Redis {

def main(args: Array[String]): Unit = {
//获取程序入口类
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

import org.apache.flink.api.scala._
//组织数据
val streamSource: DataStream[String] = executionEnvironment.fromElements("hello world","key value")
//将数据包装成为key,value对形式的tuple
val tupleValue: DataStream[(String, String)] = streamSource.map(x =>(x.split(" ")(0),x.split(" ")(1)))


val builder = new FlinkJedisPoolConfig.Builder

builder.setHost("node03")
builder.setPort(6379)

builder.setTimeout(5000)
builder.setMaxTotal(50)
builder.setMaxIdle(10)
builder.setMinIdle(5)
val config: FlinkJedisPoolConfig = builder.build()
//获取redis  sink
val redisSink = new RedisSink[Tuple2[String,String]](config,new MyRedisMapper)

//使用我们自定义的sink
tupleValue.addSink(redisSink)
//执行程序
executionEnvironment.execute("redisSink")

}
}

class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.SET)

}

override def getKeyFromData(data: (String, String)): String = {
data._1

}

override def getValueFromData(data: (String, String)): String = {
data._2

}
}

9、Flink的window和Time详解
对于流式处理,如果我们需要求取总和,平均值,或者最大值,最小值等,是做不到的,因为数据一直在源源不断的产生,即数据是没有边界的,所以没法求最大值,最小值,平均值等,所以为了一些数值统计的功能,我们必须指定时间段,对某一段时间的数据求取一些数据值是可以做到的。或者对某一些数据求取数据值也是可以做到的
所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” 。
window是一种可以把无限数据切割为有限数据块的手段
窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者 数据驱动的【Count Window】 (比如:每100个元素)。

窗口类型汇总:

1、窗口的基本类型介绍
窗口通常被区分为不同的类型:
tumbling windows:滚动窗口 【没有重叠】
sliding windows:滑动窗口 【有重叠】
session windows:会话窗口 ,一般没人用
tumbling windows类型:没有重叠的窗口

sliding windows:滑动窗口 【有重叠】

2、Flink的窗口介绍
Time Window窗口的应用
time window又分为滚动窗口和滑动窗口,这两种窗口调用方法都是一样的,都是调用timeWindow这个方法,如果传入一个参数就

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章