RM接收请求
RM 接收到客户端提交到运行APP请求,然后发送一个命令给NM 让其启动一个APP 其过程可分为两步:接收请求,创建APP
1. 接收请求
客户端发送RPC请求之后,RM如何接收到到呢?
处理关系如下图所示:

1.1 连接准备
1.1.1 RM启动
ResourceManager.main 是RM 入口函数 ResourceManager 是一个service ,在其serviceInit 方法中,创建ClientRMService 服务
1.1.2 ClientRMService
创建YarnRPC对象,初始化org.apache.hadoop.ipc.Server 对象,并启动server。 这是RM 接收 客户端请求三个准备工作。
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ApplicationClientProtocol.class, this,
clientBindAddress,
conf, this.rmDTSecretManager,
conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
this.server.start();1.1.3 YarnRPC 初始化
YarnRPC的实现类为 HadoopYarnProtoRPC
1.1.4 org.apache.hadoop.ipc.Server 创建
rpc.getServer 执行逻辑是 HadoopYarnProtoRPC.getServer HadoopYarnProtoRPC.getServer 核心是
因为 RpcFactoryProvider.getServerFactory(conf) 返回的是 RpcServerFactoryPBImpl 对象。 因而rpc.getServer 返回对象为 RpcServerFactoryPBImpl.getServer 返回结果。
1.1.5 RPC.Server创建
RpcServerFactoryPBImpl 创建server 最终执行代码如下:
1.2 Server 初始化
1.2.1 构建
上面 Builder.build 方法将返回一个Server 对象,实现逻辑是 getProtocolEngine().getServer() 。 getProtocolEngine 返回的是WritableRpcEngine 对象,所以 WritableRpcEngine.getServer方法返回的是RPC.Server 对象。(目前最新版本 默认不是 WritableRpcEngine,而是 ProtobufRpcEngine 但是逻辑差不多 ) WritableRpcEngine.Server 继承自 RPC.Server,RPC.Server 继承自 org.apache.hadoop.ipc.Server 初始化 Server 对象之后,会将注册的接口和接口对应实现放入缓存 RPC.Server.protocolImplMapArray RPC.Server.protocolImplMapArray 缓存内容为 getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version), new ProtoClassProtoImpl(protocolClass, protocolImpl))
1.2.2 org.apache.hadoop.ipc.Server 初始化
其中包括两个对象 listener = new Listener();和responder = new Responder(); 2.2.1 Listener 对象 其中包括 Reader 对象,负责OP_READ 事件处理 Listener 对象负责处理 OP_ACCEPT事件 2.2.2 回复由responder 处理 其监听的是writeSelector.select(PURGE_INTERVAL); 其内部有一个线程,循环处理回复,doRunLoop();
1.2.3 Server 启动
启动代码:
1.3 消息接收,处理,回复过程分析
当 new Listener(), new Responder() , new Handler(i) 初始化并启动。真正的处理过程才开始
1.3.1 消息处理
Listener 处理OP_ACCEPT事件,就通过reader.addConnection(c); 传递给 Reader ,Reader 内部有一个线程循环处理,最终处理读的是 Server.Connection.readAndProcess方法, 读分为三步: 1. 消息长度(放到dataLengthBuffer ) 2. 消息头(放到connectionHeaderBuf) 3. 消息内容(放到data )
processOneRpc(data.array()) 来处理消息,先解析消息头(processRpcOutOfBandRequest方法),再解析消息体(processRpcRequest方法) 解析消息体 包括: 1. 构建 Writable rpcRequest 对象( rpcRequest.readFields(dis) ) 2. 然后基于Writable rpcRequest 对象构建new Call() 对象。 3. 再将call 放入缓存 callQueue.put(call);
1.3.2 call 对象解析
Handler 内部线程循环遍历缓存 callQueue 获取call 对象 call 对象的处理逻辑在如下方法中: value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); call 方法实现在 ProtobufRpcEngine 其处理核心是
获取value 对象之后,将处理结果放入response
分析上面的代码可知 setupResponse 方法,首先返回结果放入ByteArrayOutputStream 对象中,最后调用call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray())); 将返回结果放入call 对象中 接着 调用Responder.responder.doRespond(call); 将带有返回值的call 放入缓存call.connection.responseQueue 处理返回结果从 call = responseQueue.removeFirst();拿出一个,再通过 int numBytes = channelWrite(channel, call.rpcResponse); 利用channel.write(buffer) (或者channelIO 遍历写入) 将数据写回管道。
1.3.3 Server处理消息对象
记得最前面server启动的最后调用了
blockingService来自
service来自
constructor 来自
getPbServiceImplClassName 方法获取对应协议的实现。 假设protocol 是XXX ,所属包名是YYY ,则实现类在 YYY.impl.pb.service.XXXPBServiceImpl 这里协议指的是 ApplicationClientProtocol 则协议的实现为: org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl
值得注意的是 ApplicationClientProtocolPBServiceImpl 构造函数 传入的参数为 ClientRMService
2. 创建App
2.1 创建应用
客户端执行 submitApplication 提交一个执行应用请求之后 触发 RM 的
在 createAndPopulateNewRMApp 方法最后,创建对象RMAppImpl
Last updated
Was this helpful?