Publish event when uow completed,

Resolve #11100
pull/11125/head
maliming 4 years ago
parent 75084dcc45
commit eae8d6340f
No known key found for this signature in database
GPG Key ID: 096224957E51C89E

@ -6,6 +6,7 @@ using Microsoft.Extensions.Options;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed
{
@ -15,16 +16,19 @@ namespace Volo.Abp.EventBus.Distributed
{
private readonly ILocalEventBus _localEventBus;
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
public LocalDistributedEventBus(
ILocalEventBus localEventBus,
IUnitOfWorkManager unitOfWorkManager,
IServiceScopeFactory serviceScopeFactory,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions)
{
_localEventBus = localEventBus;
UnitOfWorkManager = unitOfWorkManager;
ServiceScopeFactory = serviceScopeFactory;
AbpDistributedEventBusOptions = distributedEventBusOptions.Value;
Subscribe(distributedEventBusOptions.Value.Handlers);
@ -122,25 +126,57 @@ namespace Volo.Abp.EventBus.Distributed
_localEventBus.UnsubscribeAll(eventType);
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext())
);
return;
}
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
}
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
where TEvent : class
{
await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
);
return;
}
if (useOutbox && UnitOfWorkManager.Current != null)
{
UnitOfWorkManager.Current.OnCompleted(async() => {
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
});
return;
}
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
protected virtual void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
}
}
}

@ -1,7 +1,9 @@
using System;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;
using Xunit;
namespace Volo.Abp.EventBus.Distributed
@ -44,12 +46,12 @@ namespace Volo.Abp.EventBus.Distributed
Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId);
}
[Fact]
public async Task Should_Get_TenantId_From_EventEto_Extra_Property()
{
var tenantId = Guid.NewGuid();
DistributedEventBus.Subscribe<MySimpleEto>(GetRequiredService<MySimpleDistributedSingleInstanceEventHandler>());
await DistributedEventBus.PublishAsync(new MySimpleEto
@ -59,8 +61,72 @@ namespace Volo.Abp.EventBus.Distributed
{"TenantId", tenantId.ToString()}
}
});
Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId);
}
[Fact]
public async Task Event_Should_Published_On_UnitOfWorkComplete()
{
var id = 0;
DistributedEventBus.Subscribe<MySimpleEventData>(data =>
{
id = data.Value;
return Task.CompletedTask;
});
var unitOfWorkManager = GetRequiredService<IUnitOfWorkManager>();
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false);
}
id.ShouldBe(0);
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false);
await uow.CompleteAsync();
}
id.ShouldBe(3);
id = 0;
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false);
}
id.ShouldBe(3);
}
[Fact]
public async Task Event_Should_Published_On_UnitOfWorkComplete_UseOutbox()
{
var id = 0;
DistributedEventBus.Subscribe<MySimpleEventData>(data =>
{
id = data.Value;
return Task.CompletedTask;
});
var unitOfWorkManager = GetRequiredService<IUnitOfWorkManager>();
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true);
}
id.ShouldBe(0);
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true);
await uow.CompleteAsync();
}
id.ShouldBe(3);
id = 0;
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false);
}
id.ShouldBe(3);
}
}
}

Loading…
Cancel
Save