Master-Workers 架构(粗译为主从架构)是分布式系统中常见的一种组织方式,如 GFS 中的 Master、ChunkServers;MapReduce 中的 Master、Workers。面对分布式系统中一堆分离的机器资源,主从架构是一种最自然、直白的组织方式——就像一群人,有个说了算 leader 进行组织、协调,才能最大化这群人的对外输出能力。这也是计算机系统中常见的一种分而治之思想的体现。即将一个复杂的系统,拆解成几个相对高内聚、低耦合的子模块,定义清楚其功能边界和交互接口,使得系统易于理解、维护和扩展。对于主从架构来说,主(Master) 通常会维护集群元信息、进而依靠这些元信息进行调度,从(Workers) 通常负责具体数据切片(存储系统)的读写或者作为子任务(计算系统)的执行单元。

来自 分布式系统架构(一)——Master-Workers 架构

本篇笔记会对大数据组件/分布式系统相关的术语进行整理。一些小术语作为大术语的补充介绍,没有单独设为标题。可以直接Ctrl+F搜索具体术语。

Spark部分内容整理自吴磊在极客时间的课程,Yarn部分内容整理自李智慧在极客时间的课程。其他内容可以在主页阅读清单的文章中找到。

RPC

RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。分布式系统一般通过RPC通信。RPC假定某些协议的存在,例如TPC/UDP等,为通信程序之间携带信息数据。在OSI网络七层模型中,RPC跨越了传输层和应用层,RPC使得开发,包括网络分布式多程序在内的应用程序更加容易。

简单来说成熟的rpc库相对http容器,更多的是封装了“服务发现”,“负载均衡”,“熔断降级”一类面向服务的高级特性。可以这么理解,rpc框架是面向服务的更高级的封装。如果把一个http servlet容器上封装一层服务发现和函数代理调用,那它就已经可以做一个rpc框架了。所以为什么要用rpc调用?因为良好的rpc调用是面向服务的封装,针对服务的可用性和效率等都做了优化。单纯使用http调用则缺少了这些特性。

Hadoop

往小了说,Hadoop特指HDFS、YARN、MapReduce这三个组件,他们分别是Hadoop分布式文件系统、分布式任务调度框架、分布式计算框架。

往大了说,Hadoop生态包含所有由这3个组件衍生出的大数据产品,如Spark、Hive、Hbase、Pig、Sqoop,等等。

注意HDFS 叫分布式文件系统,管MapReduce 叫分布式计算框架,Yarn 叫分布式任务调度框架:HDFS是系统,而其他两个是框架。这是因为框架在架构设计上遵循一个重要的设计原则叫“依赖倒转原则”,依赖倒转原则是高层模块不能依赖低层模块,它们应该共同依赖一个抽象,这个抽象由高层模块定义,由低层模块实现。

实现 MapReduce 编程接口、遵循 MapReduce 编程规范就可以被 MapReduce 框架调用,在分布式集群中计算大规模数据;实现了 Yarn 的接口规范,比如 Hadoop 2 的 MapReduce,就可以被 Yarn 调度管理,统一安排服务器资源。所以说,MapReduce 和 Yarn 都是框架。相反地,HDFS 就不是框架,使用 HDFS 就是直接调用 HDFS 提供的 API 接口,HDFS 作为底层模块被直接依赖。

MapReduce 早已被 Spark 等计算框架赶超,而 HDFS却依然屹立不倒。其原因是 Yarn 的包容,使得更多计算框架可以接入到 HDFS 中,而不单单是 MapReduce。HDFS 可能不是最优秀的大数据存储系统,但却是应用最广泛的大数据存储系统,Yarn 功不可没。

HDFS

Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。

HDFS以流处理访问模式来存储文件的。一次写入,多次读取。数据源通常由源生成或从数据源直接复制而来,接着长时间在此数据集上进行各类分析,大数据不需要搬来搬去。

HDFS是为了处理大型数据集分析任务的,主要是为达到高的数据吞吐量而设计的,这就可能要求以高延迟作为代价。改进策略对于那些有低延时要求的应用程序,HBase是一个更好的选择。

在HDFS的主从结构中,有两类节点 Namenode和Datanode。他们以管理者-工作者模式工作。

