
MapReduce Lab: Data Cleaning

Posted: 2019-11-13 18:53:13

Lab Requirements

https://pan.baidu.com/s/1hTnGnxdvgyNaLwckbKUgJA

Program Source Code

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CleanResult {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("CleanResult");
        job.setJarByClass(CleanResult.class);
        job.setMapperClass(doMapper.class);
        job.setReducerClass(doReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Raw log file on HDFS; the output directory must not already exist
        Path in = new Path("hdfs://192.168.12.128:9000/mapreduce/in/result.txt");
        Path out = new Path("hdfs://192.168.12.128:9000/mapreduce/out/clean");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final Text word = new Text();
        private static final IntWritable id = new IntWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each line holds six comma-separated fields; field 5 is a numeric record id
            String[] data = value.toString().split(",");
            // Reformat the time field, e.g. "10/Nov/2016:00:01:02 +0800" -> "2016-11-10 00:01:02"
            // (the date itself is hardcoded for this data set)
            String[] time = data[1].split(":");
            data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];
            // Strip everything but the digits from the traffic field
            StringBuilder traffic = new StringBuilder();
            for (int i = 0; i < data[3].length(); i++) {
                char c = data[3].charAt(i);
                if (c >= '0' && c <= '9') {
                    traffic.append(c);
                }
            }
            data[3] = traffic.toString();
            // Re-join the first five fields with tabs as the key; the id is the value
            String newValue = data[0];
            for (int i = 1; i < 5; i++)
                newValue += "\t" + data[i];
            id.set(Integer.parseInt(data[5]));
            word.set(newValue);
            context.write(word, id);
        }
    }
    
    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println("key:" + key); // debug output, visible in the task logs
            // Identical cleaned records arrive grouped under one key, so keeping
            // only the last id deduplicates the data
            for (IntWritable value : values) {
                result.set(value.get());
            }
            System.out.println("value:" + result);
            context.write(key, result);
        }
    }

}
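The lab requirements are not reproduced here, but the parsing logic implies the input is a comma-separated log with six fields, where field 1 is a timestamp, field 3 is a traffic count mixed with non-digit characters, and field 5 is a numeric record id. As a purely hypothetical illustration (the values below are my own, not from the data set), an input line such as

10.0.0.1,10/Nov/2016:00:01:02 +0800,6,54 ,video,1

would come out of the job as the tab-separated record

10.0.0.1	2016-11-10 00:01:02	6	54	video	1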

How to import the cleaned data into Hive:

My setup runs Hive in a virtual machine connecting to MySQL on the host, and I code in Eclipse on the host. Importing the data into Hive through the Java API would require changing several Hadoop and Hive configuration files, which is very error-prone, so I will try the API route after I work through those changes. For now I import from the Hive command line:
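The load statement below assumes a Hive table named data already exists with columns matching the job's tab-separated output. A minimal sketch of such a table (the column names are my guesses from the cleaning logic, not from the lab handout):

create table if not exists data (
    ip string,
    time string,
    day string,
    traffic bigint,
    type string,
    id int
)
row format delimited fields terminated by '\t';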

load data inpath '/mapreduce/out/clean/part-r-00000' into table data;
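Note that for a managed table, load data inpath moves the file from /mapreduce/out/clean into Hive's warehouse directory rather than copying it, so part-r-00000 will no longer be in the job's output directory afterwards.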

Run Result Screenshots

Data cleaning complete:

[screenshot]

After loading the file from HDFS into Hive:

[screenshot]

Original post: https://www.cnblogs.com/dream0-0/p/11851540.html
