Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。
Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty
本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。
一个好的协议有两个标准:
(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。
(2)传输数据和业务对象之间的转换速度要快。
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
一、协议的定义
无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。
(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
       编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
       数据格式定义:
       字段1键名长度    字段1键名 字段1值长度    字段1值
       字段2键名长度    字段2键名 字段2值长度    字段2值
       字段3键名长度    字段3键名 字段3值长度    字段3值
       …    …    …    …
       长度为整型,占4个字节
代码中用两个Vo对象来表示:XLRequest和XLResponse。
 package org.jboss.netty.example.xlsvr.vo;
package org.jboss.netty.example.xlsvr.vo;
 import java.util.HashMap;
import java.util.HashMap; import java.util.Map;
import java.util.Map;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-2-3 下午02:46:52
 *  2012-2-3 下午02:46:52 */
 */

 /**
/** * 响应数据
 * 响应数据 */
 */
 /**
/** * 通用协议介绍
 * 通用协议介绍 *
 *  * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节: * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte) * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String> * 数据格式定义:
 * 数据格式定义: * 字段1键名长度    字段1键名 字段1值长度    字段1值
 * 字段1键名长度    字段1键名 字段1值长度    字段1值 * 字段2键名长度    字段2键名 字段2值长度    字段2值
 * 字段2键名长度    字段2键名 字段2值长度    字段2值 * 字段3键名长度    字段3键名 字段3值长度    字段3值
 * 字段3键名长度    字段3键名 字段3值长度    字段3值 * …    …    …    …
 * …    …    …    … * 长度为整型,占4个字节
 * 长度为整型,占4个字节 */
 */ public class XLResponse {
public class XLResponse { private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1 private byte encrypt;// 加密类型。0表示不加密
    private byte encrypt;// 加密类型。0表示不加密 private byte extend1;// 用于扩展协议。暂未定义任何值
    private byte extend1;// 用于扩展协议。暂未定义任何值 private byte extend2;// 用于扩展协议。暂未定义任何值
    private byte extend2;// 用于扩展协议。暂未定义任何值 private int sessionid;// 会话ID
    private int sessionid;// 会话ID private int result;// 结果码
    private int result;// 结果码 private int length;// 数据包长
    private int length;// 数据包长 
     private Map<String,String> values=new HashMap<String, String>();
    private Map<String,String> values=new HashMap<String, String>(); 
     private String ip;
    private String ip; 
     public void setValue(String key,String value){
    public void setValue(String key,String value){ values.put(key, value);
        values.put(key, value); }
    } 
     public String getValue(String key){
    public String getValue(String key){ if (key==null) {
        if (key==null) { return null;
            return null; }
        } return values.get(key);
        return values.get(key); }
    }
 public byte getEncode() {
    public byte getEncode() { return encode;
        return encode; }
    }
 public void setEncode(byte encode) {
    public void setEncode(byte encode) { this.encode = encode;
        this.encode = encode; }
    }
 public byte getEncrypt() {
    public byte getEncrypt() { return encrypt;
        return encrypt; }
    }
 public void setEncrypt(byte encrypt) {
    public void setEncrypt(byte encrypt) { this.encrypt = encrypt;
        this.encrypt = encrypt; }
    }
 public byte getExtend1() {
    public byte getExtend1() { return extend1;
        return extend1; }
    }
 public void setExtend1(byte extend1) {
    public void setExtend1(byte extend1) { this.extend1 = extend1;
        this.extend1 = extend1; }
    }
 public byte getExtend2() {
    public byte getExtend2() { return extend2;
        return extend2; }
    }
 public void setExtend2(byte extend2) {
    public void setExtend2(byte extend2) { this.extend2 = extend2;
        this.extend2 = extend2; }
    }
 public int getSessionid() {
    public int getSessionid() { return sessionid;
        return sessionid; }
    }
 public void setSessionid(int sessionid) {
    public void setSessionid(int sessionid) { this.sessionid = sessionid;
        this.sessionid = sessionid; }
    }
 public int getResult() {
    public int getResult() { return result;
        return result; }
    }
 public void setResult(int result) {
    public void setResult(int result) { this.result = result;
        this.result = result; }
    }
 public int getLength() {
    public int getLength() { return length;
        return length; }
    }
 public void setLength(int length) {
    public void setLength(int length) { this.length = length;
        this.length = length; }
    }
 public Map<String, String> getValues() {
    public Map<String, String> getValues() { return values;
        return values; }
    }
 public String getIp() {
    public String getIp() { return ip;
        return ip; }
    }
 public void setIp(String ip) {
    public void setIp(String ip) { this.ip = ip;
        this.ip = ip; }
    }
 public void setValues(Map<String, String> values) {
    public void setValues(Map<String, String> values) { this.values = values;
        this.values = values; }
    }
 @Override
    @Override public String toString() {
    public String toString() { return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
        return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2 + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
                + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]"; }
    } }
}
 package org.jboss.netty.example.xlsvr.vo;
package org.jboss.netty.example.xlsvr.vo;
 import java.util.HashMap;
import java.util.HashMap; import java.util.Map;
import java.util.Map;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-2-3 下午02:46:41
 *  2012-2-3 下午02:46:41 */
 */
 /**
/** * 请求数据
 * 请求数据 */
 */
 /**
/** * 通用协议介绍
 * 通用协议介绍 *
 *  * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节: * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte) * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String> * 数据格式定义:
 * 数据格式定义: * 字段1键名长度    字段1键名 字段1值长度    字段1值
 * 字段1键名长度    字段1键名 字段1值长度    字段1值 * 字段2键名长度    字段2键名 字段2值长度    字段2值
 * 字段2键名长度    字段2键名 字段2值长度    字段2值 * 字段3键名长度    字段3键名 字段3值长度    字段3值
 * 字段3键名长度    字段3键名 字段3值长度    字段3值 * …    …    …    …
 * …    …    …    … * 长度为整型,占4个字节
 * 长度为整型,占4个字节 */
 */ public class XLRequest {
public class XLRequest { private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1 private byte encrypt;// 加密类型。0表示不加密
    private byte encrypt;// 加密类型。0表示不加密 private byte extend1;// 用于扩展协议。暂未定义任何值
    private byte extend1;// 用于扩展协议。暂未定义任何值 private byte extend2;// 用于扩展协议。暂未定义任何值
    private byte extend2;// 用于扩展协议。暂未定义任何值 private int sessionid;// 会话ID
    private int sessionid;// 会话ID private int command;// 命令
    private int command;// 命令 private int length;// 数据包长
    private int length;// 数据包长 
     private Map<String,String> params=new HashMap<String, String>(); //参数
    private Map<String,String> params=new HashMap<String, String>(); //参数 
     private String ip;
    private String ip;
 public byte getEncode() {
    public byte getEncode() { return encode;
        return encode; }
    }
 public void setEncode(byte encode) {
    public void setEncode(byte encode) { this.encode = encode;
        this.encode = encode; }
    }
 public byte getEncrypt() {
    public byte getEncrypt() { return encrypt;
        return encrypt; }
    }
 public void setEncrypt(byte encrypt) {
    public void setEncrypt(byte encrypt) { this.encrypt = encrypt;
        this.encrypt = encrypt; }
    }
 public byte getExtend1() {
    public byte getExtend1() { return extend1;
        return extend1; }
    }
 public void setExtend1(byte extend1) {
    public void setExtend1(byte extend1) { this.extend1 = extend1;
        this.extend1 = extend1; }
    }
 public byte getExtend2() {
    public byte getExtend2() { return extend2;
        return extend2; }
    }
 public void setExtend2(byte extend2) {
    public void setExtend2(byte extend2) { this.extend2 = extend2;
        this.extend2 = extend2; }
    }
 public int getSessionid() {
    public int getSessionid() { return sessionid;
        return sessionid; }
    }
 public void setSessionid(int sessionid) {
    public void setSessionid(int sessionid) { this.sessionid = sessionid;
        this.sessionid = sessionid; }
    }
 public int getCommand() {
    public int getCommand() { return command;
        return command; }
    }
 public void setCommand(int command) {
    public void setCommand(int command) { this.command = command;
        this.command = command; }
    }
 public int getLength() {
    public int getLength() { return length;
        return length; }
    }
 public void setLength(int length) {
    public void setLength(int length) { this.length = length;
        this.length = length; }
    }
 public Map<String, String> getParams() {
    public Map<String, String> getParams() { return params;
        return params; }
    } 
     public void setValue(String key,String value){
    public void setValue(String key,String value){ params.put(key, value);
        params.put(key, value); }
    } 
     public String getValue(String key){
    public String getValue(String key){ if (key==null) {
        if (key==null) { return null;
            return null; }
        } return params.get(key);
        return params.get(key); }
    }
 public String getIp() {
    public String getIp() { return ip;
        return ip; }
    }
 public void setIp(String ip) {
    public void setIp(String ip) { this.ip = ip;
        this.ip = ip; }
    }
 public void setParams(Map<String, String> params) {
    public void setParams(Map<String, String> params) { this.params = params;
        this.params = params; }
    }
 @Override
    @Override public String toString() {
    public String toString() { return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
        return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2 + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
                + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]"; }
    } }
}
二、协议的编码和解码
对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。
 package org.jboss.netty.example.xlsvr.codec;
package org.jboss.netty.example.xlsvr.codec;
 import java.nio.ByteBuffer;
import java.nio.ByteBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler; import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
import org.jboss.netty.example.xlsvr.util.ProtocolUtil; import org.jboss.netty.example.xlsvr.vo.XLResponse;
import org.jboss.netty.example.xlsvr.vo.XLResponse; import org.slf4j.Logger;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import org.slf4j.LoggerFactory;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-2-3 上午10:48:15
 *  2012-2-3 上午10:48:15 */
 */
 /**
/** * 服务器端编码器
 * 服务器端编码器 */
 */ public class XLServerEncoder extends SimpleChannelDownstreamHandler {
public class XLServerEncoder extends SimpleChannelDownstreamHandler { Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
    Logger logger=LoggerFactory.getLogger(XLServerEncoder.class); 
     @Override
    @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { XLResponse response=(XLResponse)e.getMessage();
        XLResponse response=(XLResponse)e.getMessage(); ByteBuffer headBuffer=ByteBuffer.allocate(16);
        ByteBuffer headBuffer=ByteBuffer.allocate(16); /**
        /** * 先组织报文头
         * 先组织报文头 */
         */ headBuffer.put(response.getEncode());
        headBuffer.put(response.getEncode()); headBuffer.put(response.getEncrypt());
        headBuffer.put(response.getEncrypt()); headBuffer.put(response.getExtend1());
        headBuffer.put(response.getExtend1()); headBuffer.put(response.getExtend2());
        headBuffer.put(response.getExtend2()); headBuffer.putInt(response.getSessionid());
        headBuffer.putInt(response.getSessionid()); headBuffer.putInt(response.getResult());
        headBuffer.putInt(response.getResult()); 
         /**
        /** * 组织报文的数据部分
         * 组织报文的数据部分 */
         */ ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues());
        ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues());  int length=dataBuffer.readableBytes();
        int length=dataBuffer.readableBytes(); headBuffer.putInt(length);
        headBuffer.putInt(length); /**
        /** * 非常重要
         * 非常重要 * ByteBuffer需要手动flip(),ChannelBuffer不需要
         * ByteBuffer需要手动flip(),ChannelBuffer不需要 */
         */ headBuffer.flip();
        headBuffer.flip(); ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
        ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer(); totalBuffer.writeBytes(headBuffer);
        totalBuffer.writeBytes(headBuffer); logger.info("totalBuffer size="+totalBuffer.readableBytes());
        logger.info("totalBuffer size="+totalBuffer.readableBytes()); totalBuffer.writeBytes(dataBuffer);
        totalBuffer.writeBytes(dataBuffer); logger.info("totalBuffer size="+totalBuffer.readableBytes());
        logger.info("totalBuffer size="+totalBuffer.readableBytes()); Channels.write(ctx, e.getFuture(), totalBuffer);
        Channels.write(ctx, e.getFuture(), totalBuffer); }
    }
 }
}
 package org.jboss.netty.example.xlsvr.codec;