相比于Hadoop1.0,Hadoop 2.0中的HDFS增加了两个重大特性,HA和Federaion。HA即为High Availability,用于解决NameNode单点故障问题,该特性通过热备的方式为主NameNode提供一个备用者,一旦主NameNode出现故障,可以迅速切换至备NameNode,从而实现不间断对外提供服务。Federation即为“联邦”,该特性允许一个HDFS集群中存在多个NameNode同时对外提供服务,这些NameNode分管一部分目录(水平切分),彼此之间相互隔离,但共享底层的DataNode存储资源。

NameNode

NameNode位于Master节点,维护着文件系统树及整棵树内所有的文件和目录,HDFS文件系统中处理客服端读写请求、管理数据块(Block)的映射信息、配置副本策略等管理工作由NameNode来完成。

HDFS无法高效存储大量小文件是因为NameNode把文件系统的元数据放置在内存中,所以文件系统所能容纳的文件数目是由NameNode的内存大小来决定。一般来说,每一个文件、文件夹和Block需要占据150字节左右的空间,所以,如果你有100万个文件,每一个占据一个Block,你就至少需要300MB内存。当前来说,数百万的文件还是可行的,当扩展到数十亿时,对于当前的硬件水平来说就没法实现了。

SecondaryNameNode的功能主要是辅助NameNode,分担其工作量;在紧急情况下可以辅助恢复NameNode,但是它不能替换NameNode并提供服务。

Datanode

DataNode位于Worker节点。NameNode 下达命令,DataNode 执行实际操作。DataNode表示实际存储的数据块,同时可以执行数据块的读写操作。

当我们读取一个文件时HDFS client 联系 NameNode,获取文件的 data blocks 组成、以及每个 data block 所在的机器以及具体存放位置。HDFS client 联系 DataNode, 进行具体的读写操作。在读写一个文件时,当从 NameNode。得知应该向哪些 Data nodes 读写之后,就直接和 DataNode 打交道,不再通过 NameNode。

Spark/MapReduce

在Hadoop1中,还没有引入Yarn等调度框架。此时MapReduce自行完成资源调度,进程JobTracker位于Master节点,TaskTracker位于Worker节点。详见Yarn章节。

combiner

MapReduce combiner 也被称为 “微型 reducer ”。当我们用 MapReduce 作业处理大数据集的时候,Mapper 生成的中间结果数据就会比较大,而且这些中间结果数据后续会被传输到 Reducer 继续处理,这会导致非常大的网络开销,甚至网络堵塞。MapReduce 框架提供了一个函数——Hadoop Combiner 函数,它在减少网络阻塞方面扮演着一个关键的角色。combiner 的主要工作就是在 Mapper 的输出数据被传输到 Reducer 之前对这些数据进行处理。它在 mapper 之后 reducer 之前执行,也就是在 mapper 和 reducer 两个阶段的中间执行。并且 combiner 的执行是可选的,即可用可不用。

combiner的优势:

  1. Hadoop combiner 减少了数据从 mapper 到 reducer 之间的传输时间。
  2. 减少 reducer 处理的数据量,但最终计算结果不变。
  3. 提升 reducer 的整体性能。

不足:

  1. MapReduce job 不能依赖于 Hadoop combiner 的执行,因为 combiner 的执行没有保证。
  2. 在本地文件系统中,键值对是存储在 Hadoop 中的,运行 combiner 会导致昂贵的磁盘IO 开销。

Spark以内存计算的方式接替MapReduce作为开源集群计算框架。在整个集群中Master 节点上运行Driver进程,Worker 节点上运行Executors进程。

RDD

Resilient Distributed Datasets。

RDD 是一种抽象,是 Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体,好比单机进程内的数组。

包含以下四大属性:

  1. partitions:数据分片
  2. partitioner:分片切割规则
  3. dependencies:RDD 依赖
  4. compute:转换函数

尽管 RDD API 使用频率越来越低,取而代之的是DataFrame 和 Dataset API。但是,无论采用哪种 API 或是哪种开发语言,应用在 Spark 内部最终都会转化为 RDD 之上的分布式计算。

