版本:hadoop2.2.0
源码可以在 https://github.com/hortonworks/simple-yarn-app 下载。之前多次试验这个simpleyarnapp,一直没有成功过。作为yarn的hello world,应该没有那么难运行吧。几经排查,发现还是classpath路径的问题。
首先,还是要按照http://blog.csdn.net/fansy1990/article/details/22896249配置环境。
这里说是classpath的问题,主要是指linux和windows里面设置java的classpath的方式是不同的。假如按照github上面的源码(由于我是使用windows提交任务的,所以会出现这样的问题,如果是linux提交任务则不会出现这样的问题),设置断点查看到的classpath的路径为:
{CLASSPATH=$HADOOP_CONF_DIR;$HADOOP_COMMON_HOME/share/hadoop/common/*;$HADOOP_COMMON_HOME/share/hadoop/common/lib/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*;$HADOOP_YARN_HOME/share/hadoop/yarn/*;$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*;%PWD%\*}而使用修改过的源码,其路径为:{CLASSPATH=$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$PWD/*}
两者的差别在于分隔符(分号和冒号)以及变量的引用方式($和%)。client的源码如下:
package com.hortonworks.simpleyarnapp;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Client {
Logger log = LoggerFactory.getLogger(Client.class);
Configuration conf = new YarnConfiguration();
/**
 * Submits the simple-yarn-app ApplicationMaster to YARN and blocks until the
 * application reaches a terminal state (FINISHED, KILLED or FAILED).
 *
 * <p>The HDFS and ResourceManager endpoints are hard-coded into {@code conf}
 * before the client is started.
 *
 * @param args {@code args[0]} is the command passed on to the ApplicationMaster,
 *             {@code args[1]} is the container count passed on to the ApplicationMaster,
 *             {@code args[2]} is the path of the jar registered as the
 *             {@code simpleapp.jar} local resource
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 * @throws NumberFormatException if {@code args[1]} is not an integer
 * @throws Exception if submitting the application or querying its report fails
 */
public void run(String[] args) throws Exception {
    if (args == null || args.length < 3) {
        throw new IllegalArgumentException(
                "Expected arguments: <command> <num-containers> <jar-path>");
    }
    final String command = args[0];
    final int n = Integer.parseInt(args[1]);
    final Path jarPath = new Path(args[2]);

    // Cluster endpoints are hard-coded here.
    conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");

    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();
    try {
        // Create application via yarnClient
        YarnClientApplication app = yarnClient.createApplication();

        // Set up the container launch context for the application master.
        // To launch the plain HDFS writer instead, replace the main class with
        // com.hortonworks.simpleyarnapp.Work.
        ContainerLaunchContext amContainer =
                Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(
                Collections.singletonList(
                        "$JAVA_HOME/bin/java" +
                        " -Xmx256M" +
                        " com.hortonworks.simpleyarnapp.ApplicationMaster" +
                        " " + command +
                        " " + n +
                        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));

        // Setup jar for ApplicationMaster
        LocalResource appMasterJar = Records.newRecord(LocalResource.class);
        setupAppMasterJar(jarPath, appMasterJar);
        amContainer.setLocalResources(
                Collections.singletonMap("simpleapp.jar", appMasterJar));

        // Setup CLASSPATH for ApplicationMaster
        Map<String, String> appMasterEnv = new HashMap<String, String>();
        setupAppMasterEnv(appMasterEnv);
        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(256);
        capability.setVirtualCores(1);

        // Finally, set-up ApplicationSubmissionContext for the application
        ApplicationSubmissionContext appContext =
                app.getApplicationSubmissionContext();
        appContext.setApplicationName("simple-yarn-app"); // application name
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        appContext.setQueue("default"); // queue

        // Submit application. A failure here is propagated: polling for a report
        // of an application that was never submitted cannot succeed.
        ApplicationId appId = appContext.getApplicationId();
        System.out.println("Submitting application " + appId);
        log.info("Submitting application " + appId);
        yarnClient.submitApplication(appContext);

        // Poll until the application reaches a terminal state.
        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
        YarnApplicationState appState = appReport.getYarnApplicationState();
        while (appState != YarnApplicationState.FINISHED &&
                appState != YarnApplicationState.KILLED &&
                appState != YarnApplicationState.FAILED) {
            Thread.sleep(100);
            appReport = yarnClient.getApplicationReport(appId);
            appState = appReport.getYarnApplicationState();
        }
        System.out.println(
                "Application " + appId + " finished with" +
                " state " + appState +
                " at " + appReport.getFinishTime());
    } finally {
        // Release the client even when submission or polling fails.
        yarnClient.stop();
    }
}
/**
 * Fills in {@code appMasterJar} so that it describes the file at {@code jarPath}:
 * its YARN URL, length and modification time, registered as a PUBLIC resource of type FILE.
 *
 * @param jarPath location of the jar, looked up through the file system obtained from {@code conf}
 * @param appMasterJar the local-resource record to populate
 * @throws IOException if the file status cannot be read
 */
private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
    // Look the file up first so that a missing path fails before the record is touched.
    final FileSystem fileSystem = FileSystem.get(conf);
    final FileStatus status = fileSystem.getFileStatus(jarPath);

    appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    appMasterJar.setTimestamp(status.getModificationTime());
    appMasterJar.setSize(status.getLen());
    appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
    appMasterJar.setType(LocalResourceType.FILE);
}
/**
 * Appends {@code value} to the environment entry named {@code variable}.
 *
 * <p>When the entry already exists the new value is joined to it with a colon;
 * otherwise the entry is created with {@code value} alone. The colon is fixed
 * rather than taken from the submitting machine's platform separator.
 *
 * @param environment the environment map to update in place
 * @param variable name of the environment variable
 * @param value the element to append
 */