package org.jboss.netty.example.xlsvr.codec;
 import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
import org.jboss.netty.example.xlsvr.util.ProtocolUtil; import org.jboss.netty.example.xlsvr.vo.XLResponse;
import org.jboss.netty.example.xlsvr.vo.XLResponse; import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-2-3 上午10:47:54
 *  2012-2-3 上午10:47:54 */
 */
 /**
/** * 客户端解码器
 * 客户端解码器 */
 */ public class XLClientDecoder extends FrameDecoder {
public class XLClientDecoder extends FrameDecoder {
 @Override
    @Override protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {
    protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes()<16) {
        if (buffer.readableBytes()<16) { return null;
            return null; }
        } buffer.markReaderIndex();
        buffer.markReaderIndex(); byte encode=buffer.readByte();
        byte encode=buffer.readByte(); byte encrypt=buffer.readByte();
        byte encrypt=buffer.readByte(); byte extend1=buffer.readByte();
        byte extend1=buffer.readByte(); byte extend2=buffer.readByte();
        byte extend2=buffer.readByte(); int sessionid=buffer.readInt();
        int sessionid=buffer.readInt(); int result=buffer.readInt();
        int result=buffer.readInt(); int length=buffer.readInt(); // 数据包长
        int length=buffer.readInt(); // 数据包长 if (buffer.readableBytes()<length) {
        if (buffer.readableBytes()<length) { buffer.resetReaderIndex();
            buffer.resetReaderIndex(); return null;
            return null; }
        } ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
        ChannelBuffer dataBuffer=ChannelBuffers.buffer(length); buffer.readBytes(dataBuffer, length);
        buffer.readBytes(dataBuffer, length); 
         XLResponse response=new XLResponse();
        XLResponse response=new XLResponse(); response.setEncode(encode);
        response.setEncode(encode); response.setEncrypt(encrypt);
        response.setEncrypt(encrypt); response.setExtend1(extend1);
        response.setExtend1(extend1); response.setExtend2(extend2);
        response.setExtend2(extend2); response.setSessionid(sessionid);
        response.setSessionid(sessionid); response.setResult(result);
        response.setResult(result); response.setLength(length);
        response.setLength(length); response.setValues(ProtocolUtil.decode(encode, dataBuffer));
        response.setValues(ProtocolUtil.decode(encode, dataBuffer)); response.setIp(ProtocolUtil.getClientIp(channel));
        response.setIp(ProtocolUtil.getClientIp(channel)); return response;
        return response; }
    }
 }
}
 package org.jboss.netty.example.xlsvr.util;
