AsyncDispatcher
Last updated
Was this helpful?
Last updated
Was this helpful?
中央异步调度器,主要协调不同状态机之间交流,是YARN中事件处理的核心。 事件处理跟事件,服务(状态)关系密切。 如下图所示,是一个状态机处理事件,以及不同状态机通过中央异步调度器交流的视图:
如果仅仅从事件处理角度来分析YARN 的事件处理模型可以概括为: AsyncDispatcher(中央异步调度器)将对应的事件分发给事件处理器(handler)或状态机处理,并触发新的事件,直到没有新的事件产生。
AsyncDispatcher 首先一个服务,然后是一个事件处理器。内部有一个循环线程将收到的事件交给对应的事件处理器。
内部缓存
AsyncDispatcher 内部有两个临时缓存
BlockingQueue eventQueue 以及 Map, EventHandler> eventDispatchers(事件类型和handler处理关系)
前者缓存收到的事件,后者放事件和对应事件处理。
内部核心逻辑
中央异步处理器启动之后会创建一个新的线程 ”AsyncDispatcher event handler“
“AsyncDispatcher event handler” 线程不断从eventQueue 队列拿出事件,通过 dispatch(event) 方法 将事件分发给 eventDispatchers 存储的 handler(EventHandler)
实现逻辑核心如下:
构建事件和处理的对应关系
向外提供 register(Class<? extends Enum> eventType,EventHandler handler) 方法
该方法将事件和handler 写入缓存eventDispatchers。
为了能够兼容一个事件对应多个handler的情况,创建了 MultiListenerHandler 对象(内部缓存交由List> listofHandlers;)。
生成事件,触发操作
向外提供 getEventHandler 方法
当某个对象要触发一个在本对象处理不了的事件时,通过改方法获取 EventHandler 对象。如此便可以使用 EventHandler.handle(Event event) 将需要中央异步处理器处理的事件 写入 eventQueue 缓存。
为了能够处理写入缓存的异常情况,创建了 GenericEventHandler 对象。
下面以 NodeManager 为例,看看 AsyncDispatcher 如何使用
NodeManager启动
NodeManager 启动脚本 bin/YARN 启动脚步指定了 YARN 入口 class
NodeManager.main() 入口函数调用 nodeManager.initAndStartNodeManager(conf, false);
创建中央异步调度器
nodeManager.initAndStartNodeManager 会调用 this.init(conf)
通过前文 YARN 服务化 可知,this.init(conf) 最终调用的是 NodeManager 重写的 serviceInit() 方法。
在 NodeManager.serviceInit(Configuration conf) 方法中
初始化了中央异步处理器
注册事件和对应处理
事件处理核心逻辑
ContainerManagerEventType 事件类型对应的处理(containerManager)的处理逻辑:
NodeManagerEventType 事件类型对应的处理(this)的处理逻辑:
4.3 生成事件
上面说了中央异步处理模型,并解释了是如何注册事件和事件处理,下面说说是如何生成事件。
NodeManager 在 serviceInit() 方法中会通过 createNodeStatusUpdater() 初始化一个 NodeStatusUpdaterImpl 服务。
该服务在启动过程中(执行 serviceStart 方法)会调用 startStatusUpdater 方法,该方法会新建一个线程“ Node Status Updater”。
”Node Status Updater“ 不断循环向 resourceManager 发送心跳,并根据返回结果,决定是否将相关事件发给中央异步处理器
发送方式如下: