diff --git a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs index fa83254bad..72126873e2 100644 --- a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs @@ -1,17 +1,20 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.RabbitMQ; using RabbitMQ.Client; +using Volo.Abp.EventBus.Local; namespace Volo.Abp.EventBus.Distributed.RabbitMq { - /* Inspired from the implementation of "eShopOnContainers" - * TODO: Implement Retry system + /* TODO: Implement Retry system * TODO: Should be improved */ - public class RabbitMqDistributedEventBus : IDistributedEventBus, ITransientDependency + [Dependency(ReplaceServices = true)] + [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))] + public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ITransientDependency { protected RabbitMqDistributedEventBusOptions Options { get; } protected IChannelPool ChannelPool { get; } @@ -26,79 +29,33 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq Serializer = serializer; Options = options.Value; } - - public IDisposable Subscribe(Func action) where TEvent : class - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe(IEventHandler handler) where TEvent : class - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe() where TEvent : class where THandler : IEventHandler, new() - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe(Type eventType, IEventHandler handler) - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) - { - throw new NotImplementedException(); - } - - public void Unsubscribe(Func action) where TEvent : class - { - throw new NotImplementedException(); - } - - public void Unsubscribe(IEventHandler handler) where TEvent : class - { - throw new NotImplementedException(); - } - - public void Unsubscribe(Type eventType, IEventHandler handler) + + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { throw new NotImplementedException(); } - public void Unsubscribe(IEventHandlerFactory factory) where TEvent : class + public override void Unsubscribe(Func action) { throw new NotImplementedException(); } - public void Unsubscribe(Type eventType, IEventHandlerFactory factory) + public override void Unsubscribe(Type eventType, IEventHandler handler) { throw new NotImplementedException(); } - public void UnsubscribeAll() where TEvent : class + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) { throw new NotImplementedException(); } - public void UnsubscribeAll(Type eventType) + public override void UnsubscribeAll(Type eventType) { throw new NotImplementedException(); } - public Task PublishAsync(TEvent eventData) - where TEvent : class - { - return PublishAsync(typeof(TEvent), eventData); - } - - public Task PublishAsync(Type eventType, object eventData) + public override Task PublishAsync(Type eventType, object eventData) { var eventName = eventType.FullName; //TODO: Get eventname from an attribute if available var body = Serializer.Serialize(eventData); @@ -112,7 +69,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq properties.DeliveryMode = 2; //persistent channelAccessor.Channel.BasicPublish( - exchange: Options.ExchangeName, + exchange: Options.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, @@ -122,5 +79,10 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq return Task.CompletedTask; } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + throw new NotImplementedException(); + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBusOptions.cs index a998861db4..fe277f8599 100644 --- a/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBusOptions.cs @@ -2,6 +2,8 @@ { public class RabbitMqDistributedEventBusOptions { + public string ClientName { get; set; } + public string ExchangeName { get; set; } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/EventBusBase.cs new file mode 100644 index 0000000000..d57f37f6db --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/EventBusBase.cs @@ -0,0 +1,212 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Reflection; + +namespace Volo.Abp.EventBus.Local +{ + public abstract class EventBusBase : IEventBus + { + /// + public virtual IDisposable Subscribe(Func action) where TEvent : class + { + return Subscribe(typeof(TEvent), new ActionEventHandler(action)); + } + + /// + public virtual IDisposable Subscribe(IEventHandler handler) where TEvent : class + { + return Subscribe(typeof(TEvent), handler); + } + + /// + public virtual IDisposable Subscribe() + where TEvent : class + where THandler : IEventHandler, new() + { + return Subscribe(typeof(TEvent), new TransientEventHandlerFactory()); + } + + /// + public virtual IDisposable Subscribe(Type eventType, IEventHandler handler) + { + return Subscribe(eventType, new SingleInstanceHandlerFactory(handler)); + } + + /// + public virtual IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class + { + return Subscribe(typeof(TEvent), factory); + } + + public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory); + + public abstract void Unsubscribe(Func action) where TEvent : class; + + /// + public virtual void Unsubscribe(IEventHandler handler) where TEvent : class + { + Unsubscribe(typeof(TEvent), handler); + } + + public abstract void Unsubscribe(Type eventType, IEventHandler handler); + + /// + public virtual void Unsubscribe(IEventHandlerFactory factory) where TEvent : class + { + Unsubscribe(typeof(TEvent), factory); + } + + public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory); + + /// + public virtual void UnsubscribeAll() where TEvent : class + { + UnsubscribeAll(typeof(TEvent)); + } + + /// + public abstract void UnsubscribeAll(Type eventType); + + /// + public virtual Task PublishAsync(TEvent eventData) where TEvent : class + { + return PublishAsync(typeof(TEvent), eventData); + } + + /// + public abstract Task PublishAsync(Type eventType, object eventData); + + protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List exceptions) + { + await new SynchronizationContextRemover(); + + foreach (var handlerFactories in GetHandlerFactories(eventType)) + { + foreach (var handlerFactory in handlerFactories.EventHandlerFactories) + { + await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions); + } + } + + //Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument + if (eventType.GetTypeInfo().IsGenericType && + eventType.GetGenericArguments().Length == 1 && + typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType)) + { + var genericArg = eventType.GetGenericArguments()[0]; + var baseArg = genericArg.GetTypeInfo().BaseType; + if (baseArg != null) + { + var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg); + var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs(); + var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs); + await PublishAsync(baseEventType, baseEventData); + } + } + } + + protected abstract IEnumerable GetHandlerFactories(Type eventType); + + protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List exceptions) + { + using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler()) + { + try + { + var handlerType = eventHandlerWrapper.EventHandler.GetType(); + + if (ReflectionHelper.IsAssignableToGenericType( + handlerType, + typeof(IEventHandler<>))) + { + var method = typeof(IEventHandler<>) //TODO: to a static field + .MakeGenericType(eventType) + .GetMethod( + nameof(IEventHandler.HandleEventAsync), + new[] { eventType } + ); + + await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); + } + else if (ReflectionHelper.IsAssignableToGenericType( + handlerType, + typeof(IDistributedEventHandler<>))) + { + var method = typeof(IDistributedEventHandler<>) //TODO: to a static field + .MakeGenericType(eventType) + .GetMethod( + nameof(IDistributedEventHandler.HandleEventAsync), + new[] { eventType } + ); + + await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); + } + else + { + throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName); + } + } + catch (TargetInvocationException ex) + { + exceptions.Add(ex.InnerException); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + } + + protected class EventTypeWithEventHandlerFactories + { + public Type EventType { get; } + + public List EventHandlerFactories { get; } + + public EventTypeWithEventHandlerFactories(Type eventType, List eventHandlerFactories) + { + EventType = eventType; + EventHandlerFactories = eventHandlerFactories; + } + } + + // Reference from + // https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/ + protected struct SynchronizationContextRemover : INotifyCompletion + { + public bool IsCompleted + { + get { return SynchronizationContext.Current == null; } + } + + public void OnCompleted(Action continuation) + { + var prevContext = SynchronizationContext.Current; + try + { + SynchronizationContext.SetSynchronizationContext(null); + continuation(); + } + finally + { + SynchronizationContext.SetSynchronizationContext(prevContext); + } + } + + public SynchronizationContextRemover GetAwaiter() + { + return this; + } + + public void GetResult() + { + } + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 8f4cc5265c..967bdfb81f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -11,8 +11,6 @@ using System.Threading; using System.Threading.Tasks; using Volo.Abp.Collections; using Volo.Abp.DependencyInjection; -using Volo.Abp.EventBus.Distributed; -using Volo.Abp.Reflection; using Volo.Abp.Threading; namespace Volo.Abp.EventBus.Local @@ -21,7 +19,7 @@ namespace Volo.Abp.EventBus.Local /// Implements EventBus as Singleton pattern. /// [ExposeServices(typeof(ILocalEventBus), typeof(LocalEventBus))] - public class LocalEventBus : ILocalEventBus, ISingletonDependency + public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { /// /// Reference to the Logger. @@ -68,39 +66,7 @@ namespace Volo.Abp.EventBus.Local } /// - public IDisposable Subscribe(Func action) where TEvent : class - { - return Subscribe(typeof(TEvent), new ActionEventHandler(action)); - } - - /// - public IDisposable Subscribe(IEventHandler handler) where TEvent : class - { - return Subscribe(typeof(TEvent), handler); - } - - /// - public IDisposable Subscribe() - where TEvent : class - where THandler : IEventHandler, new() - { - return Subscribe(typeof(TEvent), new TransientEventHandlerFactory()); - } - - /// - public IDisposable Subscribe(Type eventType, IEventHandler handler) - { - return Subscribe(eventType, new SingleInstanceHandlerFactory(handler)); - } - - /// - public IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class - { - return Subscribe(typeof(TEvent), factory); - } - - /// - public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { GetOrCreateHandlerFactories(eventType) .Locking(factories => factories.Add(factory)); @@ -109,7 +75,7 @@ namespace Volo.Abp.EventBus.Local } /// - public void Unsubscribe(Func action) where TEvent : class + public override void Unsubscribe(Func action) { Check.NotNull(action, nameof(action)); @@ -137,13 +103,7 @@ namespace Volo.Abp.EventBus.Local } /// - public void Unsubscribe(IEventHandler handler) where TEvent : class - { - Unsubscribe(typeof(TEvent), handler); - } - - /// - public void Unsubscribe(Type eventType, IEventHandler handler) + public override void Unsubscribe(Type eventType, IEventHandler handler) { GetOrCreateHandlerFactories(eventType) .Locking(factories => @@ -157,67 +117,22 @@ namespace Volo.Abp.EventBus.Local } /// - public void Unsubscribe(IEventHandlerFactory factory) where TEvent : class - { - Unsubscribe(typeof(TEvent), factory); - } - - /// - public void Unsubscribe(Type eventType, IEventHandlerFactory factory) + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) { GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); } /// - public void UnsubscribeAll() where TEvent : class - { - UnsubscribeAll(typeof(TEvent)); - } - - /// - public void UnsubscribeAll(Type eventType) + public override void UnsubscribeAll(Type eventType) { GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - /// - public Task PublishAsync(TEvent eventData) where TEvent : class + public override async Task PublishAsync(Type eventType, object eventData) { - return PublishAsync(typeof(TEvent), eventData); - } - - /// - public async Task PublishAsync(Type eventType, object eventData) - { - //TODO: Aggregate all exceptions (including the recursive call)! - var exceptions = new List(); - await new SynchronizationContextRemover(); - - foreach (var handlerFactories in GetHandlerFactories(eventType)) - { - foreach (var handlerFactory in handlerFactories.EventHandlerFactories) - { - await TriggerAsyncHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions); - } - } - - //Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument - if (eventType.GetTypeInfo().IsGenericType && - eventType.GetGenericArguments().Length == 1 && - typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType)) - { - var genericArg = eventType.GetGenericArguments()[0]; - var baseArg = genericArg.GetTypeInfo().BaseType; - if (baseArg != null) - { - var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg); - var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs(); - var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs); - await PublishAsync(baseEventType, baseEventData); - } - } + await TriggerHandlersAsync(eventType, eventData, exceptions); if (exceptions.Any()) { @@ -230,57 +145,7 @@ namespace Volo.Abp.EventBus.Local } } - private async Task TriggerAsyncHandlingException(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List exceptions) - { - using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler()) - { - try - { - var handlerType = eventHandlerWrapper.EventHandler.GetType(); - - if (ReflectionHelper.IsAssignableToGenericType( - handlerType, - typeof(IEventHandler<>))) - { - var method = typeof(IEventHandler<>) //TODO: to a static field - .MakeGenericType(eventType) - .GetMethod( - nameof(IEventHandler.HandleEventAsync), - new[] {eventType} - ); - - await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); - } - else if (ReflectionHelper.IsAssignableToGenericType( - handlerType, - typeof(IDistributedEventHandler<>))) - { - var method = typeof(IDistributedEventHandler<>) //TODO: to a static field - .MakeGenericType(eventType) - .GetMethod( - nameof(IDistributedEventHandler.HandleEventAsync), - new[] {eventType} - ); - - await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); - } - else - { - throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName); - } - } - catch (TargetInvocationException ex) - { - exceptions.Add(ex.InnerException); - } - catch (Exception ex) - { - exceptions.Add(ex); - } - } - } - - public virtual IEnumerable GetHandlerFactories(Type eventType) + protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List(); @@ -313,51 +178,5 @@ namespace Volo.Abp.EventBus.Local return false; } - - public class EventTypeWithEventHandlerFactories - { - public Type EventType { get; } - - public List EventHandlerFactories { get; } - - public EventTypeWithEventHandlerFactories(Type eventType, List eventHandlerFactories) - { - EventType = eventType; - EventHandlerFactories = eventHandlerFactories; - } - } - - // Reference from - // https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/ - private struct SynchronizationContextRemover : INotifyCompletion - { - public bool IsCompleted - { - get { return SynchronizationContext.Current == null; } - } - - public void OnCompleted(Action continuation) - { - var prevContext = SynchronizationContext.Current; - try - { - SynchronizationContext.SetSynchronizationContext(null); - continuation(); - } - finally - { - SynchronizationContext.SetSynchronizationContext(prevContext); - } - } - - public SynchronizationContextRemover GetAwaiter() - { - return this; - } - - public void GetResult() - { - } - } } } \ No newline at end of file