Reading SequenceFiles with Spark can be very slow. By default, SequenceFileInputFormat inherits FileInputFormat's split logic, so large files are split at the HDFS block size. If you want smaller splits to increase the parallelism of the Spark job, you can write your own InputFormat:
import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch // Guava Stopwatch, matching the original code's elapsedMillis call
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[_, _]])
  // Manually set the split "block size" to 30,000,000 bytes (~30 MB) instead of the HDFS block size.
  val sequenceFileBlockSize = 30000000L

  @throws[IOException]
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  // Same lower bound as SequenceFileInputFormat (SequenceFile.SYNC_INTERVAL).
  override protected def getFormatMinSplitSize: Long = 2000L
  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var kept: Int = 0
    var i: Int = 0
    while (i < len) {
      var file: FileStatus = files.get(i)
      // A MapFile shows up as a directory; point the status at its "data" file instead.
      if (file.isDirectory) {
        val p: Path = file.getPath
        val fs: FileSystem = p.getFileSystem(job.getConfiguration)
        file = fs.getFileStatus(new Path(p, "data"))
        files.set(i, file)
      }
      // Compact the list in place, keeping only non-empty files.
      if (file.getLen != 0L) {
        files.set(kept, file)
        kept += 1
      }
      i += 1
    }
    files.subList(0, kept)
  }
  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = (new Stopwatch).start
    val minSize: Long = Math.max(this.getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    val it = files.iterator
    while (it.hasNext) {
      val file: FileStatus = it.next
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length != 0L) {
        val blkLocations: Array[BlockLocation] = file match {
          case lfs: LocatedFileStatus => lfs.getBlockLocations
          case _ =>
            val fs: FileSystem = path.getFileSystem(job.getConfiguration)
            fs.getFileBlockLocations(file, 0L, length)
        }
        if (this.isSplitable(job, path)) {
          // Instead of file.getBlockSize (the HDFS block size), use the manually
          // configured sequenceFileBlockSize so the file is cut into smaller splits.
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = this.computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          // 1.1 is FileInputFormat's SPLIT_SLOP: tolerate a last split up to 10% larger.
          while (bytesRemaining.toDouble / splitSize.toDouble > 1.1D) {
            val blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0L) {
            val blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
          }
        } else {
          // Not splittable: one split for the whole file.
          splits.add(this.makeSplit(path, 0L, length, blkLocations(0).getHosts, blkLocations(0).getCachedHosts))
        }
      } else {
        // Zero-length file: empty hosts array.
        splits.add(this.makeSplit(path, 0L, length, new Array[String](0)))
      }
    }
    // Bookkeeping and logging happen once, after all files have been processed.
    job.getConfiguration.setLong("mapreduce.input.fileinputformat.numinputfiles", files.size.toLong)
    sw.stop
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + sw.elapsedMillis)
    }
    splits
  }
}

Change sequenceFileBlockSize to whatever split size you want.
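For a quick sanity check on the numbers, here is the arithmetic that computeSplitSize ends up doing with this class. This is only a sketch assuming the default mapreduce.input.fileinputformat.split.minsize/maxsize settings (no overrides in your job configuration); the constants mirror the class above.

// splitSize = max(minSize, min(maxSize, blockSize)), as in FileInputFormat.computeSplitSize
val minSize   = math.max(2000L, 1L)   // getFormatMinSplitSize vs. default split.minsize
val maxSize   = Long.MaxValue         // default split.maxsize
val blockSize = 30000000L             // sequenceFileBlockSize above
val splitSize = math.max(minSize, math.min(maxSize, blockSize)) // = 30000000 (~30 MB)
// A 3 GB SequenceFile now yields roughly 100 splits (Spark tasks),
// versus about 24 splits when the 128 MB HDFS block size is used.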
Usage:
val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable, MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir).flatMap(x => {
  // BytesWritable.getBytes returns the backing array, which may be padded past the
  // valid data, so only read the first getLength bytes.
  function(new String(x._2.getBytes, 0, x._2.getLength))
})
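As a side note, if all you need is smaller splits (and not the MapFile "data" handling in listStatus above), you may be able to get the same parallelism without a custom InputFormat by capping the split size through the standard Hadoop setting. A minimal sketch, assuming Hadoop 2.x property names and the stock SequenceFileInputFormat; sourceDir is the same input path as above:

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Cap each split at ~30 MB; splitSize = max(minSize, min(maxSize, blockSize)).
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 30000000L)

val rdd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
  SequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)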
Original post: "Modifying the SequenceFileInputFormat HDFS block size", http://11670039.blog.51cto.com/11660039/1893871