- Flink run -t yarn-per-job -c xxxx xxx.jar
flink run
入口类org.apache.flink.client.cli.CliFrontend
,通过config.sh
读取Flink相关环境信息;- 核心逻辑main方法,具体代码分析可以跟进
CliFrontend#run
方法
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// 获取flink-conf.yaml路径
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
// 根据路径加载配置
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
// 加载自定义命令行,依次添加generic、yarn、default三种命令行客户端
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
// 创建CliFrontend
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
System.exit(31);
}
}
- 参数解析、封装CommandLine:三个执行对应命令、配置封装、执行用户代码、生成StreamGraph、Executor生成JobGraph、集群描述符:上传jar包、配置,封装提交给yarn、yarnClient提交应用
-
applicationMaster执行入口类
-
Dispatcher的创建和启动
-
ResourceManager的创建、启动:里面有slotmanager(管理slot资源,向yarn申请资源)
-
Dispatcher启动JobManager(里面有一个slotpool,发送真正的请求)
-
slootpool向slotmanager申请资源,slotmanager向yarn申请资源(启动新节点)
- TaskManager的入口类
- 启动TaskExecutor
- 向RM注册slot,RM分配Slot,taskExecutor接收到分配的指令,提供offset给JobMaster(slotpool)
- JobMaster提交任务给TaskExecutor执行任务
- Flink内部节点之间的通信是用的Akka,数据的网络传输是通过Netty。0.9版本开始使用Akka,主要影响的组件是JobManager、TaskManager、Dispatcher、ResourceManager等。
- 定义通信行为,用于调用RpcEndpoint的某些方法,相当于ActorRef。
- JobMaster、ResourcceManager、Dispatcher、TaskExecutor都有对应的网关接口。
- 通信终端,提供RPC服务组件的生命周期管理(start、stop),每个RpcEndpoint对应了一个路径(endpointId和actorSystem确定),每个路径对应一个Actpr,其实现了RpcGateway接口。
- RpcEndpoint的成员变量
- 根据提供的RpcEndpoint来启动和停止RpcServer(Actor)
- 根据提供的地址链接对方的RpcServer,并返回一个RpcGateway
- 延迟/立刻调度Runnable、Callable
Flink中实现类为AkkaRpcService,是Akka的ActorSystem的封装,基本可以理解为ActorSystem的一个适配器。
- 组件自身的代理,A组件通过B组件Gateway调用自身的rpcServer。
- AkkaInvocationHandler,封装ActorRef
- FencedAkkaInvocationHandler,封装ActorRef
- 在RpcService中调用connect方法与对端的RpcEndpoint(RpcServer)建立连接,connect方法根据给的地址返回InvocationHandler(AkkaInvacationHandler、FencedAkkaInvocationHanlder动态代理。),核心查看invoke动态代理核心逻辑。
- AkkaRpcActor处理对应响应
- Flink的执行图可以分成四层:StreamGraph->JobGraph->ExecutionGraph->物理执行图
- StreamGraph:是根据用户通过StreamAPI编写的代码生成的最初的图,表示程序的拓扑结构。
- StreamNode:用来代表operator的类,并具有所有相关的属性,如并发度,入边和出边(表示算子的上游和下游)等。
- StreamEdge:表示连续两个StreamNode的边。
- JobGraph:StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构,会进行chain链优化,减少各个节点所需的序列化/反序列化/传输消耗。
- JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个Vertext,即一个JobVertex包含一个或多个opeartor,JobVertext的输入是Jobedge,输出是IntermediateDataSet。
- IntermediateDataSet:表示JobVertex的输出,即经过opeartor处理产生的数据集,producer是JobVertex,consumer是JobEdge。
- JobEdge:代表了JobGraph中的一条数据传输通道。source是IntermediateDataSet,target是JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标的JobVertex。
- ExecutionGraph:JobManager根据JobGraph生成ExecutionGraph,是并行版本的JobGraph,是调度层最核心的数据结构。
- ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
- IntermediateResult:和JobGraph的IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并行度。
- IntermediateResultPartition:表示ExecutionVertex的一个输入分区,producer是ExecutionVertex,consumer是若干个Executionedge。
- ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,traget是ExecutionVertex,source和target都只能有一个。
- Execution:是执行一个ExecutionVertex的一次尝试,当发生故障或者数据需要重算的情况下ExecutionVertex可能会有多个ExecutionAttemptID,一个Execution通过ExecutionAttemptID来唯一标识。JM和TM之间关于task的部署和task status的更新都是通过ExecutionAttemptID来确定消息接受者。
- 物理执行图:JobManager根据executionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的"图",并不是一个具体的数据结构。
- Task:Execution被调度后分配的TaskManager中启动对应的Task。Task包裹来具有用户执行逻辑的operator。
- ResultPartition:代表一个Task的生成的数据,和ExecutionGraph的IntermediateResultPartition一一对应。
- ResultSubPartition:是ResultPartition的一个子分区。每个ResultPartition都多个ResultSubPartition,其数目要由下游消费Task数和DistributionPattern来决定。
- InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费一个或多个ResultPArtition。
- InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph的ExecutionEdge一一对应,也和ResultSubPartition一对一地相连,即一个InputChannel接收一个ResultSubPartition的输出。
- StreamExecutionEnvironment.execute()
-->execute(jobName):
-->getStreamGraph(jobName);
-->getStreamGraphGenerator()
-->最终通过StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();生成流图
- 核心逻辑查看github flink源码的StreamGraph和StreamNode及StreamEdge相关源码。
-->PipelineExecutor#execute()
-->PipelineExecutorUtils.getJobGraph(pipeline, configuration);
-->StreamGraphTranslator#translateToJobGraph
- StreamNode转换JobVertex
- 每个JobVertex都对应可序列化的StreamConfig,用来发送给JobManager和TaskManager。最后在TM中起Task时,需要从这里反序列化出所需要的配置信息,包含用户代码含有的StreamOpeator。
- setChaining会对source调用createChain方法,将StreamNode转换成JobVertex放置在内存里,并将配置放入StreamConfig中。
- StreamEdge转换JobEdge
- JobEdge和JobVertex之间创建IntermediateDataSet来连接
- connect方法创建JobEdge和创建中间结果集连接。
- 将JobGraph并行化,JobVertex转换为ExecutionJobVertex,interalmediaDataset转换IntermediateResult,JobEdge转换ExecutionJobEdge。
-->Dispatcher#runJob()
-->Dispatcher#createJobManagerRunner
-->DefaultJobManagerRunnerFactory#createJobManagerRunner
-->DefaultJobMasterServiceFactory#createJobMasterService
-->JobMaster构造方法的createScheduler方法
-->DefaultSchedulerFactory#createInstance
-->SchedulerBase#createAndRestoreExecutionGraph
- createAndRestoreExecutionGraph内部涉及到jobGraph各个组件转换的ExecutionGraph的操作。
- ExecutionGraph各个组件拆分执行逻辑流程
-->JobMaster#startJobExecution
-->JobMaster#resetAndStartScheduler
-->SchedulerBase#startScheduling
-->SchedulerBase#startSchedulingInternal
-->DefaultScheduler#startScheduling
-->PipelinedRegionSchedulingStrategy#startScheduling 默认调度策略
-->PipelinedRegionSchedulingStrategy#maybeScheduleRegions
-->DefaultScheduler#allocateSlotsAndDeploy
-->Execution#deploy
-->TaskManangerGateway#submitTask
-->RpcTaskManangerGateway#submitTask
-->TaskExecutor#submitTask
-->Task#startTaskThread
-->StreamTask#invoke
-->MailboxProcessor#runMailboxLoop
- SchedulerNG及其子类、实现类
- 作业的生命周期管理,如作业的发布、挂起、取消。
- 作业执行资源的申请、分配、释放。
- 作业的状态管理,作业发布过程中的状态变化和作业异常时的FailOver等。
- 作业的信息提供,对外提供作业的详细信息。
- SchedulerBase、DefaultScheduler
- 主要分为流和批次
- 适用于流计算,一次性申请需要的所有资源,如资源不足,则作业启动失败。
- LAZY_FROM_SOURCES适用于批处理,聪SourceTask开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。上游Task执行完毕后开始调度执行下游的Task,读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的Task,依次进行调度,直到作业完成。
- LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST适用于批处理。与分阶段调度基本一样,区别在于该模式下适用批处理资源申请模式,可以在资源不足的情况下执行作业,但是需要确保在本阶段的作业执行中没有Shuffle行为。
- SchedulingStrategy
- EagerSchedulingStrategy:适用于流计算,同时调度所有的task
- LazyFromSourcesSchedulingStrategy:适用于批计算,当输入数据准备好时(上游处理完)进行vertices调度。
- PipelinedRegionSchedulingStrategy:以流水线的局部为粒度进行调度。从1.11加入,1.12开始作为任务的默认调度策略。
- Flink自定义了内存管理机制,规避传统JVM内存管理存在的问题,多级缓存为命中,内存占用过大,Full GC问题等
jobmananger.memory.process.size
管理堆外堆内内存,已经metaspace、jvmoverhead相关
* ┌ ─ ─ Total Flink Memory - ─ ─ ┐
* ┌───────────────────────────┐
* | │ JVM Heap Memory │ |
* └───────────────────────────┘
* │ ┌───────────────────────────┐ │
* | Off-heap Heap Memory │ -─ JVM Direct Memory
* │ └───────────────────────────┘ │
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
-->YarnClusterDescriptor#startAppMaster
-->JobManagerProcessUtils#processSpecFromConfigWithNewOptionToInterpretLegacyHeap
-->JobManagerProcessUtils#processSpecFromConfig
-->JobManagerProcessUtils#createMemoryProcessSpec
taskManager使用堆上和堆外内存
- Flink框架内存使用了堆上和堆外内存,不计入slot资源
- Task执行的内存使用了堆上内存和堆外内存。
- 网络缓冲内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
框架堆外内存、Task堆外内存、网络缓冲内存,都在堆外的直接内存里
- 管理内存:Flink管理的堆外内存。用于管理排序、哈希表、缓存中间结果及RocksDB State Backend的本地内存。
- JVM特有内存:JVM本身占用的内存,包括Metaspace和执行开销JVMOverhead。
Flink使用内存=框架堆内和对外内存+Task堆内和堆外内存+网络缓冲内存+管理内存
进程内存=Flink内存+JVM特有内存
* ┌ ─ ─ Total Flink Memory - ─ ─ ┐
* |┌ ─ ─ - - - On-Heap - - - ─ ─ ┐|
* ┌───────────────────────────┐
* |││ Framework Heap Memory ││|
* └───────────────────────────┘
* │ ┌───────────────────────────┐ │
* || Task Heap Memory ││
* │ └───────────────────────────┘ │
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
* |┌ ─ ─ - - - Off-Heap - - ─ ─ ┐|
* │┌───────────────────────────┐│
* │ │ Framework Off-Heap Memory │ │ ─┐
* │└───────────────────────────┘│ │
* │ ┌───────────────────────────┐ │ │
* ││ Task Off-Heap Memory ││ ┼─ JVM Direct Memory
* │ └───────────────────────────┘ │ │
* │┌───────────────────────────┐│ │
* │ │ Network Memory │ │ ─┘
* │└───────────────────────────┘│
* │ ┌───────────────────────────┐ │
* |│ Managed Memory │|
* │ └───────────────────────────┘ │
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
-->ActiveResourceManager#requestNewWorker
-->TaskExecutorProcessUtils#processSpecFromWorkerResourceSpec
- 内存段是MemorySegment,是Flink中最小的内存分配单元,默认大小32KB。它是堆上内存(Java的byte数组),也可以是堆外内存 (给予Netty的DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
- 实现类为HeapMemorySegment、HybridMemorySegment
- HeapMemorySegment:用来分配堆上内存
- HybridMemorySegment用来分配堆外和堆上内存,目前主要使用HybridMemorySegment
- 内存页是MemorySegement之上的数据访问时图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。
- Task算子之间在网络层面上传输数据,使用的是Buffer,申请和释放由Flink自行管理,实现类为NetworkBuffer。1个NetworkBuffer包装一个MemorySegment,同时继承了AbstractReferenceCountedByteBuffer,是Netty的抽象类。
- BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。
- BufferPoolFactory用来提供BufferPool的创建和销毁,唯一的实现类是NetworkBufferPool,每个TaskManager只有一个NetworkBufferPool。同一个TaskManager上的Task共享NetworkBufferPool。
- MemoryManager用来管理Flink中用于排序、Hash表、中间结果缓存或使用堆外内存的状态后端的内存。1.10之前负责TM所有内存,1.10之后范围为Slot级别。
-->MemoryManager#allocatePages
-->MemorySegmentFactory#allocateOffHeapUnsafeMemory 创建默认的HybridMemorySegment
- 可以查看Flink反压机制