从开发入口来说,在 RDD 框架下开发的应用程序,会直接交付 Spark Core 运行。而使用 DataFrame API 开发的应用,则会先过一遍 Spark SQL,由 Spark SQL 优化过后再交由 Spark Core 去做执行。

Spark SQL

Spark Core 特指 Spark 底层执行引擎(Execution Engine),它包括了Spark的调度系统、存储系统、内存管理、Shuffle 管理等核心功能模块。

Spark SQL 则凌驾于 Spark Core 之上,是一层独立的优化引擎(Optimization Engine)。

换句话说,Spark Core 负责执行,而 Spark SQL 负责优化,Spark SQL 优化过后的代码,依然要交付 Spark Core 来做执行。

Spark SQL 由两个核心组件构成,分别是 Catalyst 优化器和 Tungsten,其优化过程也分为 Catalyst 和 Tungsten 两个环节。

Catalyst 优化器的职责在于创建并优化执行计划,它包含 3 个功能模块,分别是创建语法树并生成执行计划、逻辑阶段优化和物理阶段优化。Tungsten 用于衔接 Catalyst 执行计划与底层的 Spark Core 执行引擎,它主要负责优化数据结果与可执行代码。

直接与文件系统交互,仅仅是 Spark SQL 数据应用的常见场景之一。Spark SQL 另一类非常典型的场景是与 Hive 做集成、构建分布式数据仓库。数据仓库指的是一类带有主题、聚合层次较高的数据集合,它的承载形式,往往是一系列 Schema 经过精心设计的数据表。在数据分析这类场景中,数据仓库的应用非常普遍。

Hive 是 Apache Hadoop 社区用于构建数据仓库的核心组件,它负责提供种类丰富的用户接口,接收用户提交的 SQL 查询语句。这些查询语句经过 Hive 的解析与优化之后,往往会被转化为分布式任务,并交付 Hadoop MapReduce 付诸执行。在 Hive 与 Spark 的组合中,Hive 擅长元数据管理,而 Spark 的专长是高效的分布式计算。

以响应一个 SQL 查询为例。Hive接收到 SQL 查询之后,Hive 的 Driver 首先使用其 Parser 组件,将查询语句转化为 AST(Abstract Syntax Tree,查询语法树)。紧接着,Planner 组件根据 AST 生成执行计划,而 Optimizer 则进一步优化执行计划。要完成这一系列的动作,Hive 必须要能拿到相关数据表的元信息才行,比如表名、列名、字段类型、数据文件存储路径、文件格式,等等。而这些重要的元信息,通通存储在一个叫作“Hive Metastore”的数据库中。本质上,Hive Metastore 其实就是一个普通的关系型数据库(RDBMS),它可以是免费的 MySQL、Derby,也可以是商业性质的 Oracle、IBM DB2。实际上,除了用于辅助 SQL 语法解析、执行计划的生成与优化,Metastore 的重要作用之一,是帮助底层计算引擎高效地定位并访问分布式文件系统中的数据源。这里的分布式文件系统,可以是 Hadoop 生态的 HDFS,也可以是云原生的 Amazon S3。而在执行方面,Hive 目前支持 3 类计算引擎,分别是 Hadoop MapReduce、Tez 和 Spark。

当 Hive 采用 Spark 作为底层的计算引擎时,我们就把这种集成方式称作“Hive on Spark”。相反,当 Spark 仅仅是把 Hive 当成是一种元信息的管理工具时,我们把 Spark 与 Hive 的这种集成方式,叫作“Spark with Hive”。

在 Hive on Spark 模式下,Hive 与 Spark 衔接的部分是 Spark Core,而不是 Spark SQL,这一点需要特别注意。这也是为什么,相比 Hive on Spark,Spark with Hive 的集成在执行性能上会更胜一筹。Spark SQL + Spark Core 这种原装组合,相比 Hive Driver + Spark Core 这种适配组合,在契合度上要更高一些。

Shuffle

Shuffle 的本意是扑克牌中的“洗牌”,在大数据领域的引申义表示的是

集群范围内跨进程、跨节点的数据交换。

分布式数据集在集群内的分发,会引入大量的磁盘 I/O 与网络 I/O。在绝大多数的业务场景中,Shuffle 操作都是必需的、无法避免的。在Spark/MapReduce中,reduceByKey 的计算被切割为两个执行阶段。约定俗成地,我们把