package org.jboss.netty.example.xlsvr.util;
 import java.net.SocketAddress;
import java.net.SocketAddress; import java.nio.charset.Charset;
import java.nio.charset.Charset; import java.util.HashMap;
import java.util.HashMap; import java.util.Map;
import java.util.Map; import java.util.Map.Entry;
import java.util.Map.Entry;
 import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.Channel;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-2-4 下午01:57:33
 *  2012-2-4 下午01:57:33 */
 */ public class ProtocolUtil {
public class ProtocolUtil { 
     /**
    /** * 编码报文的数据部分
     * 编码报文的数据部分 * @param encode
     * @param encode * @param values
     * @param values * @return
     * @return */
     */ public static ChannelBuffer encode(int encode,Map<String,String> values){
    public static ChannelBuffer encode(int encode,Map<String,String> values){ ChannelBuffer totalBuffer=null;
        ChannelBuffer totalBuffer=null; if (values!=null && values.size()>0) {
        if (values!=null && values.size()>0) { totalBuffer=ChannelBuffers.dynamicBuffer();
            totalBuffer=ChannelBuffers.dynamicBuffer(); int length=0,index=0;
            int length=0,index=0; ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
            ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()]; Charset charset=XLCharSetFactory.getCharset(encode);
            Charset charset=XLCharSetFactory.getCharset(encode); for(Entry<String,String> entry:values.entrySet()){
            for(Entry<String,String> entry:values.entrySet()){ String key=entry.getKey();
                String key=entry.getKey(); String value=entry.getValue();
                String value=entry.getValue(); ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
                ChannelBuffer buffer=ChannelBuffers.dynamicBuffer(); buffer.writeInt(key.length());
                buffer.writeInt(key.length()); buffer.writeBytes(key.getBytes(charset));
                buffer.writeBytes(key.getBytes(charset)); buffer.writeInt(value.length());
                buffer.writeInt(value.length()); buffer.writeBytes(value.getBytes(charset));
                buffer.writeBytes(value.getBytes(charset)); channelBuffers[index++]=buffer;
                channelBuffers[index++]=buffer; length+=buffer.readableBytes();
                length+=buffer.readableBytes(); }
            } 
             for (int i = 0; i < channelBuffers.length; i++) {
            for (int i = 0; i < channelBuffers.length; i++) { totalBuffer.writeBytes(channelBuffers[i]);
                totalBuffer.writeBytes(channelBuffers[i]); }
            } }
        } return totalBuffer;
        return totalBuffer; }
    } 
     /**
    /** * 解码报文的数据部分
     * 解码报文的数据部分 * @param encode
     * @param encode * @param dataBuffer
     * @param dataBuffer * @return
     * @return */
     */ public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
    public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){ Map<String,String> dataMap=new HashMap<String, String>();
        Map<String,String> dataMap=new HashMap<String, String>(); if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
        if (dataBuffer!=null && dataBuffer.readableBytes()>0) { int processIndex=0,length=dataBuffer.readableBytes();
            int processIndex=0,length=dataBuffer.readableBytes(); Charset charset=XLCharSetFactory.getCharset(encode);
            Charset charset=XLCharSetFactory.getCharset(encode); while(processIndex<length){
            while(processIndex<length){ /**
                /** * 获取Key
                 * 获取Key */
                 */ int size=dataBuffer.readInt();
                int size=dataBuffer.readInt(); byte [] contents=new byte [size];
                byte [] contents=new byte [size]; dataBuffer.readBytes(contents);
                dataBuffer.readBytes(contents); String key=new String(contents, charset);
                String key=new String(contents, charset); processIndex=processIndex+size+4;
                processIndex=processIndex+size+4; /**
                /** * 获取Value
                 * 获取Value */
                 */ size=dataBuffer.readInt();
                size=dataBuffer.readInt(); contents=new byte [size];
                contents=new byte [size]; dataBuffer.readBytes(contents);
                dataBuffer.readBytes(contents); String value=new String(contents, charset);
                String value=new String(contents, charset); dataMap.put(key, value);
                dataMap.put(key, value); processIndex=processIndex+size+4;
                processIndex=processIndex+size+4; }
            } }
        } return dataMap;
        return dataMap; }
    } 
     /**
    /** * 获取客户端IP
     * 获取客户端IP * @param channel
     * @param channel * @return
     * @return */
     */ public static String getClientIp(Channel channel){
    public static String getClientIp(Channel channel){ /**
        /** * 获取客户端IP
         * 获取客户端IP */
         */ SocketAddress address = channel.getRemoteAddress();
        SocketAddress address = channel.getRemoteAddress(); String ip = "";
        String ip = ""; if (address != null) {
        if (address != null) { ip = address.toString().trim();
            ip = address.toString().trim(); int index = ip.lastIndexOf(‘:‘);
            int index = ip.lastIndexOf(‘:‘); if (index < 1) {
            if (index < 1) { index = ip.length();
                index = ip.length(); }
            } ip = ip.substring(1, index);
            ip = ip.substring(1, index); }
        } if (ip.length() > 15) {
        if (ip.length() > 15) { ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
            ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15)); }
        } return ip;
        return ip; }
    } }
}
三、服务器端实现
服务器端提供的功能是:
1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。
2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。
为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。
具体代码如下:
 package org.jboss.netty.example.xlsvr;