private static void addToEnvironment(
        Map<String, String> environment,
        String variable, String value) {
    final String existing = environment.get(variable);
    final String joined = (existing == null) ? value : existing + ":" + value;
    environment.put(
            StringInterner.weakIntern(variable),
            StringInterner.weakIntern(joined));
}
/**
 * Builds the CLASSPATH entry of the ApplicationMaster's environment.
 *
 * <p>Every entry of the configured YARN application classpath (falling back to
 * the YARN default) is trimmed and appended, followed by {@code $PWD/*}.
 * The entries go through the local {@code addToEnvironment} helper and a literal
 * {@code $PWD} instead of {@code Apps.addToEnvironment} with
 * {@code Environment.PWD.$()} and {@code File.separator}: when the job is
 * submitted from Windows those produce ';' separators and {@code %PWD%\*}.
 *
 * @param appMasterEnv the environment map to populate
 */
private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
    final String classpathKey = Environment.CLASSPATH.name();
    final String[] entries = conf.getStrings(
            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
            YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
    for (int i = 0; i < entries.length; i++) {
        addToEnvironment(appMasterEnv, classpathKey, entries[i].trim());
    }
    // The container's working directory comes last.
    final String workDirWildcard = "$PWD" + Path.SEPARATOR + "*";
    addToEnvironment(appMasterEnv, classpathKey, workDirWildcard);
}
/**
 * Runs the client with a fixed argument set; the command-line arguments are ignored.
 *
 * @param args unused
 * @throws Exception if {@link #run(String[])} fails
 */
