https://pan.baidu.com/s/1hTnGnxdvgyNaLwckbKUgJA
package mapreduce;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CleanResult {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("CleanResult");
        job.setJarByClass(CleanResult.class);
        job.setMapperClass(doMapper.class);
        job.setReducerClass(doReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path in = new Path("hdfs://192.168.12.128:9000/mapreduce/in/result.txt");
        Path out = new Path("hdfs://192.168.12.128:9000/mapreduce/out/clean");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static Text word = new Text();
        public static final IntWritable id = new IntWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(",");
            // Rebuild the timestamp, e.g. "10/Nov/2016:00:01:02 +0800" -> "2016-11-10 00:01:02"
            String[] time = data[1].split(":");
            data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];
            // Keep only the digits of the traffic field (drop units such as "kb")
            String traffic = "";
            for (int i = 0; i < data[3].length(); i++) {
                if (data[3].charAt(i) >= '0' && data[3].charAt(i) <= '9') {
                    traffic += data[3].charAt(i);
                }
            }
            data[3] = traffic;
            // Emit fields 0-4 joined by tabs as the key and field 5 (the id) as the value
            String newValue = data[0];
            for (int i = 1; i < 5; i++) {
                newValue += "\t" + data[i];
            }
            id.set(Integer.parseInt(data[5]));
            word.set(newValue);
            context.write(word, id);
        }
    }

    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("key:" + key);
            // Identical records land in the same group; keep a single id per key (deduplication)
            for (IntWritable value : values) {
                result.set(value.get());
            }
            System.out.println("value:" + result);
            context.write(key, result);
        }
    }
}
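To make the cleaning rules easier to follow, here is a small standalone check of the same transformations. The sample line and the field names (ip, time, day, traffic, type, id) are only assumptions inferred from the parsing code, not values taken from the real dataset; the string operations themselves mirror doMapper:

public class CleanCheck {
    public static void main(String[] args) {
        // Hypothetical raw record: ip, time, day, traffic, type, id (field names and values are guesses)
        String line = "10.0.0.1,10/Nov/2016:00:01:02 +0800,video,243kb,www,1234";
        String[] data = line.split(",");
        // Rebuild the timestamp: "10/Nov/2016:00:01:02 +0800" -> "2016-11-10 00:01:02"
        String[] time = data[1].split(":");
        data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];
        // Keep only the digits of the traffic field: "243kb" -> "243"
        data[3] = data[3].replaceAll("[^0-9]", "");
        // The mapper emits fields 0-4 joined by tabs as the key and field 5 as the value
        System.out.println(String.join("\t", data[0], data[1], data[2], data[3], data[4]) + "\t" + data[5]);
        // Prints: 10.0.0.1	2016-11-10 00:01:02	video	243	www	1234
    }
}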
How to import the cleaned data into Hive:
My setup runs Hive in a virtual machine connected to MySQL on the host, while the programming is done in Eclipse on the host. Importing the data into Hive through the Java API would require changing several Hive and Hadoop configuration files, which is very easy to get wrong. I'll switch to the API import once I've worked through those changes (a rough sketch of that route follows the command below); for now I'm loading the data from the Hive command line:
load data inpath '/mapreduce/out/clean/part-r-00000' into table data;
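For later reference, the Java API route mentioned above could look roughly like the following, going through Hive JDBC instead of the CLI. This is a minimal, untested sketch: it assumes HiveServer2 is reachable and the hive-jdbc jar (with its dependencies) is on the classpath, and the host, port, user, and column names are placeholders of mine rather than values from this setup. Since the reducer output is written as key<TAB>value by the default TextOutputFormat, the target table should be tab-delimited:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LoadToHive {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:hive2://192.168.12.128:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // Tab-delimited table matching the six tab-separated fields of the cleaned file
            // (column names are my guesses, not necessarily the schema used in this post)
            stmt.execute("CREATE TABLE IF NOT EXISTS data ("
                    + "ip STRING, logtime STRING, day STRING, traffic BIGINT, type STRING, id INT) "
                    + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'");
            // Same load statement as the command-line import above
            stmt.execute("LOAD DATA INPATH '/mapreduce/out/clean/part-r-00000' INTO TABLE data");
        }
    }
}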
Data cleaning is done:
After the file has been loaded from HDFS into Hive
Original post: https://www.cnblogs.com/dream0-0/p/11851540.html