diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 3bc51216e6..d8a3d1341a 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -39,13 +39,15 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen IOptions abpAzureEventBusOptions, IAzureServiceBusSerializer serializer, IAzureServiceBusMessageConsumerFactory messageConsumerFactory, - IPublisherPool publisherPool) + IPublisherPool publisherPool, + IEventHandlerInvoker eventHandlerInvoker) : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, - clock) + clock, + eventHandlerInvoker) { _options = abpAzureEventBusOptions.Value; _serializer = serializer; diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index ec1901161a..a19b01311c 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -39,14 +39,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IKafkaSerializer serializer, IProducerPool producerPool, IGuidGenerator guidGenerator, - IClock clock) + IClock clock, + IEventHandlerInvoker eventHandlerInvoker) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, - clock) + clock, + eventHandlerInvoker) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; MessageConsumerFactory = messageConsumerFactory; diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 1564cfd213..38f8e4eea5 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -44,14 +44,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe ICurrentTenant currentTenant, IUnitOfWorkManager unitOfWorkManager, IGuidGenerator guidGenerator, - IClock clock) + IClock clock, + IEventHandlerInvoker eventHandlerInvoker) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, distributedEventBusOptions, guidGenerator, - clock) + clock, + eventHandlerInvoker) { ConnectionPool = connectionPool; Serializer = serializer; diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 0542a705eb..d637f3fe89 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -38,14 +38,16 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IOptions abpEventBusRebusOptions, IRebusSerializer serializer, IGuidGenerator guidGenerator, - IClock clock) : + IClock clock, + IEventHandlerInvoker eventHandlerInvoker) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, - clock) + clock, + eventHandlerInvoker) { Rebus = rebus; Serializer = serializer; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index dee9d1f7d3..6547c1686c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -22,11 +22,13 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB IUnitOfWorkManager unitOfWorkManager, IOptions abpDistributedEventBusOptions, IGuidGenerator guidGenerator, - IClock clock + IClock clock, + IEventHandlerInvoker eventHandlerInvoker ) : base( serviceScopeFactory, currentTenant, - unitOfWorkManager) + unitOfWorkManager, + eventHandlerInvoker) { GuidGenerator = guidGenerator; Clock = clock; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index 15b597da30..6e48b27e5f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -22,14 +22,18 @@ public abstract class EventBusBase : IEventBus protected IUnitOfWorkManager UnitOfWorkManager { get; } + protected IEventHandlerInvoker EventHandlerInvoker { get; } + protected EventBusBase( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, - IUnitOfWorkManager unitOfWorkManager) + IUnitOfWorkManager unitOfWorkManager, + IEventHandlerInvoker eventHandlerInvoker) { ServiceScopeFactory = serviceScopeFactory; CurrentTenant = currentTenant; UnitOfWorkManager = unitOfWorkManager; + EventHandlerInvoker = eventHandlerInvoker; } /// @@ -210,32 +214,7 @@ public abstract class EventBusBase : IEventBus using (CurrentTenant.Change(GetEventDataTenantId(eventData))) { - if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>))) - { - var method = typeof(ILocalEventHandler<>) - .MakeGenericType(eventType) - .GetMethod( - nameof(ILocalEventHandler.HandleEventAsync), - new[] { eventType } - ); - - await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData })); - } - else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>))) - { - var method = typeof(IDistributedEventHandler<>) - .MakeGenericType(eventType) - .GetMethod( - nameof(IDistributedEventHandler.HandleEventAsync), - new[] { eventType } - ); - - await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData })); - } - else - { - throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName); - } + await EventHandlerInvoker.InvokeAsync(eventHandlerWrapper.EventHandler, eventData, eventType); } } catch (TargetInvocationException ex) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvoker.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvoker.cs new file mode 100644 index 0000000000..5c7517cfe1 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvoker.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EventBus; + +public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency +{ + private readonly ConcurrentDictionary _cache; + + public EventHandlerInvoker() + { + _cache = new ConcurrentDictionary(); + } + + public async Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType) + { + var cacheItem = _cache.GetOrAdd($"{eventHandler.GetType().FullName}-{eventType.FullName}", _ => + { + var item = new EventHandlerInvokerCacheItem(); + + if (typeof(ILocalEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler)) + { + item.Local = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType)); + } + + if (typeof(IDistributedEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler)) + { + item.Distributed = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType)); + } + + return item; + }); + + if (cacheItem.Local != null) + { + await cacheItem.Local.ExecutorAsync(eventHandler, eventData); + } + + if (cacheItem.Distributed != null) + { + await cacheItem.Distributed.ExecutorAsync(eventHandler, eventData); + } + + if (cacheItem.Local == null && cacheItem.Distributed == null) + { + throw new AbpException("The object instance is not an event handler. Object type: " + eventHandler.GetType().AssemblyQualifiedName); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvokerCacheItem.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvokerCacheItem.cs new file mode 100644 index 0000000000..0ec617afca --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerInvokerCacheItem.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EventBus; + +public class EventHandlerInvokerCacheItem +{ + public IEventHandlerMethodExecutor Local { get; set; } + + public IEventHandlerMethodExecutor Distributed { get; set; } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerMethodExecutor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerMethodExecutor.cs new file mode 100644 index 0000000000..accbe513b2 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerMethodExecutor.cs @@ -0,0 +1,34 @@ +using System; +using System.Threading.Tasks; +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EventBus; + +public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target, object parameter); + +public interface IEventHandlerMethodExecutor +{ + EventHandlerMethodExecutorAsync ExecutorAsync { get; } +} + +public class LocalEventHandlerMethodExecutor : IEventHandlerMethodExecutor + where TEvent : class +{ + public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As>().HandleEventAsync(parameter.As()); + + public Task ExecuteAsync(IEventHandler target, TEvent parameters) + { + return ExecutorAsync(target, parameters); + } +} + +public class DistributedEventHandlerMethodExecutor : IEventHandlerMethodExecutor + where TEvent : class +{ + public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As>().HandleEventAsync(parameter.As()); + + public Task ExecuteAsync(IEventHandler target, TEvent parameters) + { + return ExecutorAsync(target, parameters); + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventHandlerInvoker.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventHandlerInvoker.cs new file mode 100644 index 0000000000..71c2bb552c --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventHandlerInvoker.cs @@ -0,0 +1,9 @@ +using System; +using System.Threading.Tasks; + +namespace Volo.Abp.EventBus; + +public interface IEventHandlerInvoker +{ + Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType); +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 800c4a61df..3f7dbbc851 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -33,8 +33,9 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency IOptions options, IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, - IUnitOfWorkManager unitOfWorkManager) - : base(serviceScopeFactory, currentTenant, unitOfWorkManager) + IUnitOfWorkManager unitOfWorkManager, + IEventHandlerInvoker eventHandlerInvoker) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker) { Options = options.Value; Logger = NullLogger.Instance; diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventHandlerInvoker_Tests.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventHandlerInvoker_Tests.cs new file mode 100644 index 0000000000..294debbb09 --- /dev/null +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventHandlerInvoker_Tests.cs @@ -0,0 +1,128 @@ +using System; +using System.Threading.Tasks; +using Shouldly; +using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Entities.Events; +using Volo.Abp.Domain.Entities.Events.Distributed; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Local; +using Xunit; + +namespace Volo.Abp.EventBus; + +public class EventHandlerInvoker_Tests : EventBusTestBase +{ + private readonly IEventHandlerInvoker _eventHandlerInvoker; + + public EventHandlerInvoker_Tests() + { + _eventHandlerInvoker = GetRequiredService(); + } + + [Fact] + public async Task Should_Invoke_LocalEventHandler_With_MyEventData() + { + var localHandler = new MyLocalEventHandler(); + var eventData = new MyEventData(); + + await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType()); + + localHandler.MyEventDataCount.ShouldBe(2); + localHandler.EntityChangedEventDataCount.ShouldBe(0); + localHandler.EntityChangedEventDataCount.ShouldBe(0); + } + + [Fact] + public async Task Should_Invoke_LocalEventHandler_Created_And_Changed_Once() + { + var localHandler = new MyLocalEventHandler(); + var eventData = new EntityCreatedEventData(new MyEntity()); + + await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType()); + await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, typeof(EntityChangedEventData)); + + localHandler.MyEventDataCount.ShouldBe(0); + localHandler.EntityChangedEventDataCount.ShouldBe(1); + localHandler.EntityChangedEventDataCount.ShouldBe(1); + } + + [Fact] + public async Task Should_Invoke_DistributedEventHandler_With_MyEventData() + { + var localHandler = new MyDistributedEventHandler(); + var eventData = new MyEventData(); + + await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType()); + + localHandler.MyEventDataCount.ShouldBe(1); + localHandler.EntityCreatedCount.ShouldBe(0); + } + + [Fact] + public async Task Should_Invoke_DistributedEventHandler_With_EntityCreatedEto() + { + var localHandler = new MyDistributedEventHandler(); + var eventData = new EntityCreatedEto(new MyEntity()); + + await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType()); + + localHandler.MyEventDataCount.ShouldBe(0); + localHandler.EntityCreatedCount.ShouldBe(1); + } + + public class MyEventData + { + } + + public class MyEntity : Entity + { + + } + + public class MyDistributedEventHandler : IDistributedEventHandler, + IDistributedEventHandler> + { + public int MyEventDataCount { get; set; } + public int EntityCreatedCount { get; set; } + + public Task HandleEventAsync(MyEventData eventData) + { + MyEventDataCount++; + return Task.CompletedTask; + } + + public Task HandleEventAsync(EntityCreatedEto eventData) + { + EntityCreatedCount++; + return Task.CompletedTask; + } + } + + public class MyLocalEventHandler : ILocalEventHandler, + IDistributedEventHandler, + IDistributedEventHandler>, + IDistributedEventHandler> + { + public int MyEventDataCount { get; set; } + public int EntityCreatedEventDataCount { get; set; } + public int EntityChangedEventDataCount { get; set; } + + public Task HandleEventAsync(MyEventData eventData) + { + MyEventDataCount++; + return Task.CompletedTask; + } + + public Task HandleEventAsync(EntityCreatedEventData eventData) + { + EntityCreatedEventDataCount++; + return Task.CompletedTask; + } + + public Task HandleEventAsync(EntityChangedEventData eventData) + { + EntityChangedEventDataCount++; + return Task.CompletedTask; + } + } +}