package myInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    private boolean nextValue = false;

    /**
     * @param inputSplit         the file split to read
     * @param taskAttemptContext the task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }
    /**
     * @return true if a record was read and is now available through getCurrentKey/getCurrentValue,
     *         false once the file has already been consumed
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Based on the file split, read the entire file content and pack it into the BytesWritable
        if (!nextValue) {
            byte[] bytes = new byte[(int) fileSplit.getLength()];
            Path path = fileSplit.getPath();
            FileSystem fileSystem = path.getFileSystem(configuration);
            FSDataInputStream open = fileSystem.open(path);
            IOUtils.readFully(open, bytes, 0, (int) fileSplit.getLength());
            bytesWritable.set(bytes, 0, bytes.length);
            nextValue = true;
            IOUtils.closeStream(open);
            return nextValue;
        }
        return false;
    }
    /**
     * @return the current key (k1); always NullWritable, since the key carries no information
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * @return the current value (v1): the whole file content as a BytesWritable
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return nextValue ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to release here; the input stream is already closed in nextKeyValue()
    }
}
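MyRecordReader on its own is not enough: the job also needs an InputFormat that hands the reader out and marks each file as non-splittable, since the reader consumes the whole file as a single record. That class is not shown in this excerpt, so the following is only a minimal sketch; the class name MyInputFormat is a placeholder and the package is assumed to match MyRecordReader.

package myInput;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;

public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // each small file is read as one record, so never split it
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // the framework calls initialize() on the returned reader before the first nextKeyValue()
        return new MyRecordReader();
    }
}

The class below is the custom OutputFormat from the same post; it is a separate source file that routes records to either a good-review or a bad-review output path.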
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // the task context gives us a Configuration
        Configuration configuration = context.getConfiguration();
        // get the file system object
        FileSystem fileSystem = FileSystem.get(configuration);
        // output path for the good-review file
        Path goodComment = new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义outputformat\\goodComment\\1.txt");
        // output path for the bad-review file
        Path badComment = new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义outputformat\\badComment\\1.txt");
        // open one output stream per target file
        FSDataOutputStream goodStream = fileSystem.create(goodComment);
        FSDataOutputStream badStream = fileSystem.create(badComment);
        return new MyRecordWriter(goodStream, badStream);
    }
}
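The MyRecordWriter that getRecordWriter returns is referenced but not included in this excerpt. A minimal sketch of what it might look like is below, assuming each Text key is one tab-separated review line and that one field in it decides which stream the line goes to; the field index (9) and the value "0" for a good review are placeholders, not taken from the post.

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private final FSDataOutputStream goodStream;
    private final FSDataOutputStream badStream;

    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();
        // placeholder rule: a tab-separated line whose 10th field is "0" counts as a good review
        String[] fields = line.split("\t");
        if (fields.length > 9 && "0".equals(fields[9])) {
            goodStream.write((line + "\n").getBytes(StandardCharsets.UTF_8));
        } else {
            badStream.write((line + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(goodStream);
        IOUtils.closeStream(badStream);
    }
}

In the driver, the classes would then be wired in with job.setInputFormatClass(...) for the small-file job and job.setOutputFormatClass(MyOutputFormat.class) for the review-splitting job; MyInputFormat and MyRecordWriter above follow the placeholder naming already noted.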
Source: https://www.cnblogs.com/hatcher-h/p/12993832.html