锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

Flink基础系列2-Flink部署

时间:2022-10-13 19:30:00 2e8起动调整型电阻器zt2

文章目录

  • 一. Standalone模式
    • 1.1 Standalone模式概述
    • 1.2 standalone提交模式任务
      • 1.2.1 Web UI提交Job
      • 1.2.2 命令行提交job
  • 二.yarn模式
    • 2.1 Flink on yarn
      • 2.1.1 Session-Cluster模式
      • 2.1.2 Per-Job-Cluster模式
    • 2.2 Session Cluster
    • 2.3 Per-Job-Cluster
  • 三. Kubernetes部署
  • 参考:

一. Standalone模式

1.1 Standalone模式概述

  1. Flink 中每一个 TaskManager 都是一个JVM在独立的线程上执行一个或多个过程 subtask
  2. 控制一个 TaskManager 能接收多少个? task, TaskManager 通过 task slot 控制(一个 TaskManager 至少有一个 slot)
  3. 每个task slot表示TaskManager具有固定大小的子集资源。TaskManager有三个slot,然后它将其管理的内存分成三部分给每个部分slot(注:这里不涉及CPU的隔离,slot仅用于隔离task受管理内存)
  4. 可调整task slot自定义的数量subtask隔离方式。如一个TaskManager一个slot那么每一个task group独立运行JVM中。而当一个TaskManager多个slot时,多个subtask可以一起享受一个JVM,而在同一个JVM进程中的task将共享TCP连接和心跳消息也可以共享数据集和数据结构,从而减少每个数据task的负载。

image.png

  1. 默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务(前提是它们来自同一个任务)job)。 结果是,一个 slot 整个管道可以保存。
  2. Task Slot 是静态概念,是指 TaskManager 通过参数具有并发执行能力taskmanager.numberOfTaskSlots配置;并行度parallelism是动态概念,即TaskManager并发能力实际用于操作程序,可通过参数parallelism.default进行配置。
    例如,如果有三个例子:TaskManager,每一个TaskManager中分配了3个TaskSlot,也就是每个TaskManager可以接收3个task,这样,我们总共可以接受9个TaskSot。但如果我们设置的话parallelism.default=1.当程序运行时,9个TaskSlot只有一个操作,8个会处于空闲状态,所以要学会合理设置并行度!具体图解如下:

conf/flink-conf.yaml配置文件中

taskmanager.numberOfTaskSlots parallelism.default 

默认值

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.  taskmanager.numberOfTaskSlots: 1  # The parallelism used for programs that did not specify and other parallelism.  parallelism.default: 1 

注:Flink存储State使用堆外内存,所以web UI里JVM Heap Size和Flink Managed MEM两个分开值。

1.2 standalone模式任务提交

1.2.1 Web UI提交Job

启动Flink后,可以在Web UI的Submit New Job提交jar包,然后指定Job参数。

  1. Entry Class
    程序入口,指定入口类别(类别全限名)

  2. Program Arguments
    例如,程序启动参数–host localhost --port 7777

  3. Parallelism
    设置Job并行度。
    Ps:并行优先级(从上到下递减)
    1)代码中的算子setParallelism()
    2)ExecutionEnvironment env.setMaxParallelism()
    3)设置的Job并行度
    4)集群conf在配置文件中parallelism.default
    ps:socket等特殊的IO操作本身不能平行处理,平行度只能是1

  4. Savepoint Path
    savepoint是通过checkpoint机制为streaming job创建的一致性快照,如数据源offset,状态等。
    (savepoint可以理解为手动备份,checkpoint自动备份)
    ps:提交job注意分配slot如果总数足够,如果总数足够的话slot总数不够,那么job执行失败(资源调度不足)

1.2.2 命令行提交job

  1. 查看已提交的一切job
    若有操作job,这里可以查看具体情况job id
D:\flink\flink-1.9.0\bin>flink list log4j:WARN No appenders could be found for logger (org.apache.flink.client.cli.CliFrontend). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Waiting for response... No running jobs. No scheduled jobs. 
  1. 提交job
    -c指定入口类
    -p指定job的并行度
bin/flink run -c <入口类> -p <并行度>  <启动参数> 

举例:

$ bin/flink run -c wc.StreamWordCount -p 3 /tmp/Flink_Tutorial-1.0-SNAPSHOT.jar --host localhost --port 7777 Job has been submitted with JobID 33a5d1f00688a362837830f0b85fd75e 
  1. 取消job
    bin/flink cancel
$ bin/flink cancel 30d9dda946a170484d55e41358973942 Cancelling job 30d9dda946a170484d55e41358973942. Cancelled job 30d9dda946a170484d55e41358973942. 

注:Total Task Slots只要不小于Job中Parallelism最大值。
eg:我在这里设置文件taskmanager.numberOfTaskSlots: 4,实际Job运行时总Tasks显示9,但具体4个任务步骤分别需要(1、3、3、2)的数量Tasks,4>三、满足最大Parallelism可以成功运行。

二.yarn模式

以Yarn模式部署Flink任务时,要求Flink是有 Hadoop 支持版,Hadoop 环境需要保证版本在 2.2 以上安装在集群中 HDFS 服务。

.1 Flink on yarn

Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。

2.1.1 Session-Cluster模式

Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。

在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。

2.1.2 Per-Job-Cluster模式

一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

2.2 Session Cluster

  1. 启动hadoop集群(略)

  2. 启动yarn-session
    ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
    其中:
    1)-n(–container):TaskManager的数量。
    2)-s(–slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
    3)-jm:JobManager的内存(单位MB)。
    4)-tm:每个taskmanager的内存(单位MB)。
    5)-nm:yarn 的appName(现在yarn的ui上的名字)。
    6)-d:后台执行。

  3. 执行任务
    ./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

  4. 去 yarn 控制台查看任务状态

  5. 取消 yarn-session
    yarn application --kill application_1577588252906_0001

2.3 Per-Job-Cluster

  1. 启动hadoop集群(略)
  2. 不启动yarn-session,直接执行job
/flink run –m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

三. Kubernetes部署

容器化部署时目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。

  1. 搭建Kubernetes集群(略)
  2. 配置各组件的yaml文件
    在k8s上构建Flink Session Cluster,需要将Flink集群的组件对应的docker镜像分别在k8s上启动,包括JobManager、TaskManager、JobManagerService三个镜像服务。每个镜像服务都可以从中央镜像仓库中获取。
  3. 启动Flink Session Cluster
// 启动jobmanager-service 服务
kubectl create -f jobmanager-service.yaml
// 启动jobmanager-deployment服务
kubectl create -f jobmanager-deployment.yaml
// 启动taskmanager-deployment服务
kubectl create -f taskmanager-deployment.yaml
  1. 访问Flink UI页面
    集群启动后,就可以通过JobManagerServicers中配置的WebUI端口,用浏览器输入以下url来访问Flink UI页面了:
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy

参考:

  1. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_3-flink%e9%83%a8%e7%bd%b2
锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章