Merge pull request #11232 from abpframework/liangshiwei/eventbus

Enhancement Trigger Event Handler
pull/10783/head^2
maliming 4 years ago committed by GitHub
commit 615a35fd53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -39,13 +39,15 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions,
IAzureServiceBusSerializer serializer,
IAzureServiceBusMessageConsumerFactory messageConsumerFactory,
IPublisherPool publisherPool)
IPublisherPool publisherPool,
IEventHandlerInvoker eventHandlerInvoker)
: base(serviceScopeFactory,
currentTenant,
unitOfWorkManager,
abpDistributedEventBusOptions,
guidGenerator,
clock)
clock,
eventHandlerInvoker)
{
_options = abpAzureEventBusOptions.Value;
_serializer = serializer;

@ -39,14 +39,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IKafkaSerializer serializer,
IProducerPool producerPool,
IGuidGenerator guidGenerator,
IClock clock)
IClock clock,
IEventHandlerInvoker eventHandlerInvoker)
: base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
abpDistributedEventBusOptions,
guidGenerator,
clock)
clock,
eventHandlerInvoker)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;

@ -44,14 +44,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IGuidGenerator guidGenerator,
IClock clock)
IClock clock,
IEventHandlerInvoker eventHandlerInvoker)
: base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
distributedEventBusOptions,
guidGenerator,
clock)
clock,
eventHandlerInvoker)
{
ConnectionPool = connectionPool;
Serializer = serializer;

@ -38,14 +38,16 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IOptions<AbpRebusEventBusOptions> abpEventBusRebusOptions,
IRebusSerializer serializer,
IGuidGenerator guidGenerator,
IClock clock) :
IClock clock,
IEventHandlerInvoker eventHandlerInvoker) :
base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
abpDistributedEventBusOptions,
guidGenerator,
clock)
clock,
eventHandlerInvoker)
{
Rebus = rebus;
Serializer = serializer;

@ -22,11 +22,13 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock
IClock clock,
IEventHandlerInvoker eventHandlerInvoker
) : base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager)
unitOfWorkManager,
eventHandlerInvoker)
{
GuidGenerator = guidGenerator;
Clock = clock;

@ -22,14 +22,18 @@ public abstract class EventBusBase : IEventBus
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IEventHandlerInvoker EventHandlerInvoker { get; }
protected EventBusBase(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager)
IUnitOfWorkManager unitOfWorkManager,
IEventHandlerInvoker eventHandlerInvoker)
{
ServiceScopeFactory = serviceScopeFactory;
CurrentTenant = currentTenant;
UnitOfWorkManager = unitOfWorkManager;
EventHandlerInvoker = eventHandlerInvoker;
}
/// <inheritdoc/>
@ -210,32 +214,7 @@ public abstract class EventBusBase : IEventBus
using (CurrentTenant.Change(GetEventDataTenantId(eventData)))
{
if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>)))
{
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
}
else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
{
var method = typeof(IDistributedEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(IDistributedEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
}
else
{
throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
}
await EventHandlerInvoker.InvokeAsync(eventHandlerWrapper.EventHandler, eventData, eventType);
}
}
catch (TargetInvocationException ex)

@ -0,0 +1,52 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EventBus;
public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency
{
private readonly ConcurrentDictionary<string, EventHandlerInvokerCacheItem> _cache;
public EventHandlerInvoker()
{
_cache = new ConcurrentDictionary<string, EventHandlerInvokerCacheItem>();
}
public async Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType)
{
var cacheItem = _cache.GetOrAdd($"{eventHandler.GetType().FullName}-{eventType.FullName}", _ =>
{
var item = new EventHandlerInvokerCacheItem();
if (typeof(ILocalEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Local = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType));
}
if (typeof(IDistributedEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Distributed = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType));
}
return item;
});
if (cacheItem.Local != null)
{
await cacheItem.Local.ExecutorAsync(eventHandler, eventData);
}
if (cacheItem.Distributed != null)
{
await cacheItem.Distributed.ExecutorAsync(eventHandler, eventData);
}
if (cacheItem.Local == null && cacheItem.Distributed == null)
{
throw new AbpException("The object instance is not an event handler. Object type: " + eventHandler.GetType().AssemblyQualifiedName);
}
}
}

@ -0,0 +1,8 @@
namespace Volo.Abp.EventBus;
public class EventHandlerInvokerCacheItem
{
public IEventHandlerMethodExecutor Local { get; set; }
public IEventHandlerMethodExecutor Distributed { get; set; }
}