public static void main(String[] args) throws Exception {
    // Alternative first argument: "java", to launch a bare java command
    // instead of the shell script.
    final String[] launchArgs = new String[] {
        "/root/myShell.sh", "1", "hdfs://node31:9000/input/"
    };
    new Client().run(launchArgs);
}
}package com.hortonworks.simpleyarnapp;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ApplicationMaster {
static Logger log = LoggerFactory.getLogger(ApplicationMaster.class);
/**
 * ApplicationMaster entry point: asks the ResourceManager for {@code n} containers,
 * runs {@code command} in each one, waits for all of them to complete and then
 * unregisters with a final status that reflects the container exit codes.
 *
 * <p>The application is reported as SUCCEEDED only when every completed container
 * exited with status 0; otherwise it is reported as FAILED. Without this check a
 * command that fails inside its container would still be reported as a success.
 *
 * @param args {@code args[0]} is the command line to run in every container,
 *             {@code args[1]} is the number of containers to request
 * @throws IllegalArgumentException if fewer than two arguments are supplied
 * @throws NumberFormatException if {@code args[1]} is not an integer
 * @throws Exception if communication with the ResourceManager or a NodeManager fails
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        throw new IllegalArgumentException(
                "Expected arguments: <command> <num-containers>");
    }
    final String command = args[0];
    final int n = Integer.parseInt(args[1]);
    log.debug("Entering the ApplicationMaster");

    // Initialize clients to ResourceManager and NodeManagers.
    // Unlike Client, no endpoint overrides are set on this configuration.
    Configuration conf = new YarnConfiguration();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();
    try {
        // Register with ResourceManager
        System.out.println("registerApplicationMaster 0");
        rmClient.registerApplicationMaster("", 0, "");
        System.out.println("registerApplicationMaster 1");

        // Priority for worker containers - priorities are intra-application
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(0);

        // Resource requirements for worker containers
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(128);
        capability.setVirtualCores(1);

        // Make container requests to ResourceManager
        for (int i = 0; i < n; ++i) {
            ContainerRequest containerAsk =
                    new ContainerRequest(capability, null, null, priority);
            System.out.println("Making res-req " + i);
            rmClient.addContainerRequest(containerAsk);
        }

        int allocatedContainers = 0;
        int completedContainers = 0;
        int failedContainers = 0;

        // Obtain allocated containers and launch
        while (allocatedContainers < n) {
            AllocateResponse response = rmClient.allocate(0);
            for (Container container : response.getAllocatedContainers()) {
                ++allocatedContainers;
                // Launch container by create ContainerLaunchContext
                ContainerLaunchContext ctx =
                        Records.newRecord(ContainerLaunchContext.class);
                ctx.setCommands(
                        Collections.singletonList(
                                command +
                                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
                System.out.println("Launching container " + allocatedContainers);
                nmClient.startContainer(container, ctx);
            }
            // A container launched earlier may already be reported as complete in this
            // response; dropping that status would make the wait loop below spin forever.
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                ++completedContainers;
                failedContainers += recordCompletion(status, completedContainers);
            }
            Thread.sleep(100);
        }

        // Now wait for containers to complete
        while (completedContainers < n) {
            // Floating-point division: integer division would report 0 progress throughout.
            AllocateResponse response =
                    rmClient.allocate((float) completedContainers / n);
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                ++completedContainers;
                failedContainers += recordCompletion(status, completedContainers);
            }
            Thread.sleep(100);
        }

        // Un-register with ResourceManager, reporting what actually happened.
        if (failedContainers == 0) {
            rmClient.unregisterApplicationMaster(
                    FinalApplicationStatus.SUCCEEDED, "", "");
        } else {
            rmClient.unregisterApplicationMaster(
                    FinalApplicationStatus.FAILED,
                    failedContainers + " of " + completedContainers
                            + " containers exited with a non-zero status",
                    "");
        }
    } finally {
        nmClient.stop();
        rmClient.stop();
    }
}

/**
 * Prints the completion of one container and tells whether it failed.
 *
 * @param status the completion status reported by the ResourceManager
 * @param completedSoFar running count of completed containers, including this one
 * @return 1 if the container exited with a non-zero status, 0 otherwise
 */
private static int recordCompletion(ContainerStatus status, int completedSoFar) {
    System.out.println("Completed container " + completedSoFar);
    if (status.getExitStatus() == 0) {
        return 0;
    }
    System.out.println("Container " + status.getContainerId()
            + " exited with status " + status.getExitStatus()
            + ": " + status.getDiagnostics());
    return 1;
}
}
把上面两个文件编译打包后,放在$hadoop_home/share/hadoop/yarn/lib下面即可。
编写shell文件:在/root/myShell.sh中输入下面的内容:
#!/bin/bash
# NOTE: "touc" and "cho" are misspellings of touch/echo that the article keeps on
# purpose: the script is meant to fail so that the reported job status can be observed.
# The redirection on the last line still creates /root/a.txt even though "cho" fails.
touc "/root/a.txt"
cho "oh ,it works !" > /root/a.txt
然后运行client的程序,查看/root下面是否有a.txt文件。如果有,则说明确实是执行了shell文件了。如果没有则说明有问题。(正常的情况下是有这个文件的)。同时如果把shell的错误改为正确的,还可以看到a.txt里面的文字:oh, it works !
这里可以知道运行shell其实是失败了的,但是在resourcemanager的log里面看到这个任务是成功的,并且没有提示其他错误信息。所以,这说明其实这个程序只是可以运行shell而已,至于是否运行正确或者错误就不管了?
又或者说,是我的shell编写得太简单了,没有包含程序失败的控制或者容错之类的代码段?可以看到,在AppMaster中其实也是有log的:
ctx.setCommands(
Collections.singletonList(
command +
/* "$JAVA_HOME/bin/java" +*/
/*"/bin/bash /root/myShell.sh" +*/
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
));
command是在container中运行的,resourcemanager应该可以获取container的任务执行状态。所以,是不是编写的shell没有把执行结果通知给container,让它做出相应的变化呢?
另外,如果我去掉AppMaster的话,而是自己写一个一般的java程序,比如就把一些数据写入hdfs,如下:
package com.hortonworks.simpleyarnapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.io.Closeables;
/**
 * Minimal stand-alone job: prints its first two arguments and writes the same
 * text to a fixed file on HDFS.
 *
 * <p>Any failure while writing is propagated out of {@code main}, so the process
 * ends with an error instead of exiting normally with nothing written.
 */
