From eae8d6340fe2a63cc6ccb78cf3675044d02edd02 Mon Sep 17 00:00:00 2001 From: maliming Date: Thu, 30 Dec 2021 12:14:02 +0800 Subject: [PATCH] Publish event when uow completed, Resolve #11100 --- .../Distributed/LocalDistributedEventBus.cs | 56 ++++++++++++--- .../LocalDistributedEventBus_Test.cs | 72 ++++++++++++++++++- 2 files changed, 115 insertions(+), 13 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index d5dd961802..3389c2d243 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.Options; using Volo.Abp.Collections; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Local; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed { @@ -15,16 +16,19 @@ namespace Volo.Abp.EventBus.Distributed { private readonly ILocalEventBus _localEventBus; + protected IUnitOfWorkManager UnitOfWorkManager { get; } protected IServiceScopeFactory ServiceScopeFactory { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } public LocalDistributedEventBus( ILocalEventBus localEventBus, + IUnitOfWorkManager unitOfWorkManager, IServiceScopeFactory serviceScopeFactory, IOptions distributedEventBusOptions) { _localEventBus = localEventBus; + UnitOfWorkManager = unitOfWorkManager; ServiceScopeFactory = serviceScopeFactory; AbpDistributedEventBusOptions = distributedEventBusOptions.Value; Subscribe(distributedEventBusOptions.Value.Handlers); @@ -122,25 +126,57 @@ namespace Volo.Abp.EventBus.Distributed _localEventBus.UnsubscribeAll(eventType); } - public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) + public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { - return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); } - public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) + public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { - return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) + { + AddToUnitOfWork( + UnitOfWorkManager.Current, + new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext()) + ); + return; + } + + await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false); + } + + public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + where TEvent : class + { + await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); } - - public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class + + public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { - return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) + { + AddToUnitOfWork( + UnitOfWorkManager.Current, + new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox) + ); + return; + } + + if (useOutbox && UnitOfWorkManager.Current != null) + { + UnitOfWorkManager.Current.OnCompleted(async() => { + await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false); + }); + return; + } + + await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false); } - public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + protected virtual void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) { - return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } } -} \ No newline at end of file +} diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 5bea07fc44..9d768f1054 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,7 +1,9 @@ using System; using System.Threading.Tasks; +using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; using Volo.Abp.MultiTenancy; +using Volo.Abp.Uow; using Xunit; namespace Volo.Abp.EventBus.Distributed @@ -44,12 +46,12 @@ namespace Volo.Abp.EventBus.Distributed Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId); } - + [Fact] public async Task Should_Get_TenantId_From_EventEto_Extra_Property() { var tenantId = Guid.NewGuid(); - + DistributedEventBus.Subscribe(GetRequiredService()); await DistributedEventBus.PublishAsync(new MySimpleEto @@ -59,8 +61,72 @@ namespace Volo.Abp.EventBus.Distributed {"TenantId", tenantId.ToString()} } }); - + Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId); } + + [Fact] + public async Task Event_Should_Published_On_UnitOfWorkComplete() + { + var id = 0; + DistributedEventBus.Subscribe(data => + { + id = data.Value; + return Task.CompletedTask; + }); + + var unitOfWorkManager = GetRequiredService(); + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false); + } + id.ShouldBe(0); + + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false); + await uow.CompleteAsync(); + } + id.ShouldBe(3); + + id = 0; + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false); + } + id.ShouldBe(3); + } + + [Fact] + public async Task Event_Should_Published_On_UnitOfWorkComplete_UseOutbox() + { + var id = 0; + DistributedEventBus.Subscribe(data => + { + id = data.Value; + return Task.CompletedTask; + }); + + var unitOfWorkManager = GetRequiredService(); + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true); + } + id.ShouldBe(0); + + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true); + await uow.CompleteAsync(); + } + id.ShouldBe(3); + + id = 0; + using (var uow = unitOfWorkManager.Begin()) + { + await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false); + } + id.ShouldBe(3); + } } }