在前面的文章中,分享记录 ExtensionLoader扩展机制、服务的发布过程、Netty 启动监听服务 等内容,相比今天要写的客户端, 服务端的发布、启动还是比较清晰,好理解的。
客户端的ref生成,个人也是梳理好久,相对服务端来说,主要是有几个地方比较麻烦:
1. 是在客户端的启动过程中,涉及到几个的zk path、data变更订阅,多个listener,经常会搞混。
2. 监听器很多都是 java8 里面的lamada表达式,写起来快,理解调试起来就不轻松
3. 客户端,会涉及到 配置 参数重载与覆盖,以 provider -> consumer ->配置中心动态configs 的顺序进行取值覆盖,取这个里面最后的设置为准。
一、Client 调用简图
在ReferenceConfig 下面的Protocol 流程里面,先是有个RegisterProtocol 注册协议,这个也是非常重要,先在注册协议里面做了很多工作,然后在在里面执行Protocol,执行具体的交互协议。
二、 RegistryProtocol 的doRefer
Client启动的主流程在RegistryProtocol 类的doRefer方法里面。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); //1. Directory是客户端启动的一个关键类,负责Invoker生成,配置信息变更监听等操作, directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); // 2. zk上面生成路径,注册Consumer } directory.buildRouterChain(subscribeUrl); // 3. 实际构建 RouterChain ,默认为4个 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //4. directory执行订阅, 启动很多重要逻辑都在此方法里面 Invoker invoker = cluster.join(directory); // 5. 对directory 进行包装,加上mock、failover功能 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); //6. 对invoker、directory信息简单聚合,存入本地map return invoker; }
在客户端里面,RegistoryDirectory 是一个比较重要的类, 封装了节点、配置变更处理,实际调用Invoker的启动等逻辑。
1. 注释1. new 一个RegistoryDirectory实例出来,内部逻辑比较少,主要是url转换为基础数据
2. 注释2. 将conumser的路径,在zk上面注册。
3. 注释3. 调用ExtensionLoader获取RouterFactory的ActiveExtension(),然后调用工厂方法,获取实际的Router,排序生成router链。依次是:MockInvokersSelector,TagRouter,AppRouter,ServiceRouter,功能也很好理解,Router的作用,是获取可以被调用的provider列表。 Tag、App、Service 3个Router都会监控ZK的data数据变化。但这里没有启动监控, 这个后面再讲。
4. 注释4. 这一行代码,后面的动作可就多了,大概包括: a) 注册zk上各种data、path监听 b)实际交互协议初始化, c) 网络 client 启动,连接server
5. 注释5. 对directory进行包装,加上mock、failover功能。 这行的cluster是个adaptive ,默认会进入 MockClusterWrapper 里面的join方法。
三、 RegisterDirectory类
RegisterDirectory 这个类,主要就是zk的监听器,然后执行相应的动作。 当然底层的dobboprotocol 也是在这里启动,netty client 也是由监听里面的动作方法启动的。
RegisterDirectory类,有几个主要方法:1、subscribe(url) 2、notify(List urls) 3、refreshOverrideAndInvoker(List urls) 4.refreshInvoker(List urls) ,调用关系是:1->2 ->3 ->4 这样逐级调用的,下面逐个讲讲。
1. subscribe 方法
接上面的方法: directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
public void subscribe(URL url) { setConsumerUrl(url); // 注释1.设置consumer 的url CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 注释2. 将本directory 放入cunsumer config的监听器, serviceConfigurationListener = new ReferenceConfigurationListener(this, url); //注释3. 监听demoservice的config registry.subscribe(url, this); //注释4. 监听config、provider、router 3个path ,并调用notify方法 }
注释1. 将consumer的url保存下来。
注释2,与注释3 ,2个都是将本directory 作为listener,去监听zk上面data的变化。
注释2,监听的是:/dubbo/config/demo-consumer.configurators 的data变化, demo-consumer是 dubbo client 应用的注册名称
注释3 ,监听的是 /dubbo/config/org.apache.dubbo.demo.DemoService.configurators 的data变化。这里demoService是 接口的名称
对于多个服务来说,consumer的应用名 是不变的,接口是变化 的,所以注释2 是将本directory 加入listener数组,多个directory监听同一个path data数据变化, 注释3 是又新建了一个ReferenceConfigureListener,各个Service接口的数据变化,各自direcotry 去监听。 数据变化后,都是调用 RegistryDirectory的 refreshInvoker 方法。 当配置变更后,看看是否重新刷新Invoker
注释4, 这里的registry 是ZookeeperRegistry ,会去监听并获取路径下面的节点。监听的路径是: /dubbo/org.apache.dubbo.demo.DemoService/providers 、/dubbo/org.apache.dubbo.demo.DemoService/configurators、/dubbo/org.apache.dubbo.demo.DemoService/routers ,即监听provider、configurators、routers 3个节点下面的子节点变动。
监听执行的方法是ZookeeperRegistry.this.notify。 在注释4里面,最终会执行RegisterDirectory 里面的notify(List urls) 方法。
2. notify(List urls)
notify方法, 最开始的Invoker启动 与监听到配置变更,都会复用这个方法。会进行协议转换,配置的覆盖,Invoker的启动。
notify 方法的入参urls ,不管是首次手动调用(第一次从注册中心获取后调用),还是后续因为zk的path节点变更,listener调用,urls 源头都是 zk的path 数据。
public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); // 1. 一系列 stream 操作,对url 过滤校验,分组 // 下面几行代码对 configs、routers、providers 的数据分别处理 List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); //2. routers 节点变更,会重新加入新的 router 路由 List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());//3. providers变更会继续下传 refreshOverrideAndInvoker(providerURLs); //4. 重新刷新数据配置,并重启动Invoker(如有需要) }
RegisterDirectory类的 notify 方法,对变更后的path 节点进行处理, 因为节点path 已经在ZookeeperRegister 的notify方法里面进行了一定的转换处理,所以不是原始的path节点。
入参urls , config、router 的协议为empty(无数据), provider的协议是具体的发布协议,例如范例为dubbo。 empty的后续会被识别,特殊处理。
以上代码:
注释1. 对url列表进行校验、过滤,然后分成 config、router、provider 3个分组map
注释2. 如果router 路由节点有变化,则从新将router 下的数据生成router,加入之前生成的4个 active 路由链里面去。
注释3. 如果provider 节点有变化,则获取url
注释4. 调用下层方法,对配置重新刷新,如有需要重新启动Invoker 。
3. refreshOverrideAndInvoker
这个方法里面就2个方法调用。先是调用overrideDirectoryUrl(), 就是依次用 /dubbo/org.apache.dubbo.demo.DemoService/configurators 下节点path配置、 /dubbo/config/consumer.configurators 的data配置,/dubbo/config/org.apache.dubbo.demo.DemoService.configurators 下data 配置 来调整 原始consumer的url。
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); // 1. 逐个调用注册中心里面的配置,覆盖原来的url,组成最新的url 放入overrideDirectoryUrl 存储
refreshInvoker(urls); //2.根据 provider urls,重新刷新Invoker
}
4. refreshInvoker(urls);
refreshInvoker 代码以及子代码比较长,就不贴了, 主要做3个操作。
1. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
这行代码,就是根据providerUrls ,在toInvoker方法内部,几乎再次调用了一遍配置覆盖逻辑。最后如果配置有修改,就重新生成Invoker,没有就用旧的Invoker。同时返回新的url <->Invoker 的映射map
2. this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
这行 代码,就是如果服务有分组,则将分组下的 provider,包装成StaticDirectory,组成1个Invoker ,返回。
例如,服务Demo ,分为 Group A,GroupB, GroupA下有1、2、3 ,GroupB下有2、4、5, 那么groupA,GroupB分别包装为一个Invoker,就是2个Invoker。
实际调用的时候,先用GroupA,GroupB,去路由、负载均衡一次,选到Group A, 然后再在Group A下,路由,负载一次,选择1、2、3下面的一个节点提供服务。
3. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
这行代码,就是看看旧的url 是否在新map里面存在,不存在,就是销毁url对应的Invoker 。 这个好理解,比如 有个provider下线了,就要消减。 另外,同1provider配置变更,新的url与旧的不一样,旧的也会销毁,因为新的 url会生成一个Invoker。
三、 Consumer 端的 配置监听路径 与 执行方法
分别在 AbstractZookeeperClient类的 addChildListener() , addDataListener() 方法,ZookeeperDynamicConfiguration 类的 addListener() 方法 打印 监控的key,group,listener 。
监控path变更:
/dubbo/org.apache.dubbo.demo.DemoService/providers
/dubbo/org.apache.dubbo.demo.DemoService/configurators
/dubbo/org.apache.dubbo.demo.DemoService/routers
以上3个path 下面节点如果有变更,都会调用注册时的lamda表达式 org.apache.dubbo.registry.zookeeper.ZookeeperRegistry$$Lambda$ ,执行方法是ZookeeperRegistry.this.notify,最终会调 RegisterDirectory.notify(urls),将path转换成对应的url,更新路由,url配置覆盖更新,重新生成新配置的Invoker,配置分组,删除旧Invoker等 上面讲的逻辑。 节点path变更,执行了2次配置信息覆盖。
监控data变更: data path = /dubbo/config
data变更,是一个TreeCache Listener,然后注册了子监听,各自的listener 监听自己感兴趣的path。
有以下2类:
1.config data监听:
demo-consumer.configurators ->listener = RegistryDirectory$ConsumerConfigurationListener
org.apache.dubbo.demo.DemoService.configurators ->listener = RegistryDirectory$ReferenceConfigurationListener
2. router data 监听:
demo-consumer.condition-router -->listener = AppRouter
org.apache.dubbo.demo.DemoService.condition-router -->listener = ServiceRouter
demo-provider.tag-router -->listener = TagRouter
configData 变更,最终都是调用 RegisterDirectory.refreshInvoker ,但传入的url是Collections.emptyList . 根据代码推断,就是将zk的data保存到本地,url采用cacheUrls, 调用toInvoker 方法,内部会用最新的zk data,覆盖cacheUrl里面的参数配置,然后重新生成Invoker。 这个是之前跳过代码,没搞懂的逻辑,
router data 变更,会将zk 的最新data取回,放入 本地保存, 后续 服务调用时就会生效。
providers、configurators、routers 路径下节点的变更, 比demo-consumer.configurators 的data config变更 listener会多一些处理逻辑。 但都会执行配置覆盖,invoker重新发布。
四、总结
Consumer端启动,主要是记录了个人之前理解比较困难的地方,好多地方都略过去了。
Dubbo Client Invoker 的启动没有记录,这个更Server端启动几乎是一样的 ,有过Server的经验,理解起来比较容易,Channel、ChannelHandler的包装,几乎一模一样。 netty Handler 的处理都是Dubbo 内部的requestHandler ,不同的是,client 最终会调到received方法,server会调reply方法。
在consumer 比provider启动多的就是 几个配置变更订阅,路由、分组包装。 配置变更,listener 传递的层次比较深,不记录下,开始比较容易搞混。
写的比较粗糙,如有问题,欢迎各位提出来,一起讨论!
Dubbo源码学习笔记 之 Consumer 启动&配置变更监听
原文:https://www.cnblogs.com/keep-code/p/11133121.html