Hadoop yarn 技术分享
  • 介绍
  • YARN Service
  • 事件处理
    • AsyncDispatcher
    • stateMachine
  • YARN RPC
    • RPC 设计
    • RPC 实现
  • 交互协议
  • 提交app的过程
    • 提交APP概要
    • 客户端提交app
    • RM接收请求
    • RM分配App
    • RM下发App
    • NM拉起APP
  • YARN 高可用
    • RM HA 基础
    • RM HA
  • YARN调度策略
    • Untitled
    • YARN-队列配置
  • 问题分析总结
    • 集群繁忙时偶发性空指针导致app执行失败
    • YARN 资源充足,但app等待调度排长队
    • YARN UI看不了app 日志
    • YARN UI看不了app 日志2
    • TEZ 资源不释放问题分析
    • NM频繁挂掉
    • container执行异常
    • YARN RM主备切换异常
Powered by GitBook
On this page
  • 1. 什么是中央异步调度器
  • 2. 中央异步调度器具体功能
  • 3. 中央异步调度器内部实现
  • 3.1. 概要
  • 3.2 内部实现
  • 4. 中央异步调度器使用例子
  • 4.1 中央异步调度器初始化
  • 4.2 注册事件和对应handler

Was this helpful?

  1. 事件处理

AsyncDispatcher

Previous事件处理NextstateMachine

Last updated 5 years ago

Was this helpful?

1. 什么是中央异步调度器

中央异步调度器,主要协调不同状态机之间交流,是YARN中事件处理的核心。 事件处理跟事件,服务(状态)关系密切。 如下图所示,是一个状态机处理事件,以及不同状态机通过中央异步调度器交流的视图:

2. 中央异步调度器具体功能

如果仅仅从事件处理角度来分析YARN 的事件处理模型可以概括为: AsyncDispatcher(中央异步调度器)将对应的事件分发给事件处理器(handler)或状态机处理,并触发新的事件,直到没有新的事件产生。

3. 中央异步调度器内部实现

3.1. 概要

AsyncDispatcher 首先一个服务,然后是一个事件处理器。内部有一个循环线程将收到的事件交给对应的事件处理器。

3.2 内部实现

  1. 内部缓存

    AsyncDispatcher 内部有两个临时缓存

    BlockingQueue eventQueue 以及 Map, EventHandler> eventDispatchers(事件类型和handler处理关系)

    前者缓存收到的事件,后者放事件和对应事件处理。

  2. 内部核心逻辑

    中央异步处理器启动之后会创建一个新的线程 ”AsyncDispatcher event handler“

    “AsyncDispatcher event handler” 线程不断从eventQueue 队列拿出事件,通过 dispatch(event) 方法 将事件分发给 eventDispatchers 存储的 handler(EventHandler)

    实现逻辑核心如下:

    Class<? extends Enum> type = event.getType().getDeclaringClass();
    EventHandler handler = eventDispatchers.get(type);
    handler.handle(event);
  3. 构建事件和处理的对应关系

    向外提供 register(Class<? extends Enum> eventType,EventHandler handler) 方法

    该方法将事件和handler 写入缓存eventDispatchers。

    为了能够兼容一个事件对应多个handler的情况,创建了 MultiListenerHandler 对象(内部缓存交由List> listofHandlers;)。

  4. 生成事件,触发操作

    向外提供 getEventHandler 方法

    当某个对象要触发一个在本对象处理不了的事件时,通过改方法获取 EventHandler 对象。如此便可以使用 EventHandler.handle(Event event) 将需要中央异步处理器处理的事件 写入 eventQueue 缓存。

    为了能够处理写入缓存的异常情况,创建了 GenericEventHandler 对象。

4. 中央异步调度器使用例子

下面以 NodeManager 为例,看看 AsyncDispatcher 如何使用

4.1 中央异步调度器初始化

  1. NodeManager启动

    NodeManager 启动脚本 bin/YARN 启动脚步指定了 YARN 入口 class

    CLASS='org.apache.hadoop.YARN.server.nodemanager.NodeManager',

    NodeManager.main() 入口函数调用 nodeManager.initAndStartNodeManager(conf, false);

  2. 创建中央异步调度器

    nodeManager.initAndStartNodeManager 会调用 this.init(conf)

    在 NodeManager.serviceInit(Configuration conf) 方法中

    初始化了中央异步处理器

    this.dispatcher = new AsyncDispatcher();

4.2 注册事件和对应handler

  1. 注册事件和对应处理

    dispatcher.register(ContainerManagerEventType.class, containerManager);
    dispatcher.register(NodeManagerEventType.class, this);
  2. 事件处理核心逻辑

    ContainerManagerEventType 事件类型对应的处理(containerManager)的处理逻辑:

    @Override
    public void handle(ContainerManagerEvent event) {
    switch (event.getType()) {
    case FINISH_APPS:
     CMgrCompletedAppsEvent appsFinishedEvent =
         (CMgrCompletedAppsEvent) event;
     for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
       .......
       this.dispatcher.getEventHandler().handle(
           new ApplicationFinishEvent(appID,
               diagnostic));
     }
     break;
    case FINISH_CONTAINERS:
     CMgrCompletedContainersEvent containersFinishedEvent =
         (CMgrCompletedContainersEvent) event;
     for (ContainerId container : containersFinishedEvent
         .getContainersToCleanup()) {
         this.dispatcher.getEventHandler().handle(
             new ContainerKillEvent(container,
                 ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
                 "Container Killed by ResourceManager"));
     }
     break;
    default:
       throw new YARNRuntimeException(
           "Got an unknown ContainerManagerEvent type: " + event.getType());
    }
    }

    NodeManagerEventType 事件类型对应的处理(this)的处理逻辑:

    public void handle(NodeManagerEvent event) {
    switch (event.getType()) {
    case SHUTDOWN:
     shutDown();
     break;
    case RESYNC:
     resyncWithRM();
     break;
    default:
     LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
    }
    }

    4.3 生成事件

    上面说了中央异步处理模型,并解释了是如何注册事件和事件处理,下面说说是如何生成事件。

    NodeManager 在 serviceInit() 方法中会通过 createNodeStatusUpdater() 初始化一个 NodeStatusUpdaterImpl 服务。

    该服务在启动过程中(执行 serviceStart 方法)会调用 startStatusUpdater 方法,该方法会新建一个线程“ Node Status Updater”。

    ”Node Status Updater“ 不断循环向 resourceManager 发送心跳,并根据返回结果,决定是否将相关事件发给中央异步处理器

    发送方式如下:

    if (response.getNodeAction() == NodeAction.SHUTDOWN) {
    ...
    dispatcher.getEventHandler().handle(
     new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
    break;
    }
    if (response.getNodeAction() == NodeAction.RESYNC) {
    ...
    dispatcher.getEventHandler().handle(
     new NodeManagerEvent(NodeManagerEventType.RESYNC));
    pendingCompletedContainers.clear();
    break;
    }

通过前文 可知,this.init(conf) 最终调用的是 NodeManager 重写的 serviceInit() 方法。

YARN 服务化