首页 > Web开发 > 详细

Netty源码分析之NioEventLoopGroup创建

时间:2019-05-01 14:53:05      阅读:134      评论:0      收藏:0      [点我收藏+]

NioEventLoopGroup的创建

一般是通过创建NioEventLoopGroup来创建NioEventLoop。

bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();

NioEventLoopGroup的构造函数:

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory)null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, new Object[]{selectorProvider});
}

  可以从默认的构造函数看出来,默认传入的线程数是0,默认传入的ThreadFactory是null,并且传入了一个selectorProvider。最后层层调用了父类的构造函数。

//MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, threadFactory, args);
}

  这里会判断传入的线程数是不是等于0,如果等于0,就将线程数设置为2*cpu。默认情况下不传参数,会创建2*cpu个线程数的线程池NioEventLoopGroup。然后继续调用父类构造函数

//MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {    this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        if(nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", new Object[]{Integer.valueOf(nThreads)}));
    } else {
             //创建ThreadFactory
        if(threadFactory == null) {
            threadFactory = this.newDefaultThreadFactory();
        }
              
             //创建child线程组
        this.children = new SingleThreadEventExecutor[nThreads];

             //创建线程选择器
             if(isPowerOfTwo(this.children.length)) {
            this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null);
        } else {
            this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null);
        }
             
             //使用newChild方法初始化child线程组,这里要将threadFactory传入
        int len$;
        for(int terminationListener = 0; terminationListener < nThreads; ++terminationListener) {
            boolean arr$ = false;
            boolean var17 = false;

            try {
                var17 = true;
                this.children[terminationListener] = this.newChild(threadFactory, args);
                arr$ = true;
                var17 = false;
            } catch (Exception var18) {
                throw new IllegalStateException("failed to create a child event loop", var18);
            } finally {
                if(var17) {
                    if(!arr$) {
                        int j;
                        for(j = 0; j < terminationListener; ++j) {
                            this.children[j].shutdownGracefully();
                        }

                        for(j = 0; j < terminationListener; ++j) {
                            EventExecutor e1 = this.children[j];

                            try {
                                while(!e1.isTerminated()) {
                                    e1.awaitTermination(2147483647L, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException var19) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }

                }
            }

            if(!arr$) {
                for(len$ = 0; len$ < terminationListener; ++len$) {
                    this.children[len$].shutdownGracefully();
                }

                for(len$ = 0; len$ < terminationListener; ++len$) {
                    EventExecutor i$ = this.children[len$];

                    try {
                        while(!i$.isTerminated()) {
                            i$.awaitTermination(2147483647L, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException var21) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }

        FutureListener var22 = new FutureListener() {
            public void operationComplete(Future<Object> future) throws Exception {
                if(MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                    MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
                }

            }
        };
        EventExecutor[] var23 = this.children;
        len$ = var23.length;

        for(int var24 = 0; var24 < len$; ++var24) {
            EventExecutor e = var23[var24];
            e.terminationFuture().addListener(var22);
        }

    }
}    

  

这里的构造函数主要做了三件事情;

1.创建ThreadFactory线程构造器

2.使用newChild方法创建child线程组

3.创建线程选择器

 

创建threadFactory

这里的threadFactory相当于是线程池聚合的一个组件,它是通过newDefaultThreadFactory进行创建的。

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(this.getClass(), 10);
}

 DefaultThreadFactory的构造函数:

public DefaultThreadFactory(Class<?> poolType, int priority) {
    this((Class)poolType, false, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon) {
    this((String)poolName, daemon, 5);
}

public DefaultThreadFactory(Class<?> poolType, int priority) {
    this((Class)poolType, false, priority);
}

public DefaultThreadFactory(String poolName, int priority) {
    this((String)poolName, false, priority);
}

public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    this((String)toPoolName(poolType), daemon, priority);
}

private static String toPoolName(Class<?> poolType) {
    if(poolType == null) {
        throw new NullPointerException("poolType");
    } else {
        String poolName = StringUtil.simpleClassName(poolType);
        switch(poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
            default:
                return Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))?Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1):poolName;
        }
    }
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this.nextId = new AtomicInteger();
if(poolName == null) {
throw new NullPointerException("poolName");
} else if(priority >= 1 && priority <= 10) {
this.prefix = poolName + ‘-‘ + poolId.incrementAndGet() + ‘-‘;
this.daemon = daemon;
this.priority = priority;
} else {
throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
}
public Thread newThread(Runnable r) {
Thread t = this.newThread(new DefaultThreadFactory.DefaultRunnableDecorator(r), this.prefix + this.nextId.incrementAndGet());

try {
if(t.isDaemon()) {
if(!this.daemon) {
t.setDaemon(false);
}
} else if(this.daemon) {
t.setDaemon(true);
}

if(t.getPriority() != this.priority) {
t.setPriority(this.priority);
}
} catch (Exception var4) {
;
}

return t;
}

  这里传入了当前的类类型NioEventLoopGroup,设置当前线程池的名称为nioEventLoopGroup,线程的优先级为5,然后载调用父类构造函数当前线程池的前缀nioEventLoopGroup-线程池id-。

  当每次进行newThread的时候,会将线程池前缀和当前的线程数传入,所以线程的名称就为nioEventLoopGroup-线程池id-线程数id。

  例如:nioEventLoopGroup-2-1 代表的是第二个线程池的第一个线程。

protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(r, name);
}

  当调用newThread方法时,其实创建的是一个FastThreadLocalThread对象,netty底层的一个Thread对象,对Thread进行了包装,优化了相关的ThreadLocal操作。

 

通过newChild()方法创建NioEventLoop线程组

  这里的newChild()方法就是创建一个NioEventLoop。

protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0]);
}

  NioEventLoop的构造,要传入父线程池,threadFactory,和provider。每一个NioEventLoop都有一个selector与它进行绑定,为了实现IO多路复用。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if(selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    } else {
        this.provider = selectorProvider;
        this.selector = this.openSelector();
    }
}

  创建任务队列,如果不是NioEventLoop对应的线程,当是外部线程时,就将任务放入任务队列,否则使用线程执行任务。这里的任务队列是LinkedBlockingQueue

