原生的 socket 编程:Java 中的 IO 与 socket 编程 [ 复习 ]
介绍
Netty 项目是一个提供异步事件驱动网络应用框架。
示例
1.discord 服务
handler
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//默默的丢弃接收到的数据
ByteBuf in = (ByteBuf) msg;
try{
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII));
} finally{
ReferenceCountUtil.release(msg); //(2)
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//出现异常时直接关闭链接
ctx.close();
}
}
server
public class DiscardServer {
private int port;
public DiscardServer(int port){
this.port=port;
}
public void run() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port;
if(args.length>0){
port=Integer.parseInt(args[0]);
}else{
port=8080;
}
new DiscardServer(port).run();
}
}
使用命令行作为客户端
$ ncat localhost 8080 hello hi hello world

2.echo 服务
handler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//出现异常时直接关闭链接
ctx.close();
}
}
同样命令行作为客户端

3.time 服务
server handler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);
// f.addListener(ChannelFutureListener.CLOSE);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assert f == future;
ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
server
public class TimeServer {
private final int port;
public TimeServer(int port){
this.port=port;
}
public void run() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port;
if(args.length>0){
port=Integer.parseInt(args[0]);
}else{
port=8080;
}
new TimeServer(port).run();
}
}
client handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
decoder
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
client
public class TimeClient {
public static void main(String[] args) {
String host = "localhost";
int port = 8080;
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
//启动客户端
ChannelFuture future = b.connect(host, port).sync();
//在连接关闭之前保持等待
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
233
原文:https://www.cnblogs.com/lemos/p/13364211.html