Use the fsck command to check the total size, block layout, and average block size of one day's logs on HDFS (on newer releases the equivalent invocation is hdfs fsck, as the deprecation warning in the output notes):
[hduser@da-master jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
15/01/13 18:57:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://da-master:50070
FSCK started by hduser (auth:SIMPLE) from /172.21.101.30 for path /wcc/da/kafka/report/2015-01-11 at Tue Jan 13 18:57:24 CST 2015
....................................................................................................
....................................................................................................
........................................Status: HEALTHY
 Total size:                    9562516137 B
 Total dirs:                    1
 Total files:                   240
 Total symlinks:                0
 Total blocks (validated):      264 (avg. block size 36221652 B)
 Minimally replicated blocks:   264 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    2
 Average block replication:     2.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          5
 Number of racks:               1
FSCK ended at Tue Jan 13 18:57:24 CST 2015 in 14 milliseconds

The filesystem under path '/wcc/da/kafka/report/2015-01-11' is HEALTHY
Date       | Total(GB) | Total blocks | AvgBlockSize(MB)
2014/12/21 | 9.39      | 268          | 36
2014/12/20 | 9.5       | 268          | 36
2014/12/19 | 8.89      | 268          | 34
2014/11/5  | 8.6       | 266          | 33
2014/10/1  | 9.31      | 268          | 36
The table confirms the problem: each day's logs split into roughly 268 blocks with an average block size of around 36 MB, far below the 128 MB HDFS block size. In other words, the logs arrive as many small files, most well under 128 MB, and this seriously hurts the cluster's scalability and performance.

First, every block, file, and directory in HDFS is represented in namenode memory as an object of roughly 150 bytes. With 10,000,000 small files, each occupying its own block, that is about 20 million objects (one inode plus one block per file), or roughly 3 GB of namenode heap; at 100 million files it grows to roughly 30 GB, so namenode memory severely constrains how far the cluster can scale. Second, reading many small files is far slower than reading a few large ones: HDFS was designed for streaming access to large files, and scanning lots of small files forces the reader to hop from one datanode to the next, which badly degrades performance. Finally, processing many small files is much slower than processing the same volume of data as large files, because every small file occupies its own task slot, and starting and releasing tasks can end up consuming most of the elapsed time.

Our strategy is therefore to merge the small files first, e.g. consolidating the logs into user_report.tsv, client_report.tsv, and AppLog_UserDevice.tsv, and only then run the job.
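As a sanity check on those figures, here is a minimal sketch of the heap estimate; the 150-byte-per-object figure is the commonly cited approximation, not a measurement from our cluster:

public class NamenodeHeapEstimate {
    // Assumption: each file inode and each block is one in-memory
    // namenode object of roughly 150 bytes.
    static final long BYTES_PER_OBJECT = 150L;

    static long estimateHeapBytes(long smallFiles) {
        long objects = smallFiles * 2; // one inode + one block per file
        return objects * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        System.out.println(estimateHeapBytes(10000000L));  // ~3.0e9 B, about 3 GB
        System.out.println(estimateHeapBytes(100000000L)); // ~3.0e10 B, about 30 GB
    }
}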
One option is the Hadoop API's FileUtil utility class, whose copyMerge method has the signature copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString).
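As a minimal sketch of how it would be invoked on one day's directory (the source path is taken from the fsck run above; /wcc/da/merged is a hypothetical destination):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CopyMergeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Concatenate every file under the day's directory into one file.
        // If the destination is an existing directory, the merged file is
        // named after the source directory (see checkDest in the source
        // below), so the output name cannot be chosen freely.
        FileUtil.copyMerge(fs,
                new Path("/wcc/da/kafka/report/2015-01-11"), // srcDir
                fs,
                new Path("/wcc/da/merged"),                  // dstFile (hypothetical)
                false, // deleteSource: keep the originals
                conf,
                null); // addString: no separator between files
    }
}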
However, this method does not fit our case: a single day's directory contains three types of logs, which must be merged into three separate files, user_report.tsv, client_report.tsv, and AppLog_UserDevice.tsv. We therefore have to reimplement the merge ourselves. First, a look at the copyMerge source:
/** Copy all files in a directory to one output file (merge). */
public static boolean copyMerge(FileSystem srcFS, Path srcDir,
                                FileSystem dstFS, Path dstFile,
                                boolean deleteSource,
                                Configuration conf, String addString) throws IOException {
  // Resolve the merged destination path: the file is named srcDir.getName(),
  // i.e. after the source directory, so the output log file name cannot be
  // customized here, which is very inconvenient.
  dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);

  // The source path must be a directory.
  if (!srcFS.getFileStatus(srcDir).isDirectory())
    return false;

  // Open the output stream for the merged file.
  OutputStream out = dstFS.create(dstFile);

  try {
    // List every file under the source directory.
    FileStatus contents[] = srcFS.listStatus(srcDir);
    // Sort for a deterministic concatenation order.
    Arrays.sort(contents);
    for (int i = 0; i < contents.length; i++) {
      if (contents[i].isFile()) {
        // Open an input stream and append each file's bytes to the output.
        InputStream in = srcFS.open(contents[i].getPath());
        try {
          IOUtils.copyBytes(in, out, conf, false);
          if (addString != null)
            out.write(addString.getBytes("UTF-8"));
        } finally {
          in.close();
        }
      }
    }
  } finally {
    out.close();
  }

  // If deleteSource is true, delete the source directory and its files.
  if (deleteSource) {
    return srcFS.delete(srcDir, true);
  } else {
    return true;
  }
}

/** Copy corresponding files in a directory to related output files (merge). */
public static boolean merge(FileSystem hdfs, Path srcDir, Path dstFile,
                            boolean deleteSource, Configuration conf) throws IOException {
  if (!hdfs.getFileStatus(srcDir).isDirectory())
    return false;

  // List every file under the source directory.
  FileStatus[] fileStatus = hdfs.listStatus(srcDir);

  // Bucket the paths of the three log types into separate lists.
  ArrayList<Path> userPaths = new ArrayList<Path>();
  ArrayList<Path> clientPaths = new ArrayList<Path>();
  ArrayList<Path> appPaths = new ArrayList<Path>();
  for (FileStatus status : fileStatus) {
    Path filePath = status.getPath();
    if (filePath.getName().startsWith("user_report")) {
      userPaths.add(filePath);
    } else if (filePath.getName().startsWith("client_report")) {
      clientPaths.add(filePath);
    } else if (filePath.getName().startsWith("AppLog_UserDevice")) {
      appPaths.add(filePath);
    }
  }

  // Merge the user_report files into user_report.tsv.
  if (userPaths.size() > 0) {
    Path userDstFile = new Path(dstFile.toString() + "/user_report.tsv");
    OutputStream out = hdfs.create(userDstFile);
    Collections.sort(userPaths);
    try {
      Iterator<Path> iterator = userPaths.iterator();
      while (iterator.hasNext()) {
        InputStream in = hdfs.open(iterator.next());
        try {
          IOUtils.copyBytes(in, out, conf, false);
        } finally {
          in.close();
        }
      }
    } finally {
      out.close();
    }
  }

  // Merge the client_report files into client_report.tsv.
  if (clientPaths.size() > 0) {
    Path clientDstFile = new Path(dstFile.toString() + "/client_report.tsv");
    OutputStream out = hdfs.create(clientDstFile);
    Collections.sort(clientPaths);
    try {
      Iterator<Path> iterator = clientPaths.iterator();
      while (iterator.hasNext()) {
        InputStream in = hdfs.open(iterator.next());
        try {
          IOUtils.copyBytes(in, out, conf, false);
        } finally {
          in.close();
        }
      }
    } finally {
      out.close();
    }
  }

  // Merge the AppLog_UserDevice files into AppLog_UserDevice.tsv.
  if (appPaths.size() > 0) {
    Path appDstFile = new Path(dstFile.toString() + "/AppLog_UserDevice.tsv");
    OutputStream out = hdfs.create(appDstFile);
    Collections.sort(appPaths);
    try {
      Iterator<Path> iterator = appPaths.iterator();
      while (iterator.hasNext()) {
        InputStream in = hdfs.open(iterator.next());
        try {
          IOUtils.copyBytes(in, out, conf, false);
        } finally {
          in.close();
        }
      }
    } finally {
      out.close();
    }
  }

  if (deleteSource) {
    return hdfs.delete(srcDir, true);
  }
  return true;
}
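A minimal driver for the merge above, assuming the method lives in a hypothetical helper class MergeUtil; the source path is the day's directory from the fsck run, and /wcc/da/merged/2015-01-11 is a placeholder destination:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        // One day's raw Kafka logs (path taken from the fsck run above).
        Path srcDir = new Path("/wcc/da/kafka/report/2015-01-11");
        // Destination directory for the three merged .tsv files
        // (hypothetical; use whatever your downstream job expects).
        Path dstDir = new Path("/wcc/da/merged/2015-01-11");
        // Keep the sources (deleteSource = false) until the merged output
        // has been verified, then clean up separately.
        boolean ok = MergeUtil.merge(hdfs, srcDir, dstDir, false, conf);
        System.out.println(ok ? "merge succeeded" : "merge failed");
    }
}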
/**
 * Alternative merge: stream each source file straight into the
 * destination file for its log type.
 */
public static boolean mergeFiles(FileSystem hdfs, Path srcDir,
    Path dstFile, boolean deleteSource, Configuration conf)
    throws IOException {
  if (!hdfs.getFileStatus(srcDir).isDirectory())
    return false;

  // List every file under the source directory.
  FileStatus[] fileStatus = hdfs.listStatus(srcDir);

  // One output stream per log type, opened lazily so that each destination
  // is created exactly once; calling hdfs.create() inside the per-file loop
  // would truncate the destination on every iteration and keep only the
  // last source file of each type.
  Map<String, OutputStream> outs = new HashMap<String, OutputStream>();
  try {
    for (FileStatus status : fileStatus) {
      Path filePath = status.getPath();
      String dstName;
      if (filePath.getName().startsWith("user_report")) {
        dstName = "user_report.tsv";
      } else if (filePath.getName().startsWith("client_report")) {
        dstName = "client_report.tsv";
      } else if (filePath.getName().startsWith("AppLog_UserDevice")) {
        dstName = "AppLog_UserDevice.tsv";
      } else {
        // Divert unexpected file names so they are not silently dropped.
        dstName = "error.tsv";
      }
      OutputStream out = outs.get(dstName);
      if (out == null) {
        out = hdfs.create(new Path(dstFile.toString() + "/" + dstName));
        outs.put(dstName, out);
      }
      InputStream in = hdfs.open(filePath);
      try {
        IOUtils.copyBytes(in, out, conf, false);
      } finally {
        in.close();
      }
    }
  } finally {
    for (OutputStream out : outs.values()) {
      out.close();
    }
  }
  if (deleteSource) {
    return hdfs.delete(srcDir, true);
  }
  return true;
}
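A note on the two variants: merge buckets the source paths first and sorts each bucket, so the row order inside each merged .tsv is deterministic, while mergeFiles concatenates files in whatever order listStatus returns them, but opens each destination exactly once and diverts unexpected file names to error.tsv. For daily log consolidation where row order in the .tsv does not matter, either approach is adequate.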
Original post: http://blog.csdn.net/lviiii/article/details/42680249