Hadoop RPC 网络通信框架原理剖析

深入理解 Hadoop (一)网络通信架构与源码浅析_大数据

YARN RPC 服务端的工作大致可以分为四个阶段:

第一个阶段: Server 初始化和启动

在 Server 初始化的时候,会初始化 Listener 组件(内部启动了一个 AcceptSelector 绑定了相应的端口,用来处理客户端的 OP_ACCEPT 事件),内部还初始化了一组 Reader 线程,其实就是启动了 ReaderSelector,用来处理 OP_READ 事件。还启动一个 Responder 线程用来处理响应。 在 Server 调用 start() 方法启动的时候,启动 Listener 线程 和 Responder 线程。然后还初始化了一组 Handler 线程,专门用来处理存储在 callQueue 的 RpcCall 请求。

第二个阶段: 接收请求,封装 RpcCall

代码入口就是 Listener 的 run() 方法接收来自各个客户端的 RPC 请求,并将他们封装成 RpcCall 类放到一个共享队列 callQueue 中。再详细点,该过程可以分为建立链接和接受请求两个阶段,分 别由 Listener 和 Reader 两种线程完成。 具体来讲,首先 Listener 接受客户端的请求,然后通过轮询的方式从 Reader 中获取一个线程用来处理 OP_READ 事件读取 RPC 请求数据,如果对应客户端有数据写过来,则该 Reader 负责读取数据,然后封装成 RpcCall 加入到 callQueue 队列中等待 Handler 来执行下一步 处理。

第三个阶段: 处理 RpcCall 请求

Handler 从 callQueue 中获取 RpcCall 来执行处理,并执行对应的 Rpc 函数调用,将得到的结果返回给客户端。

第四个阶段: 返回结果

在 Server 内部只有一个 Responder 线程。它内部的 WriteSelector 负责监听 OP_WRITE 事件。如果 Responder 并不能一次性将结果写出去,就注册 OP_WRITE 事件分多次异步写。

Hadoop RPC 源码分析

RPC 客户端源码分析

客户端请求三层调用关系

// 1、proxy.method()
// 2、invoker.invoke()
// 3、client.call()

Hadoop RPC 客户端的工作机制

当我们获取一个某个通信协议的代理对象的时候,我们可以通过这个代理对象和 Server 进行通信,请求 Server 端的服务 客户端获取的代理对象内部,是构建一个 Invoker,也就是一个 InvocationHandler,当发送 RPC 请求的时候,实际上,就会回调了 Invoker.invoke() 方法 Invoker 的内部封装了 Client,当我们真的通过代理对象调用某个方法的时候,实际上,底层是通过 Client.call() 发送一个 RPC 请求给服务端,再获取到结果 Call 方法的具体实现,分为四步走:

第一步: 构建 Call 对象

第二步: 构建 Connection 链接对象,实际上内部维护了一个 Socket 客户端

第三步: 通过 Connection 发送 RPC 请求到服务端,并且把获取到的结果设置到 Call 对象的 rpcResponse 成员变量上

第四步: 从 Call 上获取 RPC 执行结果

RPC 服务端源码分析

Key - Connection - Channel 一一对应, Connection 又会封装在 Call 之中。调用过程如下如所示:

深入理解 Hadoop (一)网络通信架构与源码浅析_hadoop_02

HDFS RPC 通信协议整理

深入理解 Hadoop (一)网络通信架构与源码浅析_通信协议_03

ClientProtocol(Client -> NameNode) Client 和 NameNode 之间的通信协议,DFSClient 通过这个协议可以操纵HDFS的目录命名空间、打开与关闭文件流,比如向 NameNode 发送 RPC 请求创建文件夹,删除文件等等。

DatanodeProtocol(NameNode -> DataNode) NameNode 和 DataNode之间的通信协议,DataNode 通过此协议向 NameNode 注册,心跳,块汇报等。

ClientDatanodeProtocol(Client -> DataNode) Client 和 DataNode之间的通信协议,用来进行块恢复。

InterDatanodeProtocol(DataNode -> DataNode) DataNode 和 DataNode 之间的通信协议,该协议用于Datanode 进程之间进行通信。

NamenodeProtocol(NameNode -> SecondaryNameNode) NameNode 和 SecondaryNameNode 之间的通信协议,负责进行 checkpoint 相关工作的交互。

DataTransferProtocol(DataNode -> DataNode) DataNode 和 DataNode 之间的通信协议,负责进行 DataNode 相互之间的数据传输的通信协议。

YARN RPC 通信协议整理

深入理解 Hadoop (一)网络通信架构与源码浅析_通信协议_04

ApplicationClientProtocol(Client -> RM) clients 与 RM 之间的协议, JobClient 通过该 RPC 协议提交应用程序、 查询应用程序状态等。

ResourceTracker(NM -> RM) NM 与 RM 之间的协议, NM 通过该 RPC 协议向 RM 注册, 并定时发送心跳信息汇报当前节点的资源使用情况和 Container 运行情况。

ApplicationMasterProtocol(AM -> RM) AM 与 RM 之间的协议, AM 通过该 RPC 协议向 RM 注册和撤销自己, 并为各个任务申请资源。

ContainerManagementProtocol(AM -> NM) AM 与 NM 之间的协议, AM 通过该 RPC 要求 NM 启动或者停止 Container, 获取各个 Container 的使用状态等信息。

ResourceManagerAdministrationProtocol(RM Admin -> RM) Admin 与 RM 之间的通信协议, Admin 通过该 RPC 协议更新系统配置文件, 例如节点黑白名单等。

HAServiceProtocol(Active RM HA Framework Standby RM) Active RM 和 Standby RM 之间的通信协议,提供状态监控和 fail over 的 HA 服务。

TaskUmbilicalProtocol(YarnChild -> MRAppMaster) YarnChild 和 MRAppMaster 之间的通信协议,用于 MRAppMaster 监控跟踪 YarnChild 的运行状态,YarnChild 向 MRAppMaster 拉取 Task 任务信息。

MRClientProtocol(JobClient -> ApplicationMaster) JobClient 和 ApplicationMaster 之间的通信协议。用于客户端拉取 Application 的执行状态,以及 Application 返回执行结果给 客户端。