Add events to UOW to be published on complete.

pull/9909/head
Halil İbrahim Kalkan 4 years ago
parent e18313e9eb
commit b966ec7717

@ -220,7 +220,8 @@ namespace Volo.Abp.Domain.Entities.Events
{
await eventPublisher.PublishAsync(
eventType,
eventData
eventData,
onUnitOfWorkComplete: false
);
return;

@ -12,6 +12,7 @@ using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Kafka
{
@ -33,6 +34,7 @@ namespace Volo.Abp.EventBus.Kafka
public KafkaDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
IKafkaMessageConsumerFactory messageConsumerFactory,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
@ -40,7 +42,7 @@ namespace Volo.Abp.EventBus.Kafka
IProducerPool producerPool,
IEventErrorHandler errorHandler,
IOptions<AbpEventBusOptions> abpEventBusOptions)
: base(serviceScopeFactory, currentTenant, errorHandler)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
@ -165,11 +167,16 @@ namespace Volo.Abp.EventBus.Kafka
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments);

@ -13,6 +13,7 @@ using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.RabbitMq
{
@ -44,9 +45,10 @@ namespace Volo.Abp.EventBus.RabbitMq
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
IRabbitMqMessageConsumerFactory messageConsumerFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler,
IOptions<AbpEventBusOptions> abpEventBusOptions)
: base(serviceScopeFactory, currentTenant, errorHandler)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler)
{
ConnectionPool = connectionPool;
Serializer = serializer;
@ -189,11 +191,16 @@ namespace Volo.Abp.EventBus.RabbitMq
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, null);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties, Dictionary<string, object> headersArguments = null)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);

@ -10,6 +10,7 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Rebus
{
@ -28,11 +29,12 @@ namespace Volo.Abp.EventBus.Rebus
public RebusDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IBus rebus,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IOptions<AbpRebusEventBusOptions> abpEventBusRebusOptions,
IEventErrorHandler errorHandler) :
base(serviceScopeFactory, currentTenant, errorHandler)
base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler)
{
Rebus = rebus;
AbpRebusEventBusOptions = abpEventBusRebusOptions.Value;
@ -125,11 +127,16 @@ namespace Volo.Abp.EventBus.Rebus
return Subscribe(typeof(TEvent), handler);
}
public override async Task PublishAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(

@ -122,15 +122,15 @@ namespace Volo.Abp.EventBus.Distributed
_localEventBus.UnsubscribeAll(eventType);
}
public Task PublishAsync<TEvent>(TEvent eventData)
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return _localEventBus.PublishAsync(eventData);
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
}
public Task PublishAsync(Type eventType, object eventData)
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return _localEventBus.PublishAsync(eventType, eventData);
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
}
}

@ -77,12 +77,12 @@ namespace Volo.Abp.EventBus.Distributed
}
public Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class
{
return Task.CompletedTask;
}
public Task PublishAsync(Type eventType, object eventData)
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return Task.CompletedTask;
}

@ -10,6 +10,7 @@ using Volo.Abp.Collections;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Reflection;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus
{
@ -18,16 +19,20 @@ namespace Volo.Abp.EventBus
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected ICurrentTenant CurrentTenant { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IEventErrorHandler ErrorHandler { get; }
protected EventBusBase(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler)
{
ServiceScopeFactory = serviceScopeFactory;
CurrentTenant = currentTenant;
UnitOfWorkManager = unitOfWorkManager;
ErrorHandler = errorHandler;
}
@ -87,13 +92,29 @@ namespace Volo.Abp.EventBus
public abstract void UnsubscribeAll(Type eventType);
/// <inheritdoc/>
public virtual Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData);
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}
/// <inheritdoc/>
public abstract Task PublishAsync(Type eventType, object eventData);
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData)
);
return;
}
await PublishToEventBusAsync(eventType, eventData);
}
protected abstract Task PublishToEventBusAsync(Type eventType, object eventData);
protected abstract void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord);
public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, Action<EventExecutionErrorContext> onErrorAction = null)
{

@ -10,8 +10,9 @@ namespace Volo.Abp.EventBus
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync<TEvent>(TEvent eventData)
Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class;
/// <summary>
@ -19,8 +20,9 @@ namespace Volo.Abp.EventBus
/// </summary>
/// <param name="eventType">Event type</param>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync(Type eventType, object eventData);
Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true);
/// <summary>
/// Registers to an event.

@ -12,6 +12,7 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Json;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Local
{
@ -34,8 +35,9 @@ namespace Volo.Abp.EventBus.Local
IOptions<AbpLocalEventBusOptions> options,
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler)
: base(serviceScopeFactory, currentTenant, errorHandler)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler)
{
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;
@ -120,11 +122,16 @@ namespace Volo.Abp.EventBus.Local
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType));
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceLocalEvent(eventRecord);
}
public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
{
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData, errorContext =>

@ -77,12 +77,12 @@ namespace Volo.Abp.EventBus.Local
}
public Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class
{
return Task.CompletedTask;
}
public Task PublishAsync(Type eventType, object eventData)
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return Task.CompletedTask;
}

@ -25,7 +25,7 @@ namespace Volo.Abp.EventBus
{
foreach (var localEvent in localEvents)
{
await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData);
await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData, onUnitOfWorkComplete: false);
}
}
@ -33,7 +33,7 @@ namespace Volo.Abp.EventBus
{
foreach (var distributedEvent in distributedEvents)
{
await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData);
await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData, onUnitOfWorkComplete: false);
}
}
}

@ -27,6 +27,35 @@ namespace Volo.Abp.TestApp.Testing
DistributedEventBus = GetRequiredService<IDistributedEventBus>();
}
[Fact]
public virtual async Task Should_Publish_Events_In_Order()
{
bool entityCreatedEventHandled = false;
LocalEventBus.Subscribe<EntityCreatedEventData<Person>>(data =>
{
data.Entity.Name.ShouldBe("TestPerson1");
entityCreatedEventHandled = true;
return Task.CompletedTask;
});
LocalEventBus.Subscribe<MyCustomEventData>(data =>
{
data.Value.ShouldBe("42");
entityCreatedEventHandled.ShouldBe(true);
return Task.CompletedTask;
});
await WithUnitOfWorkAsync(new AbpUnitOfWorkOptions{IsTransactional = true}, async () =>
{
await PersonRepository.InsertAsync(
new Person(Guid.NewGuid(), "TestPerson1", 42)
);
await LocalEventBus.PublishAsync(new MyCustomEventData { Value = "42" });
});
}
[Fact]
public virtual async Task Should_Rollback_Uow_If_Event_Handler_Throws_Exception()
{
@ -91,5 +120,10 @@ namespace Volo.Abp.TestApp.Testing
isLocalEventTriggered.ShouldBeTrue();
isDistributedEventTriggered.ShouldBeTrue();
}
private class MyCustomEventData
{
public string Value { get; set; }
}
}
}
Loading…
Cancel
Save