首页 > 编程语言 > 详细

Dubbo 线程池模型

时间:2021-06-04 01:20:51      阅读:20      评论:0      收藏:0      [点我收藏+]

Dubbo 线程池模型

 

前言

大家好,今天开始给大家分享 — Dubbo 专题之 Dubbo 线程池模型。在前面上个章节中我们讨论了 Dubbo SPI,了解了 Dubbo SPI 其本质是从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制加强而来,同时解决了 Java 中 SPI 的一些缺陷。以及我们使用 Dubbo SPI 实现自定义能力的拓展。那本章节我们要讨论的 Dubbo 线程模型也是基于 SPI 实现,那什么是线程模型呢?以及其在我们的项目中有什么作用呢?那么我们在本章节中进行讨论。下面就让我们快速开始吧!

 

1. 线程模型简介

小伙伴如果对 Servlet 熟悉就知道,从 Servlet 3.x开始支持异步非阻塞模式。至于什么异步非阻塞前面我在前面的章节中有讨论小伙伴可以自行学习之前的文章。我们通过一个访问Web应用流程图简单说明:

技术分享图片

在上面的流程图中我们可以看到第一个请求发起同步 Web 调用,然后 Web 再发起对第三方服务的调用,整个过程全链路是同步调用。第二个请求同样也是发起同步调用,但是在发起第三方调用的时候切换了线程(基于 Servlet 3.x 我们不需要手动的创建线程来切换)。这么做的好处在于我们可以用专门处理线程池去做业务处理或第三方服务的调用。那什么情况下我们需要切换线程不使用主线程呢?如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库或其它服务调用时,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

 

2. 使用方式

那在 Dubbo 中给我们提供了通过不同的派发策略和不同的线程池配置的组合来应对不同的场景。配置方式如下:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