Shuffle 之前的 Stage 叫作 Map 阶段,而把 Shuffle 之后的 Stage 称作 Reduce 阶段。

以wordcount为例,在 Map 阶段,每个 Executors 先把自己负责的数据分区做初步聚合(又叫 Map 端聚合、局部聚合);在 Shuffle 环节,不同的单词被分发到不同节点的 Executors 中;最后的 Reduce 阶段,Executors 以单词为 Key 做第二次聚合(又叫全局聚合),从而完成统计计数的任务。

Standalone

Spark 支持多种分布式部署模式。Standalone 是 Spark 内置的资源调度器。而 YARN、Mesos、Kubernetes 是独立的第三方资源调度与服务编排框架。题外话 Spark 早期是为了推广 mesos 而开发的。

Standalone 在资源调度层面,采用了一主多从的主从架构,把计算节点的角色分为 Master 和 Worker。其中,Master 有且只有一个,而 Worker 可以有一到多个。所有 Worker 节点周期性地向 Master 汇报本节点可用资源状态,Master 负责汇总、变更、管理集群中的可用资源,并对 Spark 应用程序中 Driver 的资源请求作出响应。

Driver

在 Spark 的应用开发中,任何一个应用程序的入口,都是带有 SparkSession 的 main 函数。SparkSession 包罗万象,它在提供 Spark 运行时上下文的同时(如调度系统、存储系统、内存管理、RPC 通信),也可以为开发者提供创建、转换、计算分布式数据集(如 RDD)的开发 API。不过在 Spark 分布式计算环境中,有且仅有一个 JVM 进程运行这样的 main 函数,这个特殊的 JVM 进程,在 Spark 中有个专门的术语,叫作“Driver”。Driver 最核心的作用在于,解析用户代码、构建计算流图,然后将计算流图转化为分布式任务,并把任务分发给集群中的执行进程交付运行。

BlockManagerMaster作为存储组件位于Driver端,负责管理数据块的元数据(Meta data),这些元数据记录并维护数据块的地址、位置、尺寸以及状态。

Driver是一个运行包含SparkSession的main函数的JVM进程。Driver进程会使用DAGScheduler、TaskScheduler 和 SchedulerBackend这三个对象,来完成分布式计算:把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。

DAGScheduler

DAGScheduler 是任务调度的发起者,根据用户代码构建计算流图DAG后把计算图 DAG 拆分为执行阶段 Stages。Stages 指的是不同的运行阶段,同时还要负责把 Stages 转化为任务集合 TaskSets,也就是把“建筑图纸”转化成可执行、可操作的“建筑项目”。用一句话来概括从 DAG 到 Stages 的拆分过程,那就是:以 Actions 算子为起点,从后向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages。

DAGScheduler 根据用户代码构建计算流图DAG, 以 Shuffle 为边界切割 DAG为Stage,基于 Stages 创建 TaskSets,并将 TaskSets 提交给 TaskScheduler 请求调度。

SchedulerBackend

这里先介绍SchedulerBackend,因为其提供WorkerOffer 给TaskScheduler用于分布式任务调度。SchedulerBackend将分布式任务分发到 Executors 中去。对于集群中可用的计算资源,SchedulerBackend 用一个叫做 ExecutorDataMap 的数据结构,来记录每一个计算节点中 Executors 的资源状态。这里的 ExecutorDataMap 是一种 HashMap,它的 Key 是标记 Executor 的字符串,Value 是一种叫做 ExecutorData 的数据结构。ExecutorData 用于封装 Executor 的资源状态,如 RPC 地址、主机地址、可用 CPU 核数和满配 CPU 核数等等,它相当于是对 Executor 做的“资源画像”。

基于 Executor 资源画像,SchedulerBackend 可以同时提供多个 WorkerOffer 用于分布式任务调度。WorkerOffer 这个名字起得很传神,Offer 的字面意思是公司给你提供的工作机会,到了 Spark 调度系统的上下文,它就变成了使用硬件资源的机会。

