先了解一下 netty 大概框架图 ,可以看到客户端的创建和服务端最大的区别 - 服务端传入两个 EventLoopGroup,客户端传入一个 EventLoopGroup - channel 的类型也不同,服务端传入的是 NioServerSocketChannel ,客户端传入的是 NioSocketChannel - 服务端存在 childHandler 的设置,客户端没有,
客户端连接过程 : - 和服务端一样先创建 EventLoopGroup (只有一个,内部多个保持多个EventLoop线程,执行处理事务) - connect 方法 ,和服务端一样,使用group里的EventLoop创建一个 channel ,然后注册到 select 中去(这个过程都在channel 中进行) - (异步执行)当连接不上就会交给 EventLoop线程中执行监听的任务。 - 而一旦监听到了就交给 channel 执行。 - selectKey 可以attack一个object,刚好可以用来放channel ,然后在某个线程监听到某个实现的时候再把 channel 拿出来用
实际上客户端只有一个 Reactor . 那么重点的逻辑就到了 connect 那里
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doConnect(remoteAddress, localAddress);
}
/**
* @see {@link #connect()}
*/
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//创建channel ,注册到 select , initAndRegister方法是父类的方法我们在分析服务端
//的时候已经分析过了
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
//一开始就连接上了,
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
//链路成功后,异步连接 TCP
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// 到了执行连接的操作就转到了Netty 的 NIO线程执行,此刻客户端返回,连接异步执行。
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
//没传 localAddress 会传到 TailHandler的connect方法
channel.connect(remoteAddress, promise);
} else {
//正常情况下到 HeaderHandler的connect 方法
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
我们看一下 HeaderHandler的connect 方法。
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
//执行 HeaderHandler内的unsafe字段的 connect 方法
unsafe.connect(remoteAddress, localAddress, promise);
}
unsafe会执行AbstractNioChannel(这个类是NioServerSocketChannel和NioSocketChannel的共同父类)的connect 方法
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
//doConnect 方法是个抽象方法
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
NioSocketChannel 的 doConnect 方法
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
//如果绑定不成功,注册连接事件
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
后续更新...
原文:https://www.cnblogs.com/Benjious/p/11613265.html