RoundRobinRule,轮训策略,默认策略
RandomRule,随机,使用Random对象从服务列表中随机选择一个服务
RetryRule,轮询 + 重试
WeightedResponseTimeRule:优先选择响应时间快,此策略会根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。此类有个DynamicServerWeightTask的定时任务,默认情况下每隔30秒会计算一次各个服务实例的权重。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换回来
AvailabilityFilteringRule:可用性过滤,会先过滤掉以下服务:由于多次访问故障而断路器处于打开的服务、并发的连接数量超过阈值,然后对剩余的服务列表按照RoundRobinRule策略进行访问
BestAvailableRule:优先选择并发请求最小的,刚启动时吗,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,才会切换回来
ZoneAvoidanceRule:可以实现避免可能访问失效的区域(zone)
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
在RestTemplate上面增加注解@LoadBalanced,就开启了负载功能。
简单介绍原理:该注解会把所有的的RestTemplate实例都加上@LoadBalanced,后续通过@Qualifier和@Autowire注解可以将所有的RestTemplate都注入获取到。
然后挨个给这些RestTemplate添加拦截器,在拦截器中实现负载逻辑。
@Configuration
@ConditionalOnClass({IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(
name = {"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration"}
)
//引入LoadBalancerAutoConfiguration类
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class})
public class RibbonAutoConfiguration {
@Autowired(
required = false
)
private List<RibbonClientSpecification> configurations = new ArrayList();
//饥饿加载模式配置
//正常模式下ribbon client都是到使用的时候才去加载
//饥饿模式在spring容器初始化完毕就加载ribbon client
@Autowired
private RibbonEagerLoadProperties ribbonEagerLoadProperties;
//子容器工厂,一个项目可以配置多个ribbon client,这些client分别加载自己的容器,互相独立
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean//引入RibbonLoadBalancerClient
@ConditionalOnMissingBean({LoadBalancerClient.class})
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(this.springClientFactory());
}
@Bean
@ConditionalOnProperty(
value = {"ribbon.eager-load.enabled"},//该属性开启饥饿模式
matchIfMissing = false
)
//饥饿加载模式
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());
}
}
该类引入哪些功能:
1、LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器
2、SpringClientFactory子容器
在多ribbon客户端的情况下,SpringClientFactory会为每个客户端都加载自己的上下文,实现ribbon客户端的隔离性。
举个例子:
定义两个自定义ribbon客户端
@Configurable
public class RibbonCust1 {
@Bean
public IRule myRule1(){
return new RandomRule();
}
}
@Configurable
public class RibbonCust2 {
@Bean
public IRule myRule2(){
return new RetryRule();
}
}
启动类开启ribbon负载,并且指明哪些服务用哪些ribbon
@SpringBootApplication
@RibbonClient(
name = "demo-goods", configuration = MyRule2.class
)
public class Ads2Application {
public static void main(String[] args) {
SpringApplication.run(Ads2Application.class,args);
}
}
这种情况不同的客户端的上下文环境是独立的,SpringClientFactory就是存储子上下文的。
@Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
//这里配合@LoadBalanced注解的@Qualifier注解,将所有包含@LoadBalanced注解的RestTemplate给注入进来
@LoadBalanced
@Autowired(
required = false
)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(
required = false
)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
public LoadBalancerAutoConfiguration() {
}
/***
* 此方法的逻辑
* 获取到所有的RestTemplate,把这些RestTemplate加入到自定义定制器中
* @param customizers
* @return
*/
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
public void afterSingletonsInstantiated() {
Iterator var1 = org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.this.restTemplates.iterator();
while(var1.hasNext()) {
RestTemplate restTemplate = (RestTemplate)var1.next();
Iterator var3 = customizers.iterator();
while(var3.hasNext()) {
RestTemplateCustomizer customizer = (RestTemplateCustomizer)var3.next();
customizer.customize(restTemplate);
}
}
}
};
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
//初始化拦截器
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//初始化自定义定制器:该定制器会将放入到restTemplate都加上拦截器
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
}
获取所有restTempalte,遍历挨个添加拦截器LoadBalancerInterceptor。
看一下拦截器LoadBalancerInterceptor是啥样的。
//LoadBalancerInterceptor
private LoadBalancerClient loadBalancer;
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
//入口
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
内部调用了LoadBalancerClient的execute方法实现负载
//RibbonLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//获取负载均衡器
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
//通过负载均衡器选主一个server
Server server = this.getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, ribbonServer, request);
}
}
1、通过getServer方法实现负载选择,一个server
2、通过execute方法发起远程调用
先看一下getServer是如何选择server的
protected Server getServer(ILoadBalancer loadBalancer) {
return loadBalancer == null ? null : loadBalancer.chooseServer("default");
}
这一步又通过loadBalancer来选择server。
loadBalancer就是ribbon实现负载均衡的核心类。先看一下。
先看下loadBalancer的继承关系
//ZoneAwareLoadBalancer
public Server chooseServer(Object key) {
if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
Server server = null;
try {
LoadBalancerStats lbStats = this.getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (this.triggeringLoad == null) {
this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
}
if (this.triggeringBlackoutPercentage == null) {
this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception var8) {
logger.error("Error choosing server using zone aware logic for load balancer={}", this.name, var8);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
} else {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
}
使用命令模式将所有对外部服务的调用包装在HystrixCommand或HystrixObservableCommand对象中,并将该对象放在单独的线程中执行。
命令模式
HystrixCommand:依赖的服务返回单个操作结果。封装了两种执行方法:execute()同步和queue()异步。
HystrixObservableCommand:依赖的服务返回多个操作结果。也封装了两种执行方法:observe()和toObserve()。observe()返回的是Hot Observe对象。toObserve()返回的是cold observe对象。
什么是Hot Observe?什么是cold observe?
Observable是RxJava中的概念,RxJava可以理解成发布订阅,其中Observable对象是发布者,每个Observable对应一个或多个订阅者。结合命令模式可以这么理解:RxJava在其中实现了一个Invoker的角色,通过发布订阅,将commond和reciever(命令执行者)连接起来。
Hot Observe:发布消息前无论当前Observable对象是否由订阅者,都进行消息发布。
cold observe:发布消息前如果当前Observable对象没有订阅者,则等待订阅者。
有两种使用方式,一种是通过注解,一种是通过自定义类实现HystrixCommand/HystrixObservableCommand。
首先在启动类通过注解@EnableHystrix开启断路器。
@Component
public class UserService {
@Autowired
private RestTemplate restTemplate;
/***
* 通过注解方式
* @return
*/
@HystrixCommand(fallbackMethod = "fallback")//指定失败回调方法
public User queryUserById() {
return new RestTemplate().getForObject("http://localhost:8081/queryUserById/{1}", User.class,1L);
}
private String fallback() {
return "fallback";
}
}
HystrixCommand:
/***
* 通过自定义类 -- HystrixCommand
* @return
*/
public User queryUserByIdHystrix() throws Exception{
//同步
// User user = new UserCommond(1L,restTemplate).execute();
//异步
Future<User> userFuture = new UserCommond(1L,restTemplate).queue();
return userFuture.get();
}
HystrixObservableCommand:一下发送多条请求
public class UserObserveCommond extends HystrixObservableCommand<User> {
private RestTemplate restTemplate;
private Long[] ids;
public UserObserveCommond(Long[] ids , RestTemplate restTemplate) {
super(HystrixCommandGroupKey.Factory.asKey("usercommand"));// 调用父类构造方法
this.ids = ids;
this.restTemplate = restTemplate;
}
@Override
protected Observable<User> construct() {
return Observable.create(new Observable.OnSubscribe<User>() {
/*
* Observable有三个关键的事件方法,分别为onNext,onCompleted,onError
*/
@Override
public void call(Subscriber<? super User> observer) {
try {// 写业务逻辑,注意try-catch
if (!observer.isUnsubscribed()) {
for (Long id : ids) {
User user = restTemplate.getForObject("http://localhost:8081/queryUserById/{1}", User.class,id);
observer.onNext(user);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
/**
* fallback方法的写法,覆写resumeWithFallback方法
* 当调用出现异常时,会调用该降级方法
*/
@Override
public Observable<User> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<User>() {
@Override
public void call(Subscriber<? super User> observer) {
try {
if (!observer.isUnsubscribed()) {
User u = new User();
u.setName("刘先生");
u.setId(1l);
observer.onNext(u);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
/***
* 通过自定义类 -- HystrixObservableCommand
*
* 一下发送多条请求
*
* @return
*/
public List<User> queryUserByIdHystrixObservable() throws Exception{
List<User> list = new ArrayList<User>();
Long[] ids = {1L,2L,3L};
UserObserveCommond observableCommand = new UserObserveCommond(ids,restTemplate);
Observable<User> observe = observableCommand.observe();
observe.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
System.out.println("聚合完了所有的查询请求!");
System.out.println(list);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onNext(User user) {
list.add(user);
}
});
return list;
}
开启缓存:重写如下方法即可
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
使用命令模式,每一个远程调用都会封装成一个commond对象,然后在通过切面,对该commond对象进行增强。
切面类是HystrixCommandAspect:
原文:https://www.cnblogs.com/yanhui007/p/12640442.html