protected Queue<Runnable> newTaskQueue() {
    return new LinkedBlockingQueue();
}

  NioEventLoop的主要组件就是selector,taskqueue,thread,selector用于IO多路复用,使用selector才能使得一个线程能处理多个连接(channel的事件),而thread就是存储当前NioEventLoop的线程,如果不在当前线程中,那么就将任务提交到任务队列中taskqueue。

 

创建线程选择器

  线程选择器是对应NioEventLoopGroup.next()方法,每当有一个新的连接进入的时候,NioEventLoopGroup要选择一个NioEventLoop和新连接进行绑定。

技术分享图片

 

  在创建线程选择器的时候会判断当前线程池的个数是不是2的幂次方,如果是就创建一个PowerOfTwoEventExecutorChooser,否则就创建一个GenericEventExecutorChooser。

if(isPowerOfTwo(this.children.length)) {
    this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null);
} else {
    this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null);
}

  普通的线程选择器:

private final class GenericEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser {
    private GenericEventExecutorChooser() {
    }

    public EventExecutor next() {
        return MultithreadEventExecutorGroup.this.children[Math.abs(MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() % MultithreadEventExecutorGroup.this.children.length)];
    }
}

  二的幂次方线程选择器:

private final class PowerOfTwoEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser {
    private PowerOfTwoEventExecutorChooser() {
    }

    public EventExecutor next() {
        return MultithreadEventExecutorGroup.this.children[MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() & MultithreadEventExecutorGroup.this.children.length - 1];
    }
}

  但满足线程数是2的幂次方时,采用&操作来优化,&比%要高效很多。

Netty源码分析之NioEventLoopGroup创建

原文:https://www.cnblogs.com/xiaobaituyun/p/10799786.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!