pom.xml文件添加依赖,如果引入了gateway的依赖,但是不想使用,可以在application.yml文件中设置spring.cloud.gateway.enabled=false
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
Route
网关的基础,由一个唯一ID、一个目标URL(请求的服务端具体地址)、一组断言(判断是否会进行路由)、一组过滤器。
只有断言全部返回true,才会进行路由。
Predicate
断言,使用的是Java8函数,参数类型为ServlerWebExchange,可以匹配任何的http请求,并且可以拿到请求中的请求头、参数等信息。
Filter
使用特定工厂构建的GatewayFilter的实例,可以在此Filter中修改请求和响应信息。
Order: Integer.MAX_VALUE
请求转发过滤器。
从exchange对象中获取gatewayRequestUrl,如果这个url中有forward的scheme,则使用Spring的DispatcherHandler 进行请求转发。
Order: 10100
负载均衡过滤器。(核心)
从exchange对象中获取gatewayRequestUrl、gatewaySchemePrefix,如果url为空或者url的schema不是lb并且gatewaySchemePrefix前缀不是lb,则进入下个过滤器;否则去获取真正的url。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 拿到请求url
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
// 拿到匹配的路由中url的前缀
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
// 如果url为null或者url的schema不是lb并且请求Route中配置的url不是lb,进入下个过滤器
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// 保留原始的Url,就是将原始的url放入exchange对象的gatewayOriginalRequestUrl参数值中
addOriginalRequestUrl(exchange, url);
// 获取真正的请求url并组装成ServiceInstance对象
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn‘t provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
使用Ribbon负载时,choose(exchange)调用逻辑
protected ServiceInstance choose(ServerWebExchange exchange) {
return loadBalancer.choose(
((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}
public ServiceInstance choose(String serviceId, Object hint) {
// getLoadBalancer(serviceId)根据服务id获取服务列表
// getServer(ILoadBalancer loadBalancer, Object hint) 根据负载均衡策略选择服务
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
// 实例化RibbonServer对象
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
// 缓存的内置容器
private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();
// 获取应用上下文的方法
protected AnnotationConfigApplicationContext getContext(String name) {
// 如果缓存中没有该应用,则创建应用
// 双重检查锁,保证线程安全
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
// 返回应用实例
return this.contexts.get(name);
}
// 所有带有@RibbonClients 的配置类,key为类名全路径
private Map<String, C> configurations = new ConcurrentHashMap<>();
// 根据注解配置创建内置容器
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
/* 加载配置start */
// 如果有该服务需要加载的配置类,将自定义的配置注册到容器中
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
// 如果配置的key是以default.开头,则默认注册的容器中
// RibbonNacosAutoConfiguration和RibbonAutoConfiguration都是以defaule.开头的
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
// 注册默认的配置,即RibbonClientConfiguration
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
// 添加环境变量
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
/* 加载配置end */
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
// 设置容器名称
context.setDisplayName(generateDisplayName(name));
// 刷新容器
context.refresh();
return context;
}
从容器中获取服务getServer():根据负载均衡算法获取服务,Ribbon默认加载的ZoneAvoidanceRule负载策略
public Server choose(Object key) {
// 当前服务对应的负载均衡器,包含NacosRibbonClientConfiguration注入的Server列表
ILoadBalancer lb = getLoadBalancer();
// 获取服务,默认轮询策略
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
// 轮询方法,参数为可用服务的数量
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
首先看一个接口ServerListUpdater自己核心方法
有个内部接口UpdayeAction,真正执行服务列表更新的接口。
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
/**
* start the serverList updater with the given update action
* This call should be idempotent.
*
* @param updateAction
*/
void start(UpdateAction updateAction);
}
再看看ServerListUpdater接口的实现类PollingServerListUpdater,只贴核心方法start()。
参数为UpdateAction 对象,定义了一个线程类,线程里调用updateAction.doUpdate()方法。
启动一个定时任务线程池,默认每隔30S执行一次,也就是说每隔30从注册中心获取一次最新的服务列表。
public class PollingServerListUpdater implements ServerListUpdater {
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
}
Order:-1
在其它过滤器执行完后执行,并将服务端响应的结果写回客户端,如果需要重新处理响应结果,则新的过滤器必须在此过滤器之后执行,也就是order比-1小。
Order:0
gateway性能监控核心过滤器,用于采集请求数据,主要包含
routeId: 路由id
routeUri: 路由的url
outcome:
status: 请求状态
httpStatusCode: 响应状态码
httpMethod: 请求方法
实现GlobalFilter, Ordered,并重写getOrder()和filter()方法
public class CustomGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("custom global filter");
return chain.filter(exchange);
}
@Override
public int getOrder() {
return -1;
}
}
Route过滤器命名必须以GatewayFilterFactory结尾,并且在application.yml文件中只需配置前缀
public class PreGatewayFilterFactory extends AbstractGatewayFilterFactory<PreGatewayFilterFactory.Config> {
public PreGatewayFilterFactory() {
super(Config.class);
}
@Override
public GatewayFilter apply(Config config) {
// grab configuration from Config object
return (exchange, chain) -> {
//If you want to build a "pre" filter you need to manipulate the
//request before calling chain.filter
ServerHttpRequest.Builder builder = exchange.getRequest().mutate();
//use builder to manipulate the request
return chain.filter(exchange.mutate().request(builder.build()).build());
};
}
public static class Config {
//Put the configuration properties for your filter here
}
}
原文:https://www.cnblogs.com/ns-study/p/14649392.html