Without further ado, straight to the code. The previously generated results are used here as the input source.
Code
package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 
 * @function Count invalid records and compress the job output
 * @author 小讲
 * 
 */
public class CompressAndCounter extends Configured implements Tool 
{
	// Counter group defined via an enum
	public static enum LOG_PROCESSOR_COUNTER
	{
		BAD_RECORDS
	};
	/**
	 * 
	 * @function Mapper: parse each record, count invalid records, and emit the valid ones
	 *
	 */
	public static class CompressAndCounterMap extends Mapper<LongWritable, Text, Text, Text>
	{							
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException 
		{
			// Parse one set-top-box record and return the valid entries as a list
			List<String> list = ParseTVData.transData(value.toString()); // calls transData in ParseTVData.java
			int length = list.size();
			// Invalid record
			if (length == 0) 
			{
				// Dynamically named counter (group name, counter name)
				context.getCounter("ErrorRecordCounter", "ERROR_Record_TVData").increment(1);
				// Counter declared via the enum
				context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).increment(1);
			} else
			{
				for (String validateRecord : list) 
				{
					// Emit each parsed record
					context.write(new Text(validateRecord), new Text(""));
				}
			}
		}
	}
	/**
	 * @function Driver method for the job
	 * 
	 */
	@Override
	public int run(String[] args) throws Exception 
	{
		// Load the configuration
		Configuration conf = new Configuration();
		// HDFS file system URI
		URI uri = new URI("hdfs://HadoopMaster:9000");
		// Output path
		Path mypath = new Path(args[1]);
		// Create a FileSystem handle
		FileSystem hdfs = FileSystem.get(uri, conf);
		if (hdfs.exists(mypath)) 
		{
			// Delete the output path if it already exists
			hdfs.delete(mypath, true);
		}
		Job job = Job.getInstance(conf, "CompressAndCounter");// create a new job
		job.setJarByClass(CompressAndCounter.class);// set the main class
		
		job.setMapperClass(CompressAndCounterMap.class);// only a Mapper is needed
		job.setNumReduceTasks(0);// map-only job: skip the reduce phase
		job.setOutputKeyClass(Text.class);// output key type
		job.setOutputValueClass(Text.class);// output value type
		
		FileInputFormat.addInputPath(job, new Path(args[0]));// input path
		FileOutputFormat.setOutputPath(job, new Path(args[1]));// output path
		
		FileOutputFormat.setCompressOutput(job, true);// compress the job output
		FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);// use the gzip codec
		
		return job.waitForCompletion(true) ? 0 : 1;// submit the job and report its status
	}
	}
	/**
	 * @function main method
	 * @param args input and output paths
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception
	{		
		String[] date = {"20120917","20120918","20120919","20120920","20120921","20120922","20120923"};
		int ec = 1;
		for(String dt:date)
		{
			String[] args0 = { "hdfs://HadoopMaster:9000/middle/tv/"+dt+".txt",
			"hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };
			
//			String[] args0 = { "./data/compressAndCounter/"+dt+".txt",
//					"hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };
			
			ec = ToolRunner.run(new Configuration(), new CompressAndCounter(), args0);
		}		
		System.exit(ec);
	}
}
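For reference, the two counters incremented in the Mapper can also be read back in the driver once the job has finished. The lines below are a minimal sketch, not part of the original post, showing how the end of run() could be rewritten to print them; it only uses the standard Hadoop Counters API.

import org.apache.hadoop.mapreduce.Counters;

// Sketch: replace the final lines of run() with the following to print the counters.
boolean success = job.waitForCompletion(true);// submit the job and wait for it to finish
Counters counters = job.getCounters();
// Counter declared through the enum
long badRecords = counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).getValue();
// Dynamically named counter (group name, counter name)
long errorRecords = counters.findCounter("ErrorRecordCounter", "ERROR_Record_TVData").getValue();
System.out.println("BAD_RECORDS = " + badRecords);
System.out.println("ERROR_Record_TVData = " + errorRecords);
return success ? 0 : 1;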
package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;
import java.util.ArrayList;
import java.util.List;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
/**
 * 
 * @function Parse the raw set-top-box data
 * 
 */
public class ParseTVData
{
	/**
	 * @function Parse one input record with Jsoup and return the valid entries
	 * @param text raw record text
	 * @return list of records in the form stbNum@date@sn@s@e
	 */
	public static List<String> transData(String text) 
	{
		List<String> list = new ArrayList<String>();
		Document doc;
		String rec = "";
		try 
		{
			doc = Jsoup.parse(text);// parse the record with Jsoup
			Elements content = doc.getElementsByTag("WIC");
			String num = content.get(0).attr("cardNum");// card number
			if (num == null || num.equals("")) 
			{
				num = " ";
			}
			String stbNum = content.get(0).attr("stbNum");// set-top-box number
			if (stbNum.equals("")) 
			{
				return list;
			}
			String date = content.get(0).attr("date");// date
			Elements els = doc.getElementsByTag("A");
			if (els.isEmpty()) 
			{
				return list;
			}
			for (Element el : els)
			{
				String e = el.attr("e");// end time
				String s = el.attr("s");// start time
				String sn = el.attr("sn");// channel name
				rec = stbNum + "@" + date + "@" + sn + "@" + s + "@" + e;
				list.add(rec);
			}
		} catch (Exception e) 
		{
			System.out.println(e.getMessage());
			return list;
		}
		return list;
	}
}
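To see what transData produces, the small standalone class below feeds it one made-up set-top-box record. This is a sketch for illustration only: the tag names (WIC, A) and attributes (cardNum, stbNum, date, s, e, sn) match what transData looks for, but the concrete values are invented, and it assumes jsoup is on the classpath just as in the code above.

package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;

import java.util.List;

// Demo (not from the original post): parse one hypothetical record with ParseTVData.
public class ParseTVDataDemo
{
	public static void main(String[] args)
	{
		// Invented sample record, shaped like the data transData expects
		String record = "<WIC cardNum=\"100001\" stbNum=\"03111108020232488\" date=\"20120917\">"
				+ "<A s=\"110412\" e=\"110538\" sn=\"CCTV-1\" />"
				+ "<A s=\"110538\" e=\"112126\" sn=\"BTV-3\" />"
				+ "</WIC>";
		List<String> parsed = ParseTVData.transData(record);
		// Each element has the form stbNum@date@sn@s@e, e.g.
		// 03111108020232488@20120917@CCTV-1@110412@110538
		for (String line : parsed)
		{
			System.out.println(line);
		}
	}
}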
Hadoop MapReduce Programming API Primer Series: Compression and Counters (Part 30)
Original post: http://www.cnblogs.com/zlslch/p/6171823.html