值得一提的是,SchedulerBackend 组件的实例化,取决于开发者指定的 Spark MasterURL,也就是我们使用 spark-shell(或是 spark-submit)时指定的–master 参数,如“–master spark://ip:host”就代表 Standalone 部署模式,“–master yarn”就代表 YARN 模式等等。不难发现,SchedulerBackend 与资源管理器(Standalone、YARN、Mesos 等)强绑定,是资源管理器在 Spark 中的代理。

SchedulerBackend 用一个叫做 ExecutorDataMap 的数据结构,来记录每一个计算节点中 Executors 的资源状态,并根据这些状态为TaskScheduler提供Workeroffer。SchedulerBackend 在接收到TaskScheduler 的工作Task后,把工作进一步下发给Executor进程的ExecutorBackend。

TaskScheduler

TaskScheduler 在初始化的过程中,会创建任务调度队列,任务调度队列用于缓存 DAGScheduler 提交的 TaskSets。Task 与 RDD 的 partitions 是一一对应的,在创建 Task 的过程中,DAGScheduler 会根据数据分区的物理地址,来为 Task 设置 locs 属性。locs 属性记录了数据分区所在的计算节点、甚至是 Executor 进程 ID。TaskScheduler 结合 SchedulerBackend 提供的 WorkerOffer,按照预先设置的调度策略依次对队列中的任务进行调度。对于给定的 WorkerOffer,TaskScheduler 是按照任务的本地倾向性,来遴选出 TaskSet 中适合调度的 Tasks。定向到计算节点粒度的本地性倾向,Spark 中的术语叫做 NODE_LOCAL。除了定向到节点,Task 还可以定向到进程(Executor)、机架、任意地址,它们对应的术语分别是 PROCESS_LOCAL、RACK_LOCAL 和 ANY。对于倾向 PROCESS_LOCAL 的 Task 来说,它要求对应的数据分区在某个进程(Executor)中存有副本;而对于倾向 RACK_LOCAL 的 Task 来说,它仅要求相应的数据分区存在于同一机架即可。ANY 则等同于无定向,也就是 Task 对于分发的目的地没有倾向性,被调度到哪里都可以。

Spark 调度系统的核心思想,是“数据不动、代码动”。也就是说,在任务调度的过程中,为了完成分布式计算,Spark 倾向于让数据待在原地、保持不动,而把计算任务(代码)调度、分发到数据所在的地方,从而消除数据分发引入的性能隐患。毕竟,相比分发数据,分发代码要轻量得多。本地性倾向则意味着代码和数据应该在哪里“相会”,PROCESS_LOCAL 是在 JVM 进程中,NODE_LOCAL 是在节点内,RACK_LOCAL 是不超出物理机架的范围,而 ANY 则代表“无所谓、不重要”。

TaskScheduler 接收到DAGScheduler提供的TaskSets,结合SchedulerBackend提供的 WorkerOffer 与任务的本地性倾向(NODE_LOCAL,PROCESS_LOCAL、RACK_LOCAL 和,ANY),挑选出了适合调度的工作Tasks。接下来,TaskScheduler 就把这些 Tasks 通过 LaunchTask 消息,发送给SchedulerBackend。

Executors

与Driver相对应完成执行的JVM进程。

注意Executors和Executor有区别,这里不再展开。接收到任务之后,Executors 调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。

假设电脑有 4 个 CPU,那么在命令行敲入 spark-shell 的时候,Spark 会在后台启动 1 个 Driver 进程和 3 个 Executors 进程。

Spark 把 Executor 内存划分为 4 个区域,分别是 Reserved Memory、User Memory、Execution Memory 和 Storage Memory。通过调整 spark.executor.memory、spark.memory.fraction 和 spark.memory.storageFraction 这 3 个配置项,可以灵活地调整不同内存区域的大小,从而去适配 Spark 作业对于内存的需求。在Spark1.6 版本之后统一内存管理模式下,Execution Memory 与 Storage Memory 之间可以互相抢占。

BlockManager、MemoryStore 和 DiskStore作为存储组件位于Executors端。在 Executors 中,BlockManager 通过 MemoryStore 来完成内存的数据存取。MemoryStore 通过一种特殊的数据结构:LinkedHashMap 来完成 BlockId 到 MemoryEntry 的映射。其中,BlockId 记录着数据块的元数据,而 MemoryEntry 则用于封装数据实体。与此同时,BlockManager 通过 DiskStore 来实现磁盘数据的存取与访问。DiskStore 并不直接维护元数据列表,而是通过 DiskBlockManager 这个对象,来完成从数据库到磁盘文件的映射,进而完成数据访问。