package org.jboss.netty.example.xlsvr;
 import java.net.InetSocketAddress;
import java.net.InetSocketAddress; import java.util.concurrent.Executors;
import java.util.concurrent.Executors;
 import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;
import org.jboss.netty.example.xlsvr.codec.XLServerEncoder; import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.frame.Delimiters; import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.util.CharsetUtil;
import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import org.slf4j.LoggerFactory;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-1-30 下午03:21:38
 *  2012-1-30 下午03:21:38 */
 */
 public class XLServer {
public class XLServer { public static final int port =8080;
    public static final int port =8080; public static final Logger logger=LoggerFactory.getLogger(XLServer.class);
    public static final Logger logger=LoggerFactory.getLogger(XLServer.class); public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");
    public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer"); private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
    private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); 
     public static void main(String [] args){
    public static void main(String [] args){ try {
        try { XLServer.startup();
            XLServer.startup(); } catch (Exception e) {
        } catch (Exception e) { e.printStackTrace();
            e.printStackTrace(); }
        } }
    } 
     public static boolean startup() throws Exception{
    public static boolean startup() throws Exception{ /**
        /** * 采用默认ChannelPipeline管道
         * 采用默认ChannelPipeline管道 * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
         * 这意味着同一个XLServerHandler实例将被多个Channel通道共享 * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
         * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能! */
         */ ChannelPipeline pipeline=serverBootstrap.getPipeline();
        ChannelPipeline pipeline=serverBootstrap.getPipeline();  /**
        /** * 解码器是基于文本行的协议,\r\n或者\n\r
         * 解码器是基于文本行的协议,\r\n或者\n\r */
         */ pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
        pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter())); pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new XLServerEncoder());
        pipeline.addLast("encoder", new XLServerEncoder()); pipeline.addLast("handler", new XLServerHandler());
        pipeline.addLast("handler", new XLServerHandler()); 
         serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀
        serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀 serverBootstrap.setOption("child.keepAlive", true); //注意child前缀
        serverBootstrap.setOption("child.keepAlive", true); //注意child前缀 
         /**
        /** * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
         * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象 */
         */ Channel channel=serverBootstrap.bind(new InetSocketAddress(port));
        Channel channel=serverBootstrap.bind(new InetSocketAddress(port)); allChannels.add(channel);
        allChannels.add(channel); logger.info("server is started on port "+port);
        logger.info("server is started on port "+port); return false;
        return false; }
    } 
     public static void shutdown() throws Exception{
    public static void shutdown() throws Exception{ try {
        try { /**
            /** * 主动关闭服务器
             * 主动关闭服务器 */
             */ ChannelGroupFuture future=allChannels.close();
            ChannelGroupFuture future=allChannels.close(); future.awaitUninterruptibly();//阻塞,直到服务器关闭
            future.awaitUninterruptibly();//阻塞,直到服务器关闭 //serverBootstrap.releaseExternalResources();
            //serverBootstrap.releaseExternalResources(); } catch (Exception e) {
        } catch (Exception e) { e.printStackTrace();
            e.printStackTrace(); logger.error(e.getMessage(),e);
            logger.error(e.getMessage(),e); }
        } finally{
        finally{ logger.info("server is shutdown on port "+port);
            logger.info("server is shutdown on port "+port); System.exit(1);
            System.exit(1); }
        } }
    } }
}
 package org.jboss.netty.example.xlsvr;
