public abstract class ManagedBuffer {/** Number of bytes of the data. */public abstract long size();/*** Exposes this buffer‘s data as an NIO ByteBuffer. Changing the position and limit of the* returned ByteBuffer should not affect the content of this buffer.*/// TODO: Deprecate this, usage may require expensive memory mapping or allocation.public abstract ByteBuffer nioByteBuffer() throws IOException;/*** Exposes this buffer‘s data as an InputStream. The underlying implementation does not* necessarily check for the length of bytes read, so the caller is responsible for making sure* it does not go over the limit.*/public abstract InputStream createInputStream() throws IOException;/*** Increment the reference count by one if applicable.*/public abstract ManagedBuffer retain();/*** If applicable, decrement the reference count by one and deallocates the buffer if the* reference count reaches zero.*/public abstract ManagedBuffer release();/*** Convert the buffer into an Netty object, used to write the data out.*/public abstract Object convertToNetty() throws IOException;}
public final class FileSegmentManagedBuffer extends ManagedBuffer {private final TransportConf conf;private final File file;private final long offset;private final long length;public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {this.conf = conf;this.file = file;this.offset = offset;this.length = length;}@Overridepublic long size() {return length;}@Overridepublic ByteBuffer nioByteBuffer() throws IOException {FileChannel channel = null;try {channel = new RandomAccessFile(file, "r").getChannel();// Just copy the buffer if it‘s sufficiently small, as memory mapping has a high overhead.if (length < conf.memoryMapBytes()) {ByteBuffer buf = ByteBuffer.allocate((int) length);channel.position(offset);while (buf.remaining() != 0) {if (channel.read(buf) == -1) {throw new IOException(String.format("Reached EOF before filling buffer\n" +"offset=%s\nfile=%s\nbuf.remaining=%s",offset, file.getAbsoluteFile(), buf.remaining()));}}buf.flip();return buf;} else {return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);}} catch (IOException e) {try {if (channel != null) {long size = channel.size();throw new IOException("Error in reading " + this + " (actual file length " + size + ")",e);}} catch (IOException ignored) {// ignore}throw new IOException("Error in opening " + this, e);} finally {JavaUtils.closeQuietly(channel);}}@Overridepublic InputStream createInputStream() throws IOException {FileInputStream is = null;try {is = new FileInputStream(file);ByteStreams.skipFully(is, offset);return new LimitedInputStream(is, length);} catch (IOException e) {try {if (is != null) {long size = file.length();throw new IOException("Error in reading " + this + " (actual file length " + size + ")",e);}} catch (IOException ignored) {// ignore} finally {JavaUtils.closeQuietly(is);}throw new IOException("Error in opening " + this, e);} catch (RuntimeException e) {JavaUtils.closeQuietly(is);throw e;}}@Overridepublic ManagedBuffer retain() {return this;}@Overridepublic ManagedBuffer release() {return this;}@Overridepublic Object convertToNetty() throws IOException {if (conf.lazyFileDescriptor()) {return new LazyFileRegion(file, offset, length);} else {FileChannel fileChannel = new FileInputStream(file).getChannel();return new DefaultFileRegion(fileChannel, offset, length);}}public File getFile() { return file; }public long getOffset() { return offset; }public long getLength() { return length; }@Overridepublic String toString() {return Objects.toStringHelper(this).add("file", file).add("offset", offset).add("length", length).toString();}}
public final class NettyManagedBuffer extends ManagedBuffer {private final ByteBuf buf;public NettyManagedBuffer(ByteBuf buf) {this.buf = buf;}@Overridepublic long size() {return buf.readableBytes();}@Overridepublic ByteBuffer nioByteBuffer() throws IOException {return buf.nioBuffer();}@Overridepublic InputStream createInputStream() throws IOException {return new ByteBufInputStream(buf);}@Overridepublic ManagedBuffer retain() {buf.retain();return this;}@Overridepublic ManagedBuffer release() {buf.release();return this;}@Overridepublic Object convertToNetty() throws IOException {return buf.duplicate();}@Overridepublic String toString() {return Objects.toStringHelper(this).add("buf", buf).toString();}}
public final class NioManagedBuffer extends ManagedBuffer {private final ByteBuffer buf;public NioManagedBuffer(ByteBuffer buf) {this.buf = buf;}@Overridepublic long size() {return buf.remaining();}@Overridepublic ByteBuffer nioByteBuffer() throws IOException {return buf.duplicate();}@Overridepublic InputStream createInputStream() throws IOException {return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));}@Overridepublic ManagedBuffer retain() {return this;}@Overridepublic ManagedBuffer release() {return this;}@Overridepublic Object convertToNetty() throws IOException {return Unpooled.wrappedBuffer(buf);}@Overridepublic String toString() {return Objects.toStringHelper(this).add("buf", buf).toString();}}
原文:http://www.cnblogs.com/gaoxing/p/4985558.html