From 06f5f0f3abb035e59278d76809ec021f4c1607ab Mon Sep 17 00:00:00 2001 From: Halil ibrahim Kalkan Date: Wed, 14 Nov 2018 17:09:29 +0300 Subject: [PATCH] RabbitMqDistributedEventBus implemented #33 --- .../RabbitMq/RabbitMqDistributedEventBus.cs | 214 ++++++++++++++++-- .../Volo/Abp/EventBus/ActionEventHandler.cs | 2 +- .../Volo/Abp/EventBus/EventBusBase.cs | 17 ++ .../EventHandlerFactoryUnregistrar.cs | 2 +- .../EventBus/SingleInstanceHandlerFactory.cs | 2 +- 5 files changed, 219 insertions(+), 18 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs index c08f6c6c9f..053b41dc5b 100644 --- a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs @@ -1,58 +1,204 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; +using System.Reflection; using System.Threading.Tasks; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.RabbitMQ; using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Volo.Abp.Collections; using Volo.Abp.EventBus.Local; +using Volo.Abp.Threading; namespace Volo.Abp.EventBus.Distributed.RabbitMq { - /* TODO: Implement Retry system + /* TODO: How to handle unsubscribe to unbind on RabbitMq (may not be possible for) + * TODO: Implement Retry system * TODO: Should be improved */ [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))] - public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ITransientDependency + public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency { - protected RabbitMqDistributedEventBusOptions Options { get; } + protected RabbitMqDistributedEventBusOptions RabbitMqDistributedEventBusOptions { get; } + protected DistributedEventBusOptions DistributedEventBusOptions { get; } protected IConnectionPool ConnectionPool { get; } protected IRabbitMqSerializer Serializer { get; } + protected ConcurrentDictionary> HandlerFactories { get; } //TODO: Accessing to the List may not be thread-safe! + protected ConcurrentDictionary EventTypes { get; } + protected IModel ConsumerChannel; + protected IServiceProvider ServiceProvider { get; } public RabbitMqDistributedEventBus( IOptions options, IConnectionPool connectionPool, - IRabbitMqSerializer serializer) + IRabbitMqSerializer serializer, + IServiceProvider serviceProvider, + DistributedEventBusOptions distributedEventBusOptions) { ConnectionPool = connectionPool; Serializer = serializer; - Options = options.Value; + ServiceProvider = serviceProvider; + DistributedEventBusOptions = distributedEventBusOptions; + RabbitMqDistributedEventBusOptions = options.Value; + + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + + ConsumerChannel = CreateConsumerChannel(); + Subscribe(DistributedEventBusOptions.Handlers); + } + + public virtual void Subscribe(ITypeList handlers) + { + foreach (var handler in handlers) + { + var interfaces = handler.GetInterfaces(); + foreach (var @interface in interfaces) + { + if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface)) + { + continue; + } + + var genericArgs = @interface.GetGenericArguments(); + if (genericArgs.Length == 1) + { + Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceProvider, handler)); + } + } + } } - + + private IModel CreateConsumerChannel() + { + //TODO: Support multiple connection (and consumer)? + var channel = ConnectionPool.Get().CreateModel(); + channel.ExchangeDeclare( + exchange: RabbitMqDistributedEventBusOptions.ExchangeName, + type: "direct" + ); + + channel.QueueDeclare( + queue: RabbitMqDistributedEventBusOptions.ClientName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null + ); + + var consumer = new EventingBasicConsumer(channel); + consumer.Received += async (model, ea) => { await ProcessEventAsync(channel, ea); }; + + channel.BasicConsume( + queue: RabbitMqDistributedEventBusOptions.ClientName, + autoAck: false, + consumer: consumer + ); + + channel.CallbackException += (sender, ea) => + { + ConsumerChannel.Dispose(); + ConsumerChannel = CreateConsumerChannel(); + }; + + return channel; + } + + private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea) + { + var eventName = ea.RoutingKey; + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return; + } + + var eventData = Serializer.Deserialize(ea.Body, eventType); + + await TriggerHandlersAsync(eventType, eventData); + + channel.BasicAck(ea.DeliveryTag, multiple: false); + } + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { - throw new NotImplementedException(); + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! + { + var eventName = EventNameAttribute.GetName(eventType); + + using (var channel = ConnectionPool.Get().CreateModel()) //TODO: Connection name per event! + { + channel.QueueBind( + queue: RabbitMqDistributedEventBusOptions.ClientName, + exchange: RabbitMqDistributedEventBusOptions.ExchangeName, + routingKey: eventName + ); + } + } + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } + /// public override void Unsubscribe(Func action) { - throw new NotImplementedException(); + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + var singleInstanceFactory = factory as SingleInstanceHandlerFactory; + if (singleInstanceFactory == null) + { + return false; + } + + var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; + if (actionHandler == null) + { + return false; + } + + return actionHandler.Action == action; + }); + }); } + /// public override void Unsubscribe(Type eventType, IEventHandler handler) { - throw new NotImplementedException(); + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory && + (factory as SingleInstanceHandlerFactory).HandlerInstance == handler + ); + }); } + /// public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) { - throw new NotImplementedException(); + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); } + /// public override void UnsubscribeAll(Type eventType) { - throw new NotImplementedException(); + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } public override Task PublishAsync(Type eventType, object eventData) @@ -60,16 +206,16 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq var eventName = EventNameAttribute.GetName(eventType); var body = Serializer.Serialize(eventData); - using (var channel = ConnectionPool.Get().CreateModel()) + using (var channel = ConnectionPool.Get().CreateModel()) //TODO: Connection name per event! { //TODO: Other properties like durable? - channel.ExchangeDeclare(Options.ExchangeName, ""); + channel.ExchangeDeclare(RabbitMqDistributedEventBusOptions.ExchangeName, ""); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //persistent channel.BasicPublish( - exchange: Options.ExchangeName, + exchange: RabbitMqDistributedEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, @@ -80,9 +226,47 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq return Task.CompletedTask; } + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetName(type); + EventTypes[eventName] = type; + return new List(); + } + ); + } + protected override IEnumerable GetHandlerFactories(Type eventType) { - throw new NotImplementedException(); + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //TODO: Support inheritance? But it does not support on subscription to RabbitMq! + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/ActionEventHandler.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/ActionEventHandler.cs index 32b41ec800..995ce39356 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/ActionEventHandler.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/ActionEventHandler.cs @@ -8,7 +8,7 @@ namespace Volo.Abp.EventBus /// This event handler is an adapter to be able to use an action as implementation. /// /// Event type - internal class ActionEventHandler : + public class ActionEventHandler : IEventHandler, ITransientDependency { diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index c11650c438..4c82dbd8eb 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -82,6 +82,23 @@ namespace Volo.Abp.EventBus.Local /// public abstract Task PublishAsync(Type eventType, object eventData); + public virtual async Task TriggerHandlersAsync(Type eventType, object eventData) + { + var exceptions = new List(); + + await TriggerHandlersAsync(eventType, eventData, exceptions); + + if (exceptions.Any()) + { + if (exceptions.Count == 1) + { + exceptions[0].ReThrow(); + } + + throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions); + } + } + protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List exceptions) { await new SynchronizationContextRemover(); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs index 0a07db67f6..dad858773c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs @@ -5,7 +5,7 @@ namespace Volo.Abp.EventBus /// /// Used to unregister a on method. /// - internal class EventHandlerFactoryUnregistrar : IDisposable + public class EventHandlerFactoryUnregistrar : IDisposable { private readonly IEventBus _eventBus; private readonly Type _eventType; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs index 3362084639..4fdd9cce69 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs @@ -7,7 +7,7 @@ namespace Volo.Abp.EventBus /// /// This class always gets the same single instance of handler. /// - internal class SingleInstanceHandlerFactory : IEventHandlerFactory + public class SingleInstanceHandlerFactory : IEventHandlerFactory { /// /// The event handler instance.