
ResourceManager :ResourceManager负责整个集群的资源管理和分配,是一个全局的资源管理系统。NodeManager以心跳的方式向ResourceManager汇报资源使用情况。(目前主要是CPU和内存的使用情况)。RM只接受NM的资源回报信息,对于具体的资源处理则交给NM处理。
NodeManager :NodeManager是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN集群每个节点都运行一个NodeManager。NodeManager定时想ResourceManager汇报节点资源(CPU、内存)的使用情况和Container的运行状态。当RecourceManager宕机和NodeManager自动连接RM备用节点。NodeManager接收并处理来自ApplicationMaster的Container启动、停止等各种请求。
ApplicationMaster :负责与RM调度器协商以及获取资源(用Container表示)。将得到的任务进一步分配给内部的任务(资源的二次分配)。与NM通信以启动/停止任务。监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
步骤1:用户向YRAN提交应用程序,其中包括ApplicationMaster程序,启动ApplicationMatser的命令,用户程序等。
步骤2:ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3:ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,直到运行结束,即重复步骤4-7。
步骤4:ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5:一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
步骤6:NodeManager为任务设置好运行环境(包括环境变量、Jar包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
步骤7:各个任务通过某个RPC协议向ApplicationMatser汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8 应用程序运行完成后,ApplicationMaster想ResourceManager注销并关闭自己。
Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下,应用程序的状态将会从最近一次完成的检查点恢复,并继续处理。尽管如此,可以使用检查点来重置应用程序的状态无法完全达到令人满意的一致性保证。相反,source和sink的连接器需要和Flink的检查点和恢复机制进行集成才能提供有意义的一致性保证。
对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。 一条数据不应该丢失,也不应该重复计算 在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
Flink的 checkpoint机制和故障恢复机制给Flink内部提供了精确一次的保证,需要注意的是,所谓精确一次并不是说精确到每个event只执行一次,而是每个event对状态(计算结果)的影响只有一次。
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性
不同Source 和Sink的一致性保证
整个端到端的一致性级别取决于所有组件中一致性最弱的组件
Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。
端到端恰好处理一次语义一致性的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次一致性语义: 幂等性写入和事务性写入 。
所谓幂等 *** 作,是说一个 *** 作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了
必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。
从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。
构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。但是,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。
Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log( WAL ,预写式日志)sink和 两阶段提交sink 。
把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么 sink 系统,都能用这种方式一批搞定
DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink
对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”
当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入
这种方式真正实现了 exactly-once,它需要一个提供事务支持的外部 sink 系统。
Flink 提供了 TwoPhaseCommitSinkFunction 接口。
外部 sink 系统必须提供事务支持,或者 sink 任务必须能够模拟外部系统上的事务
在 checkpoint 的间隔期间里,必须能够开启一个事务并接受数据写入
在收到 checkpoint 完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
sink 任务必须能够在进程失败后恢复事务
提交事务必须是 幂等 *** 作
使用flink+kafka来实现一个端对端一致性保证,source -> transform -> sink
图解Exactly-Once 两阶段提交
Exactly-once 两阶段提交1:
JobManager 协调各个 TaskManager 进行 checkpoint 存储 checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存
Exactly-once 两阶段提交2:
当开启了checkpoint ,JobManager 会将检查点分界线(barrier)注入数据流 barrier会在算子间传递下去
每个算子会对当前的状态做个快照,保存到状态后端
checkpoint 机制可以保证内部的状态一致性
每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;
遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成
sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”
总结 Exactly-once 两阶段提交步骤
在使用kafka011 sink 时注意的点:
1为了保证事务特性,在使用其他程序去消费我们flink sink 数据的kafka时,这个consumer需要设置了 isolationlevel = read_committed ,那么它只会读取已经提交了的消息。
2Checkpoint超时时间 必需大于 kafka 提交事务时间。
假如checkpoint失败时间高于 kafka事务等待时间,比如,设置了一个checkpoint最多等待10分钟,10分钟后会失败这个checkpoint的保存。而kafka 的事务只能等待5分钟,5分钟后把uncommitted的事务关掉。这个时候6分钟checkpoint成功了,但是对应kafka数据的事务已经失败。这样就无法保证Exactly-once的实现
这种模式我们一般是在用IDE调试程序的时候用到,当我们在本地用IDE开发程序的时候,执行main方法,flink会在本地启动一个包含jobmanager和taskmanager的进程的minicluster,程序运行完成之后,这个cluster进程退出。
这种模式就是直接在物理机上启动flink集群。我们可以通过 {FLINK_HOME}/conf/flink-confyaml
此外,我们可以用 ${FLINK_HOME}/bin/taskmanagersh start 再启动一个taskmanager。
这时我们通过jps命令查看一下启动的进程
我们看到这时候启动了两个taskmanager
这种部署模式对flink集群的资源管理是flink自己维护的,在生产环境下用的不多,所以我们也不做过多描述
启动集群的命令如下:
这个命令有很多的参数,可以在后面加 -h 看下,我这里着重介绍一下 -d参数。
加上-d之后,指的是隔离模式,也就是启动之后和客户端就断了联系,如果要停止集群,需要通过yarn application -kill {applicationId} 来停止集群
提交成功之后,我们会在yarn的管理页面看到一个类似的任务
这个启动命令也有很多的参数,我就不一一讲解了,我用大白话讲讲我认为最核心的几个参数。
第二,通过命令行来停止:
这个时候需要指定yarn applicationId和flink job id
第三,通过程序来停止
>
根据集群的生命周期、资源隔离方式和应用程序的main()方法执行位置(client或者JobManager)可以将集群部署模式分为:Flink Session Cluster(session mode)、Flink Job Cluster(per-job mode) 和Flink Application Cluster(application mode)三类。
Flink Session Cluster集群部署模式也称为session mode。该模式集群会预先启动、长时间运行,并且能够接收多个job提交运行。也就是提交到集群的job共享一套flink runtime cluster(JobManager和TaskManager)。
该模式特点如下:
Flink Job Cluster集群部署模式又称为per-job mode。该模式与Job绑定,集群管理器会为每个Job启动一个flink runtime cluster(JobManager和TaskManager)。Client会首先向集群管理器请求资源来启动JobManager,并将job提交给其内部的Dispatcher,TaskManager根据job的资源请求延迟分配启动的。
该模式特点如下:
NOTE: Flink Job Cluster模式不支持部署在Kubernetes上。
Flink Application Cluster集群部署模式又称为Application mode。该模式集群属于专属集群模式,只会执行一个Flink Application中的job,集群管理器为每个Flink Application启动一个flink runtime cluster(JobManager和TaskManager)。并且应用程序的main方法是运行在Cluster中,而不是client上。
该模式可以看做是对per-job模式和session模式的优化部署模式。集群执行job粒度上,相较前两者找到了更好的一个隔离点,并且减轻了Client的负载。
该模式特点如下:
Flink是一个基于流计算的分布式引擎,以前的名字叫stratosphere,从2010年开始在德国一所大学里发起,也是有好几年的 历史 了,2014年来借鉴了社区其它一些项目的理念,快速发展并且进入了Apache顶级孵化器,后来更名为Flink。
Flink在德语中是快速和灵敏的意思 ,用来体现流式数据处理速度快和灵活性强等特点。
Flink提供了同时支持高吞吐、低延迟和exactly-once 语义的实时计算能力,另外Flink 还提供了基于流式计算引擎处理批量数据的计算能力,真正意义上实现了流批统一。
Flink 独立于Apache Hadoop,且能在没有任何 Hadoop 依赖的情况下运行。
但是,Flink 可以很好的集成很多 Hadoop 组件,例如 HDFS、YARN 或 HBase。 当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。 Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。
Flink具有先进的架构理念、诸多的优秀特性,以及完善的编程接口。
Flink的具体优势有如下几点:
(1)同时支持高吞吐、低延迟、高性能;
(2)支持事件时间(Event Time)概念;
事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。
(3)支持有状态计算;
所谓状态就是在流计算过程中,将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后,可以从之前的状态中获取中间结果,计算当前的结果,从而无需每次都基于全部的原始数据来统计结果。
(4)支持高度灵活的窗口(Window) *** 作;
(5)基于轻量级分布式快照(Snapshot)实现的容错;
(6)基于JVM实现独立的内存管理;
(7)Save Points(保存点);
保存点是手动触发的,触发时会将它写入状态后端(State Backends)。Savepoints的实现也是依赖Checkpoint的机制。Flink 程序在执行中会周期性的在worker 节点上进行快照并生成Checkpoint。因为任务恢复的时候只需要最后一个完成的Checkpoint的,所以旧有的Checkpoint会在新的Checkpoint完成时被丢弃。Savepoints和周期性的Checkpoint非常的类似,只是有两个重要的不同。一个是由用户触发,而且不会随着新的Checkpoint生成而被丢弃。
在Flink整个软件架构体系中,统一遵循了分层的架构设计理念,在降低系统耦合度的同时,为上层用户构建Flink应用提供了丰富且友好的接口。
整个Flink的架构体系可以分为三层:
Deployment层: 该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN),云(GCE/EC2),Kubernetes等。
Runtime层:Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层:该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的计算框架,也分别对应于面向流处理和面向批处理两类。
核心概念:Job Managers,Task Managers,Clients
Flink也是典型的master-slave分布式架构。Flink的运行时,由两种类型的进程组成:
Client: Client不是运行时和程序执行的一部分,它是用来准备和提交数据流到JobManagers。之后,可以断开连接或者保持连接以获取任务的状态信息。
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager, JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。
每个Worker(Task Manager)是一个JVM进程,通常会在单独的线程里执行一个或者多个子任务。为了控制一个Worker能够接受多少个任务,会在Worker上抽象多个Task Slot (至少一个)。
只有一个slot的TaskManager意味着每个任务组运行在一个单独JVM中。 在拥有多个slot的TaskManager上,subtask共用JVM,可以共用TCP连接和心跳消息,同时可以共用一些数据集和数据结构,从而减小任务的开销。
Flink的任务运行其实是多线程的方式,这和MapReduce多JVM进程的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池对资源进行有效管理。
Apache Flink是目前市场最受关注的流计算处理引擎,相较于Spark Streaming的依托Spark Core实现的微批处理模型,Flink是一个纯粹的流处理引擎,其基于 *** 作符的连续流模型,可以达到微秒级别的延迟。
Flink实现了流批一体化模式,实现按照事件处理和无序处理两种形式,基于内存计算。强大高效的反压机制和内存管理,基于轻量级分布式快照checkpoint机制,从而自动实现了Exactly-Once一致性语义。
1 数据源端
支持可靠的数据源(如kafka), 数据可重读
Apache Flink内置FlinkKafkaConsumer010类,不依赖于 kafka 内置的消费组offset管理,在内部自行记录和维护 consumer 的offset。
2 Flink消费端
轻量级快照机制: 一致性checkpoint检查点
Flink采用了一种轻量级快照机制(检查点checkpoint)来保障Exactly-Once的一致性语义。所谓的一致检查点,即在某个时间点上所有任务状态的一份拷贝(快照)。该时间点是所有任务刚好处理完一个相同数据的时间。
间隔时间自动执行分布式一致性检查点(Checkpoints)程序,异步插入barrier检查点分界线,内存状态自动存储为cp进程文件。保证数据Exactly Oncey精确一次处理。
(1) 从source(Input)端开始,JobManager会向每个source(Input)发送检查点barrier消息,启动检查点。在保证所有的source(Input)数据都处理完成后,Flink开始保存具体的一致性检查点checkpoints,并在过程中启用barrier检查点分界线。
(2) 接收数据和barrier消息,两个过程异步进行。在所有的source(Input)数据都处理完成后,开始将自己的检查点(checkpoints)保存到状态后(StateBackend)中,并通知JobManager将Barrier分发到下游
(3) barrier向下游传递时,会进行barrier对齐确认。待barrier都到齐后才进行checkpoints检查点保存。
(4) 重复以上 *** 作,直到整个流程完成。
3 输出端
与上文Spark的输出端Exactly-Once一致性上实现类似,除了目标源需要满足一定条件以外,Flink内置的二阶段提交机制也变相实现了事务一致性。支持幂等写入、事务写入机制(二阶段提交)
这一块和上文Spark的幂写入特性内容一致,即相同Key/ID 更新写入,数据不变。借助支持主键唯一性约束的存储系统,实现幂等性写入数据,此处将不再继续赘述。
Flink在处理完source端数据接收和operator算子计算过程,待过程中所有的checkpoint都完成后,准备发送数据到sink端,此时启动事务。其中存在两种方式: (1) WAL预写日志: 将计算结果先写入到日志缓存(状态后端/WAL)中,等checkpoint确认完成后一次性写入到sink。(2) 二阶段提交: 对于每个checkpoint创建事务,先预提交数据到sink中,然后等所有的checkpoint全部完成后再真正提交请求到sink, 并把状态改为已确认。
整体思想: 为checkpoint创建事务,等到所有的checkpoint全部真正的完成后,才把计算结果写入到sink中。
Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。
流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算
批处理是只有处理一批完成后,才会经过网络传输到下一个节点
流处理的优点是低延迟 批处理的优点是高吞吐
flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。
如果设置为无限大就是批处理模型。
Flink 集群包括 JobManager 和 TaskManager
JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。
flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger
SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job
Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD *** 作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。
JobScheduler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图
ReceiverTracker负责数据的接收,管理和分配
ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin
spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。
1:需要关注流数据是否需要进行状态管理
2:At-least-once或者Exectly-once消息投递模式是否有特殊要求
3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm
4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming
5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink
Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet
在创建运行时有:
Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。
source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信
TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。
同一个任务可以共享一个slot, 不同作业不可以。
这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式
所以一共需要两个TaskManager source,Map 一个,reduce一个, 每个TaskManager 要3个slot
JobManager 将 JobGraph 部署 ExecutionGraph
设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。
Flink通过状态机管理 ExecGraph的作业执行进度。
Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。
Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。
数据源:
Sink:
时间:
处理时间:取自Operator的机器系统时间
事件时间: 由数据源产生
进入时间: 被Source节点观察时的系统时间
如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。
DataStream 提供了 周期性水印,间歇式水印,和递增式水印
以上就是关于Flink On Yarn集群部署全部的内容,包括:Flink On Yarn集群部署、Flink 端对端一致性、flink实战教程-集群的部署等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)