From b966ec77171f2d1939d1106791a65e078fd22791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20=C4=B0brahim=20Kalkan?= Date: Thu, 26 Aug 2021 12:09:49 +0300 Subject: [PATCH] Add events to UOW to be published on complete. --- .../Events/EntityChangeEventHelper.cs | 3 +- .../Kafka/KafkaDistributedEventBus.cs | 11 ++++-- .../RabbitMq/RabbitMqDistributedEventBus.cs | 11 ++++-- .../Rebus/RebusDistributedEventBus.cs | 11 ++++-- .../Distributed/LocalDistributedEventBus.cs | 8 ++--- .../Distributed/NullDistributedEventBus.cs | 4 +-- .../Volo/Abp/EventBus/EventBusBase.cs | 27 +++++++++++++-- .../Volo/Abp/EventBus/IEventBus.cs | 6 ++-- .../Volo/Abp/EventBus/Local/LocalEventBus.cs | 11 ++++-- .../Abp/EventBus/Local/NullLocalEventBus.cs | 4 +-- .../Abp/EventBus/UnitOfWorkEventPublisher.cs | 4 +-- .../Abp/TestApp/Testing/DomainEvents_Tests.cs | 34 +++++++++++++++++++ 12 files changed, 110 insertions(+), 24 deletions(-) diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/EntityChangeEventHelper.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/EntityChangeEventHelper.cs index 4c81f859d0..87b62efe6e 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/EntityChangeEventHelper.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/EntityChangeEventHelper.cs @@ -220,7 +220,8 @@ namespace Volo.Abp.Domain.Entities.Events { await eventPublisher.PublishAsync( eventType, - eventData + eventData, + onUnitOfWorkComplete: false ); return; diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 0b228d676c..0be3a326b6 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -12,6 +12,7 @@ using Volo.Abp.EventBus.Distributed; using Volo.Abp.Kafka; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Kafka { @@ -33,6 +34,7 @@ namespace Volo.Abp.EventBus.Kafka public KafkaDistributedEventBus( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IOptions abpKafkaEventBusOptions, IKafkaMessageConsumerFactory messageConsumerFactory, IOptions abpDistributedEventBusOptions, @@ -40,7 +42,7 @@ namespace Volo.Abp.EventBus.Kafka IProducerPool producerPool, IEventErrorHandler errorHandler, IOptions abpEventBusOptions) - : base(serviceScopeFactory, currentTenant, errorHandler) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; @@ -165,11 +167,16 @@ namespace Volo.Abp.EventBus.Kafka GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - public override async Task PublishAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null); } + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) { await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 76322ee43d..f5e3682a7a 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -13,6 +13,7 @@ using Volo.Abp.EventBus.Distributed; using Volo.Abp.MultiTenancy; using Volo.Abp.RabbitMQ; using Volo.Abp.Threading; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.RabbitMq { @@ -44,9 +45,10 @@ namespace Volo.Abp.EventBus.RabbitMq IOptions distributedEventBusOptions, IRabbitMqMessageConsumerFactory messageConsumerFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler, IOptions abpEventBusOptions) - : base(serviceScopeFactory, currentTenant, errorHandler) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler) { ConnectionPool = connectionPool; Serializer = serializer; @@ -189,11 +191,16 @@ namespace Volo.Abp.EventBus.RabbitMq GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - public override async Task PublishAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync(eventType, eventData, null); } + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties, Dictionary headersArguments = null) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 97bedaf8fb..0206b5fefc 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -10,6 +10,7 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Rebus { @@ -28,11 +29,12 @@ namespace Volo.Abp.EventBus.Rebus public RebusDistributedEventBus( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IBus rebus, IOptions abpDistributedEventBusOptions, IOptions abpEventBusRebusOptions, IEventErrorHandler errorHandler) : - base(serviceScopeFactory, currentTenant, errorHandler) + base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler) { Rebus = rebus; AbpRebusEventBusOptions = abpEventBusRebusOptions.Value; @@ -125,11 +127,16 @@ namespace Volo.Abp.EventBus.Rebus return Subscribe(typeof(TEvent), handler); } - public override async Task PublishAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); } + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 0d0ea6b107..3ee3700a0d 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -122,15 +122,15 @@ namespace Volo.Abp.EventBus.Distributed _localEventBus.UnsubscribeAll(eventType); } - public Task PublishAsync(TEvent eventData) + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { - return _localEventBus.PublishAsync(eventData); + return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); } - public Task PublishAsync(Type eventType, object eventData) + public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { - return _localEventBus.PublishAsync(eventType, eventData); + return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs index cfec832893..60be7a147c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs @@ -77,12 +77,12 @@ namespace Volo.Abp.EventBus.Distributed } - public Task PublishAsync(TEvent eventData) where TEvent : class + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { return Task.CompletedTask; } - public Task PublishAsync(Type eventType, object eventData) + public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { return Task.CompletedTask; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index 78d226538e..b5318b48ef 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -10,6 +10,7 @@ using Volo.Abp.Collections; using Volo.Abp.EventBus.Distributed; using Volo.Abp.MultiTenancy; using Volo.Abp.Reflection; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus { @@ -18,16 +19,20 @@ namespace Volo.Abp.EventBus protected IServiceScopeFactory ServiceScopeFactory { get; } protected ICurrentTenant CurrentTenant { get; } + + protected IUnitOfWorkManager UnitOfWorkManager { get; } protected IEventErrorHandler ErrorHandler { get; } protected EventBusBase( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler) { ServiceScopeFactory = serviceScopeFactory; CurrentTenant = currentTenant; + UnitOfWorkManager = unitOfWorkManager; ErrorHandler = errorHandler; } @@ -87,13 +92,29 @@ namespace Volo.Abp.EventBus public abstract void UnsubscribeAll(Type eventType); /// - public virtual Task PublishAsync(TEvent eventData) where TEvent : class + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { - return PublishAsync(typeof(TEvent), eventData); + return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); } /// - public abstract Task PublishAsync(Type eventType, object eventData); + public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) + { + if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) + { + AddToUnitOfWork( + UnitOfWorkManager.Current, + new UnitOfWorkEventRecord(eventType, eventData) + ); + return; + } + + await PublishToEventBusAsync(eventType, eventData); + } + + protected abstract Task PublishToEventBusAsync(Type eventType, object eventData); + + protected abstract void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord); public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, Action onErrorAction = null) { diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventBus.cs index d28b3ab2a4..6fb424c550 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventBus.cs @@ -10,8 +10,9 @@ namespace Volo.Abp.EventBus /// /// Event type /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available /// The task to handle async operation - Task PublishAsync(TEvent eventData) + Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class; /// @@ -19,8 +20,9 @@ namespace Volo.Abp.EventBus /// /// Event type /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available /// The task to handle async operation - Task PublishAsync(Type eventType, object eventData); + Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true); /// /// Registers to an event. diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 3dc6045250..77bf5b43bd 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -12,6 +12,7 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Json; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Local { @@ -34,8 +35,9 @@ namespace Volo.Abp.EventBus.Local IOptions options, IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler) - : base(serviceScopeFactory, currentTenant, errorHandler) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler) { Options = options.Value; Logger = NullLogger.Instance; @@ -120,11 +122,16 @@ namespace Volo.Abp.EventBus.Local GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - public override async Task PublishAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType)); } + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceLocalEvent(eventRecord); + } + public virtual async Task PublishAsync(LocalEventMessage localEventMessage) { await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData, errorContext => diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs index 5c74af2195..f3eab30dc1 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs @@ -77,12 +77,12 @@ namespace Volo.Abp.EventBus.Local } - public Task PublishAsync(TEvent eventData) where TEvent : class + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { return Task.CompletedTask; } - public Task PublishAsync(Type eventType, object eventData) + public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { return Task.CompletedTask; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs index fd8d3aec2d..65e271afa8 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs @@ -25,7 +25,7 @@ namespace Volo.Abp.EventBus { foreach (var localEvent in localEvents) { - await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData); + await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData, onUnitOfWorkComplete: false); } } @@ -33,7 +33,7 @@ namespace Volo.Abp.EventBus { foreach (var distributedEvent in distributedEvents) { - await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData); + await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData, onUnitOfWorkComplete: false); } } } diff --git a/framework/test/Volo.Abp.TestApp/Volo/Abp/TestApp/Testing/DomainEvents_Tests.cs b/framework/test/Volo.Abp.TestApp/Volo/Abp/TestApp/Testing/DomainEvents_Tests.cs index cdaf60a649..ef789f57f5 100644 --- a/framework/test/Volo.Abp.TestApp/Volo/Abp/TestApp/Testing/DomainEvents_Tests.cs +++ b/framework/test/Volo.Abp.TestApp/Volo/Abp/TestApp/Testing/DomainEvents_Tests.cs @@ -27,6 +27,35 @@ namespace Volo.Abp.TestApp.Testing DistributedEventBus = GetRequiredService(); } + [Fact] + public virtual async Task Should_Publish_Events_In_Order() + { + bool entityCreatedEventHandled = false; + + LocalEventBus.Subscribe>(data => + { + data.Entity.Name.ShouldBe("TestPerson1"); + entityCreatedEventHandled = true; + return Task.CompletedTask; + }); + + LocalEventBus.Subscribe(data => + { + data.Value.ShouldBe("42"); + entityCreatedEventHandled.ShouldBe(true); + return Task.CompletedTask; + }); + + await WithUnitOfWorkAsync(new AbpUnitOfWorkOptions{IsTransactional = true}, async () => + { + await PersonRepository.InsertAsync( + new Person(Guid.NewGuid(), "TestPerson1", 42) + ); + + await LocalEventBus.PublishAsync(new MyCustomEventData { Value = "42" }); + }); + } + [Fact] public virtual async Task Should_Rollback_Uow_If_Event_Handler_Throws_Exception() { @@ -91,5 +120,10 @@ namespace Volo.Abp.TestApp.Testing isLocalEventTriggered.ShouldBeTrue(); isDistributedEventTriggered.ShouldBeTrue(); } + + private class MyCustomEventData + { + public string Value { get; set; } + } } } \ No newline at end of file