下面我们简单描述下dispatcherthreadpool的参数说明:

  1. Dispatcher

    • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。(默认)

    • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。

    • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

    1. ThreadPool

      • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)

      • cached 缓存线程池,空闲一分钟自动删除,需要时重建。

      • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

      • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

       

      3. 使用场景

      通过前面的介绍我们应该明白我们为什么需要切换线程,遵循一个很简单的原则:如果我们处理的任务需要操作新的 IO 或者处理任务需要很长的时间那么我们就可以把这部分工作放到我们的任务线程池去处理。那么我们简单的总结下在工作常遇到的场景:

      1. 计算型服务:在我之前的工作中遇到这样的一个需求:我们的车机实时上报数据给服务器,服务器记录数据并且实时计算和纠正导航数据。那么这里我们需要一个计算型的微服务,主要的工作就是计算和修正实时数据,那么这个服务就是典型的计算型服务,所有我们计算过程中尽量减少线程的切换并尽可能的在一个线程内进行计算。这样减少线程切换的开销提供计算速度。

      2. 网关服务:首先我们需要了解什么是网关,简单的理解就是所有的服务入口,对每个服务的调用必须经过网关转发到对应服务上(类似 Nginx )。那这里网关主要工作就是服务转发(鉴权、限流等等),可以理解为发起请求。很明显发起请求就是开启新的 IO 所有我们可以切换到线程池去处理。

       

      4. 示例演示

      下面我们通过以获取图书列表为例进行演示。以下是项目的结构图:

      技术分享图片

      因为这里我们主要是对服务提供端的配置,所有我们主要看dubbo-provider-xml.xml配置内容:

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
            xmlns="http://www.springframework.org/schema/beans"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
            http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
      ?
         <!-- 指定分发策略为:all 线程池:fixed 固定大小为:100 -->
         <dubbo:protocol port="20880" name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
      ?
         <dubbo:application name="demo-provider" metadata-type="remote"/>
      ?
         <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
      ?
         <bean id="bookFacade" class="com.muke.dubbocourse.test.provider.BookFacadeImpl"/>
      ?
         <!--暴露服务为Dubbo服务-->
         <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" ref="bookFacade" />
      ?
      </beans>

      上面的 XML 配置中dispatcher="all"指定事件的分发策略、threadpool="fixed" threads="100"指定线程池固定大小为100

       

      5. 原理分析

      这里分发策略和线程池采用 Dubbo 中的 SPI 方式加载的小伙伴可以参考前面的 《Dubbo SPI》章节进行了解。下面我们进入主题,首先看看在 Dubbo 中为我们提供的5种事件分发策略:

      技术分享图片

      我们这里简单的分析 all分发策略其它的都是类似的小伙伴自行查阅源码分析。下面我们看看org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler核心源码:

      ?
      /***
      *@className AllChannelHandler
      *       
      *@description 所有处理分发到线程池去处理
      *       
      *@author <a href="http://youngitman.tech">青年IT男</a>
      *       
      *@date 12:50 2020-03-05
      *       
      *@JunitTest: {@link }     
      *
      *@version v1.0.0
      *       
      **/
      public class AllChannelHandler extends WrappedChannelHandler {
      ?
         public AllChannelHandler(ChannelHandler handler, URL url) {
             super(handler, url);
        }
      ?
         /**
          *
          * 远程连接事件回调
          *
          * @author liyong 
          * @date 1:34 PM 2020/12/6 
          * @param channel 
          * @exception 
          * @return void 
          **/
         @Override
         public void connected(Channel channel) throws RemotingException {
             ExecutorService executor getExecutorService();
             try {
                 //连接到远程事件放入线程池执行
                 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                 throw new ExecutionException("connect event", channel, getClass() " error when process connected event .", t);
            }
        }
      ?
         /**
          *
          * 端口远程连接
          *
          * @author liyong 
          * @date 1:34 PM 2020/12/6 
          * @param channel 
          * @exception 
          * @return void 
          **/
         @Override
         public void disconnected(Channel channel) throws RemotingException {
             ExecutorService executor getExecutorService();
             try {
                 //断开连接处理事件放入线程池执行
                 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
            } catch (Throwable t) {
                 throw new ExecutionException("disconnect event", channel, getClass() " error when process disconnected event .", t);
            }
        }
      ?
         /**
          *
          * 接收到数据回调
          *
          * @author liyong 
          * @date 1:34 PM 2020/12/6 
          * @param channel 
          * @param message 
          * @exception 
          * @return void 
          **/
         @Override
         public void received(Channel channel, Object message) throws RemotingException {
             ExecutorService executor getPreferredExecutorService(message);
             try {
                 //接收到数据放入线程池处理
                 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
             if(message instanceof Request && instanceof RejectedExecutionException){
                     sendFeedback(channel, (Request) message, t);
                     return;
             }
                 throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
            }
        }
      ?
         /**
          *
          * 发生异常回调
          *
          * @author liyong 
          * @date 1:35 PM 2020/12/6 
          * @param channel 
          * @param exception 
          * @exception 
          * @return void 
          **/
         @Override
         public void caught(Channel channel, Throwable exception) throws RemotingException {
             ExecutorService executor getExecutorService();
             try {
                 //发生异常放入线程池处理
                 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
            } catch (Throwable t) {
                 throw new ExecutionException("caught event", channel, getClass() " error when process caught event .", t);
            }
        }
      }

      从上面的代码注释中可以看到 all 这种处理策略就是所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

      接下来我们看看线程池的处理策略主要支持4种:

      技术分享图片

      我们以fixed策略进行分析。我们看到org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool核心源码:

      /**
      * 创建固定大小线程池
      *
      * @see java.util.concurrent.Executors#newFixedThreadPool(int)
      */
      public class FixedThreadPool implements ThreadPool {
      ?
         @Override
         public Executor getExecutor(URL url) {
             //线程池名称
             String name url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
             //线程池大小
             int threads url.getParameter(THREADS_KEY, DEFAULT_THREADS);
             //队列大小
             int queues url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
             return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                     //如果队列大小为0使用同步队列
                     queues == new SynchronousQueue<Runnable>() :
                             //否则使用指定大小到阻塞队列
                            (queues new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                     new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
      ?
      }

      上面的源码中使用指定大小的队列创建线程池,如果队列大小为0使用同步队列。

       

      6. 小结

      在本小节中我们主要学习了 Dubbo 中的线程池模型,在 Dubbo 中为我们提供了两种策略调整线程池模型分别是:DispatcherThreadPool。其中Dispatcher提供了5种策略:alldirectmessageexecutionconnectionThreadPool提供了4种策略:fixedcachedlimitedeager。同时我们分别从源码中学习了底层的实现逻辑。

      本节课程的重点如下:

      1. 理解 Dubbo 中线程模型

      2. 了解什么是 Dispatcher模式

      3. 了解什么是 ThreadPool模式

      4. 了解线程模型实现原理

       

      写在最后

      本小节是 Dubbo 入门到精通系列 (《从零开始学习Dubbo》、《Dubbo高阶应用》、《Dubbo源码分析》) 中 《从零开始学习Dubbo》基础课程最后一小节,感谢大家长期的支持。由于本人时间精力有限后面课程的相关专题更新可能比较缓慢请多多包含,再次感谢小伙伴的关注。

 

作者

个人从事金融行业,就职过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就职于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号:青年IT男 获取最新技术文章推送!

博客地址: youngitman.tech

微信公众号: 技术分享图片

 

作者:青年IT男
链接:https://juejin.cn/post/6969587297037582349
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Dubbo 线程池模型

原文:https://www.cnblogs.com/liyongit/p/14847654.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!