NioEventLoopGroup的创建
一般是通过创建NioEventLoopGroup来创建NioEventLoop。
bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup();
NioEventLoopGroup的构造函数:
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (ThreadFactory)null); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(nThreads, threadFactory, new Object[]{selectorProvider}); }
可以从默认的构造函数看出来,默认传入的线程数是0,默认传入的ThreadFactory是null,并且传入了一个selectorProvider。最后层层调用了父类的构造函数。
//MultithreadEventLoopGroup protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, threadFactory, args); }
这里会判断传入的线程数是不是等于0,如果等于0,就将线程数设置为2*cpu。默认情况下不传参数,会创建2*cpu个线程数的线程池NioEventLoopGroup。然后继续调用父类构造函数
//MultithreadEventExecutorGroup protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); if(nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", new Object[]{Integer.valueOf(nThreads)})); } else { //创建ThreadFactory if(threadFactory == null) { threadFactory = this.newDefaultThreadFactory(); } //创建child线程组 this.children = new SingleThreadEventExecutor[nThreads]; //创建线程选择器 if(isPowerOfTwo(this.children.length)) { this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null); } else { this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null); } //使用newChild方法初始化child线程组,这里要将threadFactory传入 int len$; for(int terminationListener = 0; terminationListener < nThreads; ++terminationListener) { boolean arr$ = false; boolean var17 = false; try { var17 = true; this.children[terminationListener] = this.newChild(threadFactory, args); arr$ = true; var17 = false; } catch (Exception var18) { throw new IllegalStateException("failed to create a child event loop", var18); } finally { if(var17) { if(!arr$) { int j; for(j = 0; j < terminationListener; ++j) { this.children[j].shutdownGracefully(); } for(j = 0; j < terminationListener; ++j) { EventExecutor e1 = this.children[j]; try { while(!e1.isTerminated()) { e1.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var19) { Thread.currentThread().interrupt(); break; } } } } } if(!arr$) { for(len$ = 0; len$ < terminationListener; ++len$) { this.children[len$].shutdownGracefully(); } for(len$ = 0; len$ < terminationListener; ++len$) { EventExecutor i$ = this.children[len$]; try { while(!i$.isTerminated()) { i$.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var21) { Thread.currentThread().interrupt(); break; } } } } FutureListener var22 = new FutureListener() { public void operationComplete(Future<Object> future) throws Exception { if(MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) { MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null); } } }; EventExecutor[] var23 = this.children; len$ = var23.length; for(int var24 = 0; var24 < len$; ++var24) { EventExecutor e = var23[var24]; e.terminationFuture().addListener(var22); } } }
这里的构造函数主要做了三件事情;
1.创建ThreadFactory线程构造器
2.使用newChild方法创建child线程组
3.创建线程选择器
创建threadFactory
这里的threadFactory相当于是线程池聚合的一个组件,它是通过newDefaultThreadFactory进行创建的。
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(this.getClass(), 10); }
DefaultThreadFactory的构造函数:
public DefaultThreadFactory(Class<?> poolType, int priority) { this((Class)poolType, false, priority); } public DefaultThreadFactory(String poolName, boolean daemon) { this((String)poolName, daemon, 5); } public DefaultThreadFactory(Class<?> poolType, int priority) { this((Class)poolType, false, priority); } public DefaultThreadFactory(String poolName, int priority) { this((String)poolName, false, priority); } public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this((String)toPoolName(poolType), daemon, priority); } private static String toPoolName(Class<?> poolType) { if(poolType == null) { throw new NullPointerException("poolType"); } else { String poolName = StringUtil.simpleClassName(poolType); switch(poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: return Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))?Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1):poolName; } } }
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this.nextId = new AtomicInteger();
if(poolName == null) {
throw new NullPointerException("poolName");
} else if(priority >= 1 && priority <= 10) {
this.prefix = poolName + ‘-‘ + poolId.incrementAndGet() + ‘-‘;
this.daemon = daemon;
this.priority = priority;
} else {
throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
}
public Thread newThread(Runnable r) {
Thread t = this.newThread(new DefaultThreadFactory.DefaultRunnableDecorator(r), this.prefix + this.nextId.incrementAndGet());
try {
if(t.isDaemon()) {
if(!this.daemon) {
t.setDaemon(false);
}
} else if(this.daemon) {
t.setDaemon(true);
}
if(t.getPriority() != this.priority) {
t.setPriority(this.priority);
}
} catch (Exception var4) {
;
}
return t;
}
这里传入了当前的类类型NioEventLoopGroup,设置当前线程池的名称为nioEventLoopGroup,线程的优先级为5,然后载调用父类构造函数当前线程池的前缀nioEventLoopGroup-线程池id-。
当每次进行newThread的时候,会将线程池前缀和当前的线程数传入,所以线程的名称就为nioEventLoopGroup-线程池id-线程数id。
例如:nioEventLoopGroup-2-1 代表的是第二个线程池的第一个线程。
protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(r, name); }
当调用newThread方法时,其实创建的是一个FastThreadLocalThread对象,netty底层的一个Thread对象,对Thread进行了包装,优化了相关的ThreadLocal操作。
通过newChild()方法创建NioEventLoop线程组
这里的newChild()方法就是创建一个NioEventLoop。
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0]); }
NioEventLoop的构造,要传入父线程池,threadFactory,和provider。每一个NioEventLoop都有一个selector与它进行绑定,为了实现IO多路复用。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if(selectorProvider == null) { throw new NullPointerException("selectorProvider"); } else { this.provider = selectorProvider; this.selector = this.openSelector(); } }
创建任务队列,如果不是NioEventLoop对应的线程,当是外部线程时,就将任务放入任务队列,否则使用线程执行任务。这里的任务队列是LinkedBlockingQueue
protected Queue<Runnable> newTaskQueue() { return new LinkedBlockingQueue(); }
NioEventLoop的主要组件就是selector,taskqueue,thread,selector用于IO多路复用,使用selector才能使得一个线程能处理多个连接(channel的事件),而thread就是存储当前NioEventLoop的线程,如果不在当前线程中,那么就将任务提交到任务队列中taskqueue。
创建线程选择器
线程选择器是对应NioEventLoopGroup.next()方法,每当有一个新的连接进入的时候,NioEventLoopGroup要选择一个NioEventLoop和新连接进行绑定。
在创建线程选择器的时候会判断当前线程池的个数是不是2的幂次方,如果是就创建一个PowerOfTwoEventExecutorChooser,否则就创建一个GenericEventExecutorChooser。
if(isPowerOfTwo(this.children.length)) { this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null); } else { this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null); }
普通的线程选择器:
private final class GenericEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser { private GenericEventExecutorChooser() { } public EventExecutor next() { return MultithreadEventExecutorGroup.this.children[Math.abs(MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() % MultithreadEventExecutorGroup.this.children.length)]; } }
二的幂次方线程选择器:
private final class PowerOfTwoEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser { private PowerOfTwoEventExecutorChooser() { } public EventExecutor next() { return MultithreadEventExecutorGroup.this.children[MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() & MultithreadEventExecutorGroup.this.children.length - 1]; } }
但满足线程数是2的幂次方时,采用&操作来优化,&比%要高效很多。
原文:https://www.cnblogs.com/xiaobaituyun/p/10799786.html