package org.jboss.netty.example.xlsvr;
 import java.util.Random;
import java.util.Random;
 import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.example.xlsvr.vo.XLResponse;
import org.jboss.netty.example.xlsvr.vo.XLResponse; import org.slf4j.Logger;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import org.slf4j.LoggerFactory;
 /**
/** *  @author hankchen
 *  @author hankchen *  2012-1-30 下午03:22:24
 *  2012-1-30 下午03:22:24 */
 */
 @Sharable
@Sharable public class XLServerHandler extends SimpleChannelHandler {
public class XLServerHandler extends SimpleChannelHandler { private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
    private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class); 
     @Override
    @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { logger.info("messageReceived");
        logger.info("messageReceived"); if (e.getMessage() instanceof String) {
        if (e.getMessage() instanceof String) { String content=(String)e.getMessage();
            String content=(String)e.getMessage(); logger.info("content is "+content);
            logger.info("content is "+content); if ("shutdown".equalsIgnoreCase(content)) {
            if ("shutdown".equalsIgnoreCase(content)) { //e.getChannel().close();
                //e.getChannel().close(); XLServer.shutdown();
                XLServer.shutdown(); }else {
            }else { sendResponse(ctx);
                sendResponse(ctx); }
            } }else {
        }else { logger.error("message is not a String.");
            logger.error("message is not a String."); e.getChannel().close();
            e.getChannel().close(); }
        } }
    }
 @Override
    @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { logger.error(e.getCause().getMessage(),e.getCause());
        logger.error(e.getCause().getMessage(),e.getCause()); e.getCause().printStackTrace();
        e.getCause().printStackTrace(); e.getChannel().close();
        e.getChannel().close(); }
    }
 @Override
    @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { logger.info("channelConnected");
        logger.info("channelConnected"); sendResponse(ctx);
        sendResponse(ctx); }
    }
 @Override
    @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { logger.info("channelClosed");
        logger.info("channelClosed"); //删除通道
        //删除通道 XLServer.allChannels.remove(e.getChannel());
        XLServer.allChannels.remove(e.getChannel()); }
    }
 @Override
    @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { logger.info("channelDisconnected");
        logger.info("channelDisconnected"); super.channelDisconnected(ctx, e);
        super.channelDisconnected(ctx, e); }
    }
 @Override
    @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { logger.info("channelOpen");
        logger.info("channelOpen"); //增加通道
        //增加通道 XLServer.allChannels.add(e.getChannel());
        XLServer.allChannels.add(e.getChannel()); }
    }
 /**
    /** * 发送响应内容
     * 发送响应内容 * @param ctx
     * @param ctx * @param e
     * @param e * @return
     * @return */
     */ private ChannelFuture sendResponse(ChannelHandlerContext ctx){
    private ChannelFuture sendResponse(ChannelHandlerContext ctx){ Channel channel=ctx.getChannel();
        Channel channel=ctx.getChannel(); Random random=new Random();
        Random random=new Random(); XLResponse response=new XLResponse();
        XLResponse response=new XLResponse(); response.setEncode((byte)0);
        response.setEncode((byte)0); response.setResult(1);
        response.setResult(1); response.setValue("name","hankchen");
        response.setValue("name","hankchen"); response.setValue("time", String.valueOf(System.currentTimeMillis()));
        response.setValue("time", String.valueOf(System.currentTimeMillis())); response.setValue("age",String.valueOf(random.nextInt()));
        response.setValue("age",String.valueOf(random.nextInt())); /**
        /** * 发送接收信息的时间戳到客户端
         * 发送接收信息的时间戳到客户端 * 注意:Netty中所有的IO操作都是异步的!
         * 注意:Netty中所有的IO操作都是异步的! */
         */ ChannelFuture future=channel.write(response); //发送内容
        ChannelFuture future=channel.write(response); //发送内容 return future;
        return future; }
    } }
}
四、客户端实现
客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。
关键代码如下:
 /**
/** *  Copyright (C): 2012
 *  Copyright (C): 2012 *  @author hankchen
 *  @author hankchen *  2012-1-30 下午03:21:26
 *  2012-1-30 下午03:21:26 */
 */
 /**
/** * 服务器特征:
 * 服务器特征: * 1、使用专用解码器解析服务器发过来的数据
 * 1、使用专用解码器解析服务器发过来的数据 * 2、客户端主动关闭连接
 * 2、客户端主动关闭连接 */
 */ public class XLClient {
public class XLClient { public static final int port =XLServer.port;
    public static final int port =XLServer.port; public static final String host ="localhost";
    public static final String host ="localhost"; private static final Logger logger=LoggerFactory.getLogger(XLClient.class);
    private static final Logger logger=LoggerFactory.getLogger(XLClient.class); private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
    private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);
    private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory); 
     /**
    /** * @param args
     * @param args * @throws Exception
     * @throws Exception  */
     */ public static void main(String[] args) throws Exception {
    public static void main(String[] args) throws Exception { ChannelFuture future=XLClient.startup();
        ChannelFuture future=XLClient.startup(); logger.info("future state is "+future.isSuccess());
        logger.info("future state is "+future.isSuccess()); }
    } 
     /**
    /** * 启动客户端
     * 启动客户端 * @return
     * @return * @throws Exception
     * @throws Exception */
     */ public static ChannelFuture startup() throws Exception {
    public static ChannelFuture startup() throws Exception { /**
        /** * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
         * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式 * 例如,下面的代码形式是错误的:
         * 例如,下面的代码形式是错误的: * ChannelPipeline pipeline=clientBootstrap.getPipeline();
         * ChannelPipeline pipeline=clientBootstrap.getPipeline(); * pipeline.addLast("handler", new XLClientHandler());
         * pipeline.addLast("handler", new XLClientHandler()); */
         */ clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置
        clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置 /**
        /** * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
         * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象 */
         */ clientBootstrap.setOption("tcpNoDelay", true);
        clientBootstrap.setOption("tcpNoDelay", true); clientBootstrap.setOption("keepAlive", true);
        clientBootstrap.setOption("keepAlive", true); 
         ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));
        ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port)); /**
        /** * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
         * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态 */
         */ future.awaitUninterruptibly();
        future.awaitUninterruptibly(); /**
        /** * 如果连接失败,我们将打印连接失败的原因。
         * 如果连接失败,我们将打印连接失败的原因。 * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
         * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。 */
         */ if (!future.isSuccess()) {
        if (!future.isSuccess()) { future.getCause().printStackTrace();
            future.getCause().printStackTrace(); }else {
        }else { logger.info("client is connected to server "+host+":"+port);
            logger.info("client is connected to server "+host+":"+port); }
        } return future;
        return future; }
    } 
     /**
    /** * 关闭客户端
     * 关闭客户端 * @param future
     * @param future * @throws Exception
     * @throws Exception */
     */ public static void shutdown(ChannelFuture future) throws Exception{
    public static void shutdown(ChannelFuture future) throws Exception{ try {
        try { /**
            /** * 主动关闭客户端连接,会阻塞等待直到通道关闭
             * 主动关闭客户端连接,会阻塞等待直到通道关闭 */
             */ future.getChannel().close().awaitUninterruptibly();
            future.getChannel().close().awaitUninterruptibly(); //future.getChannel().getCloseFuture().awaitUninterruptibly();
            //future.getChannel().getCloseFuture().awaitUninterruptibly(); /**
            /** * 释放ChannelFactory通道工厂使用的资源。
             * 释放ChannelFactory通道工厂使用的资源。 * 这一步仅需要调用 releaseExternalResources()方法即可。
             * 这一步仅需要调用 releaseExternalResources()方法即可。 * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
             * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。 */
             */ clientBootstrap.releaseExternalResources();
            clientBootstrap.releaseExternalResources(); } catch (Exception e) {
        } catch (Exception e) { e.printStackTrace();
            e.printStackTrace(); logger.error(e.getMessage(),e);
            logger.error(e.getMessage(),e); }
        } finally{
        finally{ System.exit(1);
            System.exit(1); logger.info("client is shutdown to server "+host+":"+port);
            logger.info("client is shutdown to server "+host+":"+port); }
        } }
    } }
}
 public class XLClientPipelineFactory implements ChannelPipelineFactory{
public class XLClientPipelineFactory implements ChannelPipelineFactory{
 @Override
    @Override public ChannelPipeline getPipeline() throws Exception {
    public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline=Channels.pipeline();
        ChannelPipeline pipeline=Channels.pipeline(); /**
        /** * 使用专用的解码器,解决数据分段的问题
         * 使用专用的解码器,解决数据分段的问题 * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
         * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。 */
         */ pipeline.addLast("decoder", new XLClientDecoder());
        pipeline.addLast("decoder", new XLClientDecoder()); /**
        /** * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
         * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了! */
         */ pipeline.addLast("handler", new XLClientHandler());
        pipeline.addLast("handler", new XLClientHandler()); return pipeline;
        return pipeline; }
    }
 }
}
 /**
/** *  Copyright (C): 2012
 *  Copyright (C): 2012 *  @author hankchen
 *  @author hankchen *  2012-1-30 下午03:21:52
 *  2012-1-30 下午03:21:52 */
 */
 /**
/** * 服务器特征:
 * 服务器特征: * 1、使用专用的编码解码器,解决数据分段的问题
 * 1、使用专用的编码解码器,解决数据分段的问题 * 2、使用POJO替代ChannelBuffer传输
 * 2、使用POJO替代ChannelBuffer传输 */
 */ public class XLClientHandler extends SimpleChannelHandler {
public class XLClientHandler extends SimpleChannelHandler { private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);
    private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class); private final AtomicInteger count=new AtomicInteger(0); //计数器
    private final AtomicInteger count=new AtomicInteger(0); //计数器 
     @Override
    @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { processMethod1(ctx, e); //处理方式一
        processMethod1(ctx, e); //处理方式一 }
    } 
     /**
    /** * @param ctx
     * @param ctx * @param e
     * @param e * @throws Exception
     * @throws Exception */
     */ public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
    public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{ logger.info("processMethod1……,count="+count.addAndGet(1));
        logger.info("processMethod1……,count="+count.addAndGet(1)); XLResponse serverTime=(XLResponse)e.getMessage();
        XLResponse serverTime=(XLResponse)e.getMessage(); logger.info("messageReceived,content:"+serverTime.toString());
        logger.info("messageReceived,content:"+serverTime.toString()); Thread.sleep(1000);
        Thread.sleep(1000); 
         if (count.get()<10) {
        if (count.get()<10) { //从新发送请求获取最新的服务器时间
            //从新发送请求获取最新的服务器时间 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));
            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes())); }else{
        }else{ //从新发送请求关闭服务器
            //从新发送请求关闭服务器 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));
            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes())); }
        } }
    } 
     @Override
    @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { logger.info("exceptionCaught");
        logger.info("exceptionCaught"); e.getCause().printStackTrace();
        e.getCause().printStackTrace(); ctx.getChannel().close();
        ctx.getChannel().close(); }
    }
 @Override
    @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { logger.info("channelClosed");
        logger.info("channelClosed"); super.channelClosed(ctx, e);
        super.channelClosed(ctx, e); }
    } 
     
     }
}全文代码较多,写了很多注释,希望对读者有用,谢谢!
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html
原文:http://www.cnblogs.com/findumars/p/6357305.html