public class Work {
    /** Destination file on HDFS. */
    private static final String OUTPUT_FILE = "hdfs://node31:9000/input/work.info";

    /**
     * @param args {@code args[0]} and {@code args[1]} are echoed and written to HDFS
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws Exception if the HDFS file cannot be created, written or closed
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "Expected two arguments but got " + args.length);
        }
        String info = "first argument is:" + args[0] + "\n"
                + "second argument is :" + args[1];
        System.out.println("--------------------------------" + info);

        Configuration conf = new YarnConfiguration();
        conf.set("fs.defaultFS", "hdfs://node31:9000");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", "node31:8032");
        writeString(info, conf);
    }

    /**
     * Writes {@code value} to {@link #OUTPUT_FILE} in modified UTF-8
     * ({@code writeUTF}), replacing any existing file.
     *
     * @param value the text to store
     * @param conf configuration used to resolve the file system
     * @throws Exception if the file cannot be created, written or closed
     */
    private static void writeString(String value, Configuration conf) throws Exception {
        Path path = new Path(OUTPUT_FILE);
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        FSDataOutputStream out = fs.create(path);
        try {
            out.writeUTF(value);
        } finally {
            // Closed explicitly rather than quietly: a failed close may mean the
            // data never reached HDFS, so it must not be hidden.
            out.close();
        }
    }
}
amContainer.setCommands(
Collections.singletonList(
"$JAVA_HOME/bin/java" +
" -Xmx256M" +
" com.hortonworks.simpleyarnapp.Work" +
/* " com.hortonworks.simpleyarnapp.ApplicationMaster" +*/
" " + command +
" " + String.valueOf(n) +
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
)
);
这样其实也是可以运行Work的:Work的内容正确执行了(确实在hdfs中写入了数据),但是job的状态返回的却是fail,不过任务确实是可以提交的。还有一点,假如我把AppMaster的command换为java命令,用它来执行我的Work,这样应该也是可以的。但是目前的情况是,任务执行成功,Work的内容却没有执行(hdfs没有写入数据)。
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
yarn运行simpleyarnapp,布布扣,bubuko.com
原文:http://blog.csdn.net/fansy1990/article/details/23347759