传统的单体应用,组件间的调用都是使用代码级的方法函数。比如用户登录自动签到,增加积分。我们可以在登录函数调用积分模块的某个函数,为了解耦我们使用以来注入并放弃new Class()这种方式。但是不管哪种方式都是在同一个进程里。
讲一个单体应用改为微服务应用的最大挑战就是改变通信机制,直接把进程内方法调用改成服务间的 RPC 调用会导致在分布式环境中性能低下的、零散的和低效的通信。
异步还是同步的:
• 同步协议。
HTTP 是一种同步协议。客户端发起一个请求然后等待服务端响应。客户端的代码可以独立地实现同步(线程被阻塞)或异步(线程非阻塞,最终的响应通过回调来处理)的执行方式。这里的重点在于协议(HTTP / HTTPS)是同步的,客户端代码只能在收到 HTTP 服务端的响应后才可以继续先前的任务。•
异步协议。
其他协议比如 AMQP(很多操作系统和云环境支持的一种协议)使用了异步消息。客户端代码或消息发送者通常不需要等待响应,只要把消息发送给 RabbitMQ 队列或其他消息代理。
第二个维度是看有单个接收者还是多个接收者:
• 单个接受者。每个请求必须准确地被一个接收者或服务来处理,一个例子就是命令模式。
• 多个接收者。每个请求能被 0 个或多个接收者来处理,这类通信必须是异步的,一个例子是事件驱动架构里的发布/订阅机制。它基于事件总线接口或消息代理在多个微服务间通过事件来传送数据。
事件总线跟观察者(发布-订阅)模式非常相似也可以说是发布-订阅模式的一种实现,跟传统观察者的差别只是一个代码级一个是架构级的。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
为什么使用事件总线这种异步模式?
异步整合方式增强微服务自治能力。
创建微服务应用的重点在于整合微服务的方式。理想情况下,应该减少内部微服务间的通信,微服务之间的交互越少越好。核心规则是微服务间的交互需要异步。并不意味着一定要使用某种特定的协议(比如异步消息或同步的HTTP),只是表明微服务间通过异步传输数据来通信,但请不要依赖于其他内部微服务作为自己 HTTP请求/响应的一部分。
如果可能,即便只是用于查询,也绝不要依赖微服务间的同步通信(请求/响应)。每个微服务的目标是自治的,对客户端是可用的,即使作为端到端应用一部分的其他服务发生故障或不稳定。如果您认为需要从一个微服务调用其他微服务(比如发起一个 HTTP 请求来查询数据)为客户端应用提供响应结果,那么这样的架构在其他微服务发生故障时就变得不稳定。此外,在微服务之间如果存在 HTTP 依赖,比如串联 HTTP 请求创建很长的请求/响应周期,这样不仅使您的微服务不能自治,而且一旦这个链条上的某个服务有性能问题,整个服务的性能都受到影响。微服务间添加的同步依赖(比如查询请求)越多,客户端应用的总响应时间就会越长。
从上图可知,核心就4个角色:
实现事件总线的关键是:
3. eshop的事件总线
事件源:IntegrationEvent,通过继承扩展这个类,完善事件的描述信息。
事件处理:IIntegrationEventHandler,IDynamicIntegrationEventHandler,两个接口都定义了Handle方法来响应事件。IIntegrationEventHandler接收强类型的IntegrationEvent,IDynamicIntegrationEventHandler接收动态类型dynamic。
Integration Event(集成事件)。因为在微服务中事件的消费不再局限于当前领域内,而是多个微服务可能共享同一个事件,所以这里要和DDD中的领域事件区分开来。集成事件可用于跨多个微服务或外部系统同步领域状态,这是通过在微服务之外发布集成事件来实现的。
事件总线:IEventBus,提供Publish用来发布事件,Subscriber用来订阅事件。
为了方便进行订阅管理,系统提供了额外的一层抽象IEventBusSubscriptionsManager
,其用于维护事件的订阅和注销,以及订阅信息的持久化。其默认的实现InMemoryEventBusSubscriptionsManager
就是使用内存进行存储事件源和事件处理的映射字典。
从类图中看InMemoryEventBusSubscriptionsManager
中定义了一个内部类SubscriptionInfo
,其主要用于表示事件订阅方的订阅类型和事件处理的类型。
我们来近距离看下InMemoryEventBusSubscriptionsManager
的定义:
//定义的事件名称和事件订阅的字典映射(1:N) private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; //保存所有的事件处理类型 private readonly List<Type> _eventTypes; //定义事件移除后事件 public event EventHandler<string> OnEventRemoved; //构造函数初始化 public InMemoryEventBusSubscriptionsManager() { _handlers = new Dictionary<string, List<SubscriptionInfo>>(); _eventTypes = new List<Type>(); } //添加动态类型事件订阅(需要手动指定事件名称) public void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { DoAddSubscription(typeof(TH), eventName, isDynamic: true); } //添加强类型事件订阅(事件名称为事件源类型) public void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = GetEventKey<T>(); DoAddSubscription(typeof(TH), eventName, isDynamic: false); if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); } } //移除动态类型事件订阅 public void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName); DoRemoveHandler(eventName, handlerToRemove); } //移除强类型事件订阅 public void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent { var handlerToRemove = FindSubscriptionToRemove<T, TH>(); var eventName = GetEventKey<T>(); DoRemoveHandler(eventName, handlerToRemove); }
添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。IEventBus
的具体实现通过注入对IEventBusSubscriptionsManager
的依赖,即可完成订阅管理。
你这里可能会好奇,为什么要暴露一个OnEventRemoved
事件?这里先按住不表,留给大家思考。
我们这里不纠结为什么使用RabbitMQ,其实可以替代的方案很多。我们只要知道RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。
public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "eshop_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger<EventBusRabbitMQ> _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); if (_subsManager.IsEmpty) { _queueName = string.Empty; _consumerChannel.Close(); } } } //.... }
构造函数主要做了以下几件事:
IRabbitMQPersistentConnection
用来管理链接。IEventBusSubscriptionsManager
,进行订阅管理。OnEventRemoved
事件,取消队列的绑定。(这也就回答了上面遗留的问题)/// <summary> /// 动态类型订阅 /// </summary> /// <typeparam name="TH"></typeparam> /// <param name="eventName"></param> public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { DoInternalSubscription(eventName); _subsManager.AddDynamicSubscription<TH>(eventName); } /// <summary> /// 强类型订阅 /// </summary> /// <typeparam name="T"></typeparam> /// <typeparam name="TH"></typeparam> public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = _subsManager.GetEventKey<T>(); DoInternalSubscription(eventName); _subsManager.AddSubscription<T, TH>(); } /// <summary> /// rabbitmq队列的绑定。以eventName为routingKey进行路由 /// </summary> /// <param name="eventName">事件名称</param> private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
/// <summary> /// 发布 /// </summary> /// <param name="event">事件</param> public void Publish(IntegrationEvent @event) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } //使用Polly进行重试 var policy = RetryPolicy.Handle<BrokerUnreachableException>() .Or<SocketException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _persistentConnection.CreateModel()) { var eventName = @event.GetType() .Name; //使用direct全匹配、单播形式的路由机制进行消息分发 channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); //消息主体是json字符串 var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // 进行消息持久化 channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true,//告知服务器当根据指定的routingKey和消息找不到对应的队列时,直接返回消息给生产者。 basicProperties: properties, body: body); }); } }
构造函数中有一句
_consumerChannel = CreateConsumerChannel();
private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } //创建信道Channel var channel = _persistentConnection.CreateModel(); //申明Exchange使用direct模式 channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); //声明队列绑定Channel的消费者实例 channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //注册Received事件委托处理消息接收事件 consumer.Received += async (model, ea) => { var eventName = ea.RoutingKey; var message = Encoding.UTF8.GetString(ea.Body); //事件处理的逻辑 await ProcessEvent(eventName, message); channel.BasicAck(ea.DeliveryTag,multiple:false); }; //启动监听 channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; }
/// <summary> /// 事件处理逻辑 /// </summary> /// <param name="eventName">事件名称</param> /// <param name="message">消息</param> /// <returns></returns> private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { //Event Handler实例 var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; if (handler == null) continue; //反序列化为动态类型 dynamic eventData = JObject.Parse(message); //调用Handle方法 await handler.Handle(eventData); } else { //Event Handler实例 var handler = scope.ResolveOptional(subscription.HandlerType); if (handler == null) continue; var eventType = _subsManager.GetEventTypeByName(eventName); //反序列化为强类型 var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); //调用Handle方法 await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
微服务的集成
各个Startup类中注册
①注册IRabbitMQPersistentConnection
服务用于设置RabbitMQ连接
services.AddSingleton<IRabbitMQPersistentConnection>(sp => { var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); //... return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); });
②注册单例模式的IEventBusSubscriptionsManager
用于订阅管理
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
③ 注册单例模式的EventBusRabbitMQ
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => { var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); });
④发布事件
若要发布事件,需要根据是否需要事件源(参数传递)来决定是否需要申明相应的集成事件,需要则继承自IntegrationEvent
进行申明。然后在需要发布事件的地方进行实例化,并通过调用IEventBus
的实例的Publish
方法进行发布。
//事件源的声明 public class ProductPriceChangedIntegrationEvent : IntegrationEvent { public int ProductId { get; private set; } public decimal NewPrice { get; private set; } public decimal OldPrice { get; private set; } public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice) { ProductId = productId; NewPrice = newPrice; OldPrice = oldPrice; } } //声明事件源 var priceChangedEvent = new ProductPriceChangedIntegrationEvent(1001, 200.00, 169.00) //发布事件 _eventBus.Publish(priceChangedEvent)
⑤ 订阅事件
若要订阅事件,需要根据需要处理的事件类型,申明对应的事件处理类,继承自IIntegrationEventHandler
或IDynamicIntegrationEventHandler
,并注册到IOC容器。然后创建IEventBus
的实例调用Subscribe
方法进行显式订阅。
//定义事件处理 public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent> { public async Task Handle(ProductPriceChangedIntegrationEvent @event) { //do something } } //事件订阅 var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
⑥跨服务事件消费
在微服务中跨服务事件消费很普遍,这里有一点需要说明的是如果订阅的强类型事件非当前微服务中订阅的事件,需要复制定义订阅的事件类型。换句话说,比如在A服务发布的TestEvent
事件,B服务订阅该事件,同样需要在B服务复制定义一个TestEvent
。
这也是微服务的一个通病,重复代码。
原文:https://www.cnblogs.com/tianyamoon/p/10206736.html