首页 > 其他 > 详细

mapreduce练习

时间:2020-05-30 19:34:19      阅读:55      评论:0      收藏:0      [点我收藏+]

自定义inputFormat小文件合并

package myInput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    // True once the single record (the whole file) has been emitted.
    private boolean nextValue = false;

    /**
     * Captures the file split and job configuration for later reads.
     *
     * @param inputSplit         the split to read — expected to cover one whole small file
     * @param taskAttemptContext task context providing the job configuration
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    /**
     * Reads the entire file backing this split into the value on the first call.
     * Each split produces exactly one key/value pair.
     *
     * @return true on the first call (a record was produced), false on every later call
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (nextValue) {
            return false;
        }
        // NOTE(review): the int cast caps files at Integer.MAX_VALUE bytes; fine for the
        // small-file-merge use case, but worth confirming upstream guarantees small inputs.
        int length = (int) fileSplit.getLength();
        byte[] bytes = new byte[length];
        Path path = fileSplit.getPath();
        FileSystem fileSystem = path.getFileSystem(configuration);
        FSDataInputStream open = fileSystem.open(path);
        try {
            IOUtils.readFully(open, bytes, 0, length);
        } finally {
            // Fix: original closed the stream only on success, leaking it when readFully threw.
            IOUtils.closeStream(open);
        }
        bytesWritable.set(bytes, 0, bytes.length);
        nextValue = true;
        return true;
    }

    /**
     * @return the key k1 — always the NullWritable singleton (keys carry no information here)
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * @return the value v1 — the full file contents loaded by {@link #nextKeyValue()}
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    /** Progress is all-or-nothing since the split yields a single record. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return nextValue ? 1.0f : 0.0f;
    }

    /** Nothing to release: the input stream is closed inside nextKeyValue(). */
    @Override
    public void close() throws IOException {
    }
}

自定义outputFormat输出到不同文件夹

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * OutputFormat that routes records to two fixed files (good comments vs. bad
 * comments) via a custom {@code MyRecordWriter}.
 */
public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> {
    /**
     * Opens both destination files and hands the streams to a MyRecordWriter.
     *
     * @param context task context providing the job configuration
     * @return a writer that splits records between the good/bad comment files
     * @throws IOException if either destination file cannot be created
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The configuration is obtained from the task context.
        Configuration configuration = context.getConfiguration();
        // Get the file system object.
        FileSystem fileSystem = FileSystem.get(configuration);

        // NOTE(review): output paths are hard-coded to a local Windows directory —
        // consider reading them from the job configuration instead.
        // Output path for good comments.
        Path goodComment = new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义outputformat\\goodComment\\1.txt");

        // Output path for bad comments.
        Path badComment = new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义outputformat\\badComment\\1.txt");

        // Open the two output streams.
        FSDataOutputStream goodOut = fileSystem.create(goodComment);
        FSDataOutputStream badOut;
        try {
            badOut = fileSystem.create(badComment);
        } catch (IOException | RuntimeException e) {
            // Fix: original leaked the first stream when the second create failed.
            goodOut.close();
            throw e;
        }

        return new MyRecordWriter(goodOut, badOut);
    }
}

mapreduce练习

原文:https://www.cnblogs.com/hatcher-h/p/12993832.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!