@ -0,0 +1,34 @@
using System;
using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EventBus;
public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target, object parameter);
public interface IEventHandlerMethodExecutor
{
EventHandlerMethodExecutorAsync ExecutorAsync { get; }
}
public class LocalEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
where TEvent : class
{
public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As<ILocalEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());
public Task ExecuteAsync(IEventHandler target, TEvent parameters)
{
return ExecutorAsync(target, parameters);
}
}
public class DistributedEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
where TEvent : class
{
public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As<IDistributedEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());
public Task ExecuteAsync(IEventHandler target, TEvent parameters)
{
return ExecutorAsync(target, parameters);
}
}

@ -0,0 +1,9 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus;
public interface IEventHandlerInvoker
{
Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType);
}

@ -33,8 +33,9 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
IOptions<AbpLocalEventBusOptions> options,
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager)
IUnitOfWorkManager unitOfWorkManager,
IEventHandlerInvoker eventHandlerInvoker)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker)
{
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;

@ -0,0 +1,128 @@
using System;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Entities.Events;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Xunit;
namespace Volo.Abp.EventBus;
public class EventHandlerInvoker_Tests : EventBusTestBase
{
private readonly IEventHandlerInvoker _eventHandlerInvoker;
public EventHandlerInvoker_Tests()
{
_eventHandlerInvoker = GetRequiredService<IEventHandlerInvoker>();
}
[Fact]
public async Task Should_Invoke_LocalEventHandler_With_MyEventData()
{
var localHandler = new MyLocalEventHandler();
var eventData = new MyEventData();
await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType());
localHandler.MyEventDataCount.ShouldBe(2);
localHandler.EntityChangedEventDataCount.ShouldBe(0);
localHandler.EntityChangedEventDataCount.ShouldBe(0);
}
[Fact]
public async Task Should_Invoke_LocalEventHandler_Created_And_Changed_Once()
{
var localHandler = new MyLocalEventHandler();
var eventData = new EntityCreatedEventData<MyEntity>(new MyEntity());
await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType());
await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, typeof(EntityChangedEventData<MyEntity>));
localHandler.MyEventDataCount.ShouldBe(0);
localHandler.EntityChangedEventDataCount.ShouldBe(1);
localHandler.EntityChangedEventDataCount.ShouldBe(1);
}
[Fact]
public async Task Should_Invoke_DistributedEventHandler_With_MyEventData()
{
var localHandler = new MyDistributedEventHandler();
var eventData = new MyEventData();
await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType());
localHandler.MyEventDataCount.ShouldBe(1);
localHandler.EntityCreatedCount.ShouldBe(0);
}
[Fact]
public async Task Should_Invoke_DistributedEventHandler_With_EntityCreatedEto()
{
var localHandler = new MyDistributedEventHandler();
var eventData = new EntityCreatedEto<MyEntity>(new MyEntity());
await _eventHandlerInvoker.InvokeAsync(localHandler, eventData, eventData.GetType());
localHandler.MyEventDataCount.ShouldBe(0);
localHandler.EntityCreatedCount.ShouldBe(1);
}
public class MyEventData
{
}
public class MyEntity : Entity<Guid>
{
}
public class MyDistributedEventHandler : IDistributedEventHandler<MyEventData>,
IDistributedEventHandler<EntityCreatedEto<MyEntity>>
{
public int MyEventDataCount { get; set; }
public int EntityCreatedCount { get; set; }
public Task HandleEventAsync(MyEventData eventData)
{
MyEventDataCount++;
return Task.CompletedTask;
}
public Task HandleEventAsync(EntityCreatedEto<MyEntity> eventData)
{
EntityCreatedCount++;
return Task.CompletedTask;
}
}
public class MyLocalEventHandler : ILocalEventHandler<MyEventData>,
IDistributedEventHandler<MyEventData>,
IDistributedEventHandler<EntityCreatedEventData<MyEntity>>,
IDistributedEventHandler<EntityChangedEventData<MyEntity>>
{
public int MyEventDataCount { get; set; }
public int EntityCreatedEventDataCount { get; set; }
public int EntityChangedEventDataCount { get; set; }
public Task HandleEventAsync(MyEventData eventData)
{
MyEventDataCount++;
return Task.CompletedTask;
}
public Task HandleEventAsync(EntityCreatedEventData<MyEntity> eventData)
{
EntityCreatedEventDataCount++;
return Task.CompletedTask;
}
public Task HandleEventAsync(EntityChangedEventData<MyEntity> eventData)
{
EntityChangedEventDataCount++;
return Task.CompletedTask;
}
}
}
Loading…
Cancel
Save