public static void main(String argv[]) { //未捕获异常处理类 Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { //载入控制文件 Configuration conf = new YarnConfiguration(); //创建空RM对象,并未包含任何服务,也未启动 ResourceManager resourceManager = new ResourceManager(); //添加关闭钩子 ShutdownHookManager.get().addShutdownHook( new CompositeServiceShutdownHook(resourceManager), SHUTDOWN_HOOK_PRIORITY); setHttpPolicy(conf); //初始化服务 resourceManager.init(conf); //启动RM resourceManager.start(); } catch (Throwable t) { LOG.fatal("Error starting ResourceManager", t); System.exit(-1); } }上述main函数中主要分析服务初始化和服务启动,RM是个综合服务类继承结构CompositeService->AbstractService,RM初始化是会先进入父类的init函数,AbstractService抽取了服务的基本操作如start、stop、close,只要我们的服务覆盖serviceStart、serviceStop、serviceInit等函数就可以控制自己的服务了,这相当于对服务做了统一的管理。
@Override public void init(Configuration conf) { //服务配置是否为空 if (conf == null) { throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); } //服务是否已经初始化 if (isInState(STATE.INITED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.INITED) != STATE.INITED) { setConfig(conf); try { //服务初始化,会进入子类RM的同名函数 serviceInit(config); if (isInState(STATE.INITED)) { //if the service ended up here during init, //notify the listeners notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }RM的serviceInit会初始化所需服务,会创建相应的服务类然后加入服务列表
@Override protected void serviceInit(Configuration conf) throws Exception { //校验配置合法性,yarn.resourcemanager.am.max-attempts ,validate expireIntvl >= heartbeatIntvl validateConfigs(conf); this.conf = conf; //创建RM上下文,初始化内部数据结构如:application、nodes、inactiveNodes this.rmContext = new RMContextImpl(); // register the handlers for all AlwaysOn services using setupDispatcher(). //创建异步事件分发器服务,内部主要包含两个数据结构1、存放事件的阻塞队列 2、存放事件类型和处理器的map集合。 //当一个事件注册时实际是将事件类型和处理器放入一个map集合,需要注意的是处理器可能包含多个,此时处理器类型 //为multiHandler rmDispatcher = setupDispatcher(); //将服务加入服务列表 addIfService(rmDispatcher); //更新RM上下文对象 rmContext.setDispatcher(rmDispatcher); //创建管理员服务,专门为管理员提供的服务:队列刷新、节点刷新、ACL控制等 adminService = createAdminService(); //同上 addService(adminService); //同上 rmContext.setRMAdminService(adminService); //HA选项,暂不分析 this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf)); if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(conf); } //服务初始化操作,最终调用RM的serviceInit对各个服务进行初始化 createAndInitActiveServices(); webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf); super.serviceInit(conf); }RM中对父类的serviceInit函数做了覆盖,用以初始化自身服务
@Override protected void serviceInit(Configuration configuration) throws Exception { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); rmSecretManagerService = createRMSecretManagerService(); addService(rmSecretManagerService); //容器分配超时监控服务 containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); addService(containerAllocationExpirer); rmContext.setContainerAllocationExpirer(containerAllocationExpirer); //ApplicationMaster状态监控 AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); rmContext.setAMLivelinessMonitor(amLivelinessMonitor); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); //HA相关内容,RM在failover后是否允许恢复 boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); RMStateStore rmStore = null; if(isRecoveryEnabled) { recoveryEnabled = true; rmStore = RMStateStoreFactory.getStore(conf); } else { recoveryEnabled = false; rmStore = new NullRMStateStore(); } try { rmStore.init(conf); rmStore.setRMDispatcher(rmDispatcher); } catch (Exception e) { // the Exception from stateStore.init() needs to be handled for // HA and we need to give up master status if we got fenced LOG.error("Failed to init state store", e); throw e; } rmContext.setStateStore(rmStore); if (UserGroupInformation.isSecurityEnabled()) { delegationTokenRenewer = createDelegationTokenRenewer(); rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); addService(nodesListManager); rmContext.setNodesListManager(nodesListManager); // Initialize the scheduler scheduler = createScheduler(); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(schedulerDispatcher); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); // Register event handler for RmAppEvents rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext)); // Register event handler for RmAppAttemptEvents rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(rmContext)); // Register event handler for RmNodes rmDispatcher.register( RMNodeEventType.class, new NodeEventDispatcher(rmContext)); //NodeManager监控服务 nmLivelinessMonitor = createNMLivelinessMonitor(); addService(nmLivelinessMonitor); //资源跟踪服务,处理来自NodeManager的请求 resourceTracker = createResourceTrackerService(); addService(resourceTracker); rmContext.setResourceTrackerService(resourceTracker); //Metrics服务 DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); try { scheduler.reinitialize(conf, rmContext); } catch (IOException ioe) { throw new RuntimeException("Failed to initialize scheduler", ioe); } // creating monitors that handle preemption createPolicyMonitors(); //处理来自appMaster的请求,包括注册和心跳 masterService = createApplicationMasterService(); addService(masterService) ; rmContext.setApplicationMasterService(masterService); //ACL管理服务 applicationACLsManager = new ApplicationACLsManager(conf); queueACLsManager = createQueueACLsManager(scheduler, conf); rmAppManager = createRMAppManager(); // Register event handler for RMAppManagerEvents rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); //客户端RPC请求 clientRM = createClientRMService(); rmContext.setClientRMService(clientRM); addService(clientRM); rmContext.setClientRMService(clientRM); //appMaster启动服务 applicationMasterLauncher = createAMLauncher(); rmDispatcher.register(AMLauncherEventType.class, applicationMasterLauncher); addService(applicationMasterLauncher); if (UserGroupInformation.isSecurityEnabled()) { addService(delegationTokenRenewer); delegationTokenRenewer.setRMContext(rmContext); } new RMNMInfo(rmContext, scheduler); //进入父类初始化 函 数 //进入父类初始化函数,获得服务列表,循环内上述添加到列表的服务进行初始化(serviceInit) super.serviceInit(conf); }
Hadoop ResourceManager启动之服务初始化,布布扣,bubuko.com
Hadoop ResourceManager启动之服务初始化
原文:http://blog.csdn.net/lihm0_1/article/details/21459503