ExecutorBackend

ExecutorBackend负责把任务(Task)分配给Executors 线程池中一个又一个的 CPU 线程,每个线程负责处理一个 Task。

每当 Task 处理完毕,这些线程便会通过 ExecutorBackend,向 Driver 端的 SchedulerBackend 发送 StatusUpdate 事件,告知 Task 执行状态。

所以在一次分布式计算中,Task(任务)和Executors线程以及partition(RDD的数据分片)一一对应。

Spark调度

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

Spark部署方式中的client和cluster

这部分内容摘自简书。这部分内容和Yarn相关。

总结下来Standlone模式下,Application的注册(Executor的申请)以及任务调度都是由Driver来的。Spark on yarn模式下,Application的注册(Executor的申请)由AM进行,任务调度由Driver,是分离开来的。

Client

Driver运行在客户端(启动在本地机器上)。
这里Driver会负责所有任务的调度,频繁与集群上的多个Executor通信(Task启动消息、执行统计消息、运行状态、Shuffle输出结果等),频繁大量的通信(一般情况本地机器与集群不在一个机房),会导致本地通信负载高。但是所有的日志都会在本地机器上看到,适合做测试时候使用。

Spark-Standlone-Client

客户端启动后直接运行程序启动Driver相关工作:DAGScheduler和BlockManagerMaster等。客户端的Driver向Master注册。Master会让Worker启动Executor,Worker创建一个ExecutorRunner线程,executorRunner会启动ExecutorBackend进程。ExecutorBackend启动后向Driver的ExecutorBackend注册,Driver的DAGScheduler解析作业并生成相应的Stage,每个Stage包含的task通过taskScheduler分配给executor执行。最后所有的stage执行完之后作业结束。

Spark-Yarn-Client

本地机器提交Application到ResourceManager,会直接在本地机器启动Driver程序,RM在资源充足的一台NodeManager启动ApplicationMaster,之后AM向RM申请Executor,RM根据各个节点的资源情况分配container,并告知AM,各个NodeManager启动Executor,启动之后各个NodeManager向Driver反向注册,这时候Driver就知道自己有哪些资源可以使用了,然后就可以执行Job、拆分Stage等的计算任务了。

Cluster

Driver执行在任务节点(在其中一台NodeManager,和ApplicationMaster运行在一起)。

Spark-Standlone-Cluster

作业提交给Master,Master让一个worker启动Driver(SchedulerBackend),创建一个DriverRunner线程,启动SchedulerBackend进程。Master会让其他worker启动Executor(ExecutorBackend)创建一个ExecutorRunner线程,启动ExecutorBackend进程。ExecutorBackend启动之后向Driver的SchedulerBackend注册,SchedulerBackend进程中包含DAGScheduler,它会根据用户程序生成执行计划并调度执行。对每个stage的task都会存放到taskScheduler中,当ExecutorScheduler向SchedulerBackend汇报的时候吧taskScheduler中的task调度到ExecutorBackend执行。最后所有的stage运行完之后作业结束。

Spark-Yarn-Cluster

本地机器提交Application到ResourceManager,RM在资源充足的一台NodeManager启动ApplicationMaster,Driver与其一同启动,之后向RM申请Executor,RM根据各个节点的资源情况分配container,并告知AM,ApplicationManager向各个NodeManager启动executor,启动之后各个NodeManager向AM反向注册,这时候ApplicationManager(Driver)就知道自己有哪些资源可以使用了,然后就可以执行Job、拆分Stage等的计算任务了。

Structured Streaming

Spark 的流处理框架。流计算场景中,有 3 个重要的基础概念,依次是 Source、流处理引擎与 Sink。其中Source 是流计算的数据源头,也就是源源不断地产生数据的地方。与之对应,Sink 指的是数据流向的目的地,也就是数据要去向的地方在框架下。对于数据源Source ,Spark 支持 Socket、File 和 Kafka,而对于 Sink,Spark 支持 Console、File、Kafka 和 Foreach(Batch)。

对于结果的输出,Structured Streaming 输出模式主要有 3 种,分别是 Complete mode、Append mode 和 Update mode。其中,Complete mode 输出到目前为止处理过的所有数据,而 Update mode 仅输出在当前批次有所更新的数据内容。

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统(消息队列)。每秒几十万条消息吞吐可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。

消息/数据的发布者/生产者为Producer,订阅者/消费者为Consumer,Kafka集群作为消息传递的中间件。集群依赖ZooKeeper来保存集群的的元信息,来保证系统的可用性。

Broker

消息中间件的处理节点,一个Kafka节点就是一个Broker,一个或多个Broker可以组成一个Kafka集群。可以认为每个Broker对应一台服务器。每个Kafka集群内的Broker都有一个不重复的编号。

Topic

消息的主题,可以理解为消息的分类。在Kafka中是一个逻辑上的概念,类似于数据库的 table 或者 ES 的 Index。在每个Broker上都可以创建多个Topic。不同的Topic会被订阅该Topic的消费者消费。但是如果Topic中有很多的消息,多到几个TB,为了解决文件过大问题,引入了分区Partition的概念。

Partition

物理上的概念,类似于ES的 shard ,类似于HBase 的Region(HRegion)。

Partition的作用是提高Kafka的吞吐量。每个Topic一般都有多个Partition。同一个Topic在不同的Partition的数据是不重复的。Partition的在服务器上的形式就是一个一个的文件夹。

Offset(偏移量)是一个占8byte的有序id号,记录当前 Topic 消费的进度,可以唯一确定每条消息在parition内的位置。每个Partition中的数据都是有序的(数据的插入顺序),新增的数据都会分配到队列尾部,所以在消费同一个 Partition 的时候可以保证数据有序。

写入到不同的Partition时,是可以并行执行的,所以大大提高了效率,提高了服务系统的吞吐量。

多个消费者可以组成一个消费者组(Consumer Group),每个消费者组都有一个组id。整个消费者组共享一组Offset(防止数据被重复读取),因为一个Topic有多个Partition。同一个消费组者的消费者可以消费同一Topic下不同Partition的数据,但是不会组内多个消费者消费同一Partition的数据。也就是Partition一次只能对应一个消费者组中的一个消费之。所以建议消费者组的Consumer的数量与Partition的数量一致,否则多出来的Consumer无法读取Partition。

Replication

Partition的副本包含Leader和Follower。当主分区(Leader)故障的时候ZooKeeper会选择一个副本(Follower)上位,成为Leader。在Kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量。同一个Partition的Leader和Follower不可以在同一个Broker上,同一个Topic的Leader和Follower可以在同一个Broker上。

Producer在写入数据的时候永远的找Leader,不会直接将数据写入Follower。

Flink

Flink是由Apache开发的开源分布式流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink可以执行批处理和流处理程序,Flink 将批处理(即处理有限的静态数据)视作一种特殊的流处理。

Flink 运行时由两种类型的进程组成:至少一个 JobManager 和一个或者多个 TaskManager。 在高可用(HA)模式中,可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby。JobManager 为 Master 节点,TaskManager 为 Worker 节点。

所有组件之间的通信都是借助于 Akka Framework,包括任务的状态以及 Checkpoint 触发等信息。客户端Client负责将任务提交到集群,与 JobManager 构建 Akka 连接,然后将任务提交到 JobManager,通过和 JobManager 之间进行交互获取任务执行状态。客户端提交任务可以采用 CLI 方式或者通过使用 Flink WebUI 提交,也可以在应用程序中指定 JobManager 的 RPC 网络端口构建 ExecutionEnvironment 提交 Flink 应用。

JobManager

JobManager 协调 Flink 应用程序的分布式执行。它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

ResourceManager: 负责 Flink 集群中的资源提供、回收、分配 。 它管理 task
slots,这是 Flink 集群中资源调度的单位。Flink 为不同的环境和资源提供者(例如
YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。
Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
JobMaster:负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。

同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。

当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。

TaskManager

TaskManager负责具体的任务执行和对应任务在每个节点上的资源申请和管理。它执行作业流的 task,并且缓存和交换数据流。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。

对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。

每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。

客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。

TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。

Yarn

Yarn 作为分布式任务调度框架,并不是随 Hadoop 的推出一开始就有的。在 MapReduce 应用程序的启动过程中,最重要的就是要把 MapReduce 程序分发到大数据集群的服务器上,在 Hadoop 1 中,这个过程主要是通过 TaskTracker 和 JobTracker 通信来完成。

这种架构方案的主要缺点是,服务器集群资源调度管理和 MapReduce 执行过程耦合在一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使用集群中的资源了。

在 Hadoop 早期的时候,大数据技术就只有 Hadoop 一家,这个缺点并不明显。但随着大数据技术的发展,各种新的计算框架不断出现,我们不可能为每一种计算框架部署一个服务器集群,而且就算能部署新集群,数据还是在原来集群的 HDFS 上。所以我们需要把 MapReduce 的资源管理和计算框架分开,这也是 Hadoop 2 最主要的变化,就是将 Yarn 从 MapReduce 中分离出来,成为一个独立的资源调度框架。

Yarn 是“Yet Another Resource Negotiator”的缩写,字面意思就是“另一种资源调度器”。事实上,在 Hadoop 社区决定将资源管理从 Hadoop 1 中分离出来,独立开发 Yarn 的时候,业界已经有一些大数据资源管理产品了,比如 Mesos 等,所以 Yarn 的开发者索性管自己的产品叫“另一种资源调度器”。

Yarn 包括两个部分:一个是资源管理器(Resource Manager),一个是节点管理器(Node Manager)。这也是 Yarn 的两种主要进程:ResourceManager 进程负责整个集群的资源调度管理,通常部署在独立的服务器上;NodeManager 进程负责具体服务器上的资源和任务管理,在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。

Resource Manager

资源管理器又包括两个主要组件:调度器(Scheduler)和应用程序管理器(ApplicationManager)。

调度器(Scheduler)其实就是一个资源分配算法,根据应用程序(Client)提交的资源申请和当前服务器集群的资源状况进行资源分配。Yarn 内置了几种资源调度算法,包括 Fair Scheduler、Capacity Scheduler 等,你也可以开发自己的资源调度算法供 Yarn 调用。

应用程序管理器(ApplicationManager)负责应用程序的提交、监控应用程序运行状态等。应用程序启动后需要在集群中运行一个 ApplicationMaster,ApplicationMaster 也需要运行在容器里面。每个应用程序启动后都会先启动自己的 ApplicationMaster,由 ApplicationMaster 根据应用程序的资源需求进一步向 ResourceManager 进程申请容器资源,得到容器以后就会分发自己的应用程序代码到容器上启动,进而开始分布式计算。

注意是分发自己的应用程序代码到容器上启动开始分布式计算。如果以传统的思路来看,是程序运行着不动,然后数据进进出出不停流转。但当数据量大的时候就没法这么玩了,因为海量数据移动成本太大,时间太长。大数据分布式计算就是这种思想,既然大数据难以移动,那我就把容易移动的应用程序发布到各个节点进行计算。

Node Manager

Yarn 进行资源分配的单位是容器(Container),每个容器包含了一定量的内存、CPU 等计算资源,默认配置下,每个容器包含一个 CPU 核心。容器由 NodeManager 进程启动和管理,NodeManger 进程会监控本节点上容器的运行状况并向 ResourceManger 进程汇报。

以一个 MapReduce 程序为例,来看一下 Yarn 的整个工作流程。

  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。
  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容器里面启动 MapReduce ApplicationMaster。
  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自己的应用程序申请容器资源。
  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容器中运行,运行的就是 Map 或者 Reduce 任务。
  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。

MapReduce 如果想在 Yarn 上运行,就需要开发遵循 Yarn 规范的 MapReduce ApplicationMaster,相应地,其他大数据计算框架也可以开发遵循 Yarn 规范的 ApplicationMaster,这样在一个 Yarn 集群中就可以同时并发执行各种不同的大数据计算框架,实现资源的统一调度管理。比如上文中提到了Spark有多种分布式部署模式:Standalone 、YARN、Mesos、Kubernetes 。

Q.E.D.