Added outbox parameter and dbcontext abstraction

pull/10008/head
Halil İbrahim Kalkan 4 years ago
parent cc386ae47a
commit 616258197e

@ -201,7 +201,7 @@ namespace Volo.Abp.EntityFrameworkCore
foreach (var distributedEvent in changeReport.DistributedEvents)
{
UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent(
new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder)
new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder, useOutbox: true)
);
}
}

@ -0,0 +1,21 @@
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Data;
using Volo.Abp.EntityFrameworkCore.Modeling;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class EventOutboxDbContextModelBuilderExtensions
{
public static void ConfigureEventOutbox([NotNull] this ModelBuilder builder)
{
builder.Entity<OutgoingEventRecord>(b =>
{
b.ToTable(AbpCommonDbProperties.DbTablePrefix + "EventOutbox", AbpCommonDbProperties.DbSchema);
b.ConfigureByConvention();
b.Property(x => x.EventName).IsRequired().HasMaxLength(OutgoingEventRecord.MaxEventNameLength);
b.Property(x => x.EventData).IsRequired();
});
}
}
}

@ -0,0 +1,9 @@
using Microsoft.EntityFrameworkCore;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IHasEventOutbox
{
DbSet<OutgoingEventRecord> OutgoingEventRecords { get; set; }
}
}

@ -0,0 +1,29 @@
using System;
using Volo.Abp.Data;
using Volo.Abp.Domain.Entities;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OutgoingEventRecord : BasicAggregateRoot<Guid>, IHasExtraProperties
{
public static int MaxEventNameLength { get; set; } = 256;
public ExtraPropertyDictionary ExtraProperties { get; protected set; }
public string EventName { get; set; }
public byte[] EventData { get; set; }
protected OutgoingEventRecord()
{
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public OutgoingEventRecord(Guid id)
: base(id)
{
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
}
}

@ -18,7 +18,7 @@ namespace Volo.Abp.EventBus.Kafka
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected AbpEventBusOptions AbpEventBusOptions { get; }
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
@ -94,11 +94,6 @@ namespace Volo.Abp.EventBus.Kafka
});
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
@ -169,7 +164,15 @@ namespace Volo.Abp.EventBus.Kafka
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null);
await PublishAsync(
eventType,
eventData,
new Headers
{
{ "messageId", Serializer.Serialize(Guid.NewGuid()) }
},
null
);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
@ -179,7 +182,13 @@ namespace Volo.Abp.EventBus.Kafka
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments);
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType,
eventData,
headers,
headersArguments
);
}
public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)

@ -23,7 +23,7 @@ namespace Volo.Abp.EventBus.RabbitMq
*/
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))]
public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
@ -113,11 +113,6 @@ namespace Volo.Abp.EventBus.RabbitMq
});
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);

@ -16,7 +16,7 @@ namespace Volo.Abp.EventBus.Rebus
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RebusDistributedEventBus))]
public class RebusDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected IBus Rebus { get; }
@ -122,11 +122,6 @@ namespace Volo.Abp.EventBus.Rebus
Rebus.Unsubscribe(eventType);
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);

@ -0,0 +1,69 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed
{
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus
{
protected DistributedEventBusBase(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler
) : base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
errorHandler)
{
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public Task PublishAsync<TEvent>(
TEvent eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);
}
public async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
);
return;
}
if (useOutbox)
{
if (await AddToOutboxAsync(eventType, eventData))
{
return;
}
}
await PublishToEventBusAsync(eventType, eventData);
}
private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{
return false;
}
}
}

@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Distributed
{
@ -12,5 +13,17 @@ namespace Volo.Abp.EventBus.Distributed
/// <param name="handler">Object to handle the event</param>
IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
where TEvent : class;
Task PublishAsync<TEvent>(
TEvent eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
where TEvent : class;
Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true);
}
}

@ -132,5 +132,15 @@ namespace Volo.Abp.EventBus.Distributed
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
}
}

@ -86,5 +86,15 @@ namespace Volo.Abp.EventBus.Distributed
{
return Task.CompletedTask;
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
{
return Task.CompletedTask;
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return Task.CompletedTask;
}
}
}

@ -92,13 +92,17 @@ namespace Volo.Abp.EventBus
public abstract void UnsubscribeAll(Type eventType);
/// <inheritdoc/>
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}
/// <inheritdoc/>
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{

@ -25,7 +25,11 @@ namespace Volo.Abp.EventBus
{
foreach (var localEvent in localEvents)
{
await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData, onUnitOfWorkComplete: false);
await _localEventBus.PublishAsync(
localEvent.EventType,
localEvent.EventData,
onUnitOfWorkComplete: false
);
}
}
@ -33,7 +37,12 @@ namespace Volo.Abp.EventBus
{
foreach (var distributedEvent in distributedEvents)
{
await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData, onUnitOfWorkComplete: false);
await _distributedEventBus.PublishAsync(
distributedEvent.EventType,
distributedEvent.EventData,
onUnitOfWorkComplete: false,
useOutbox: distributedEvent.UseOutbox
);
}
}
}

@ -100,7 +100,8 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
new UnitOfWorkEventRecord(
distributedEvent.EventData.GetType(),
distributedEvent.EventData,
distributedEvent.EventOrder
distributedEvent.EventOrder,
useOutbox: true
)
);
}

@ -674,7 +674,8 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
new UnitOfWorkEventRecord(
distributedEvent.EventData.GetType(),
distributedEvent.EventData,
distributedEvent.EventOrder
distributedEvent.EventOrder,
useOutbox: true
)
);
}

@ -11,6 +11,8 @@ namespace Volo.Abp.Uow
public long EventOrder { get; }
public bool UseOutbox { get; }
/// <summary>
/// Extra properties can be used if needed.
/// </summary>
@ -19,11 +21,13 @@ namespace Volo.Abp.Uow
public UnitOfWorkEventRecord(
Type eventType,
object eventData,
long eventOrder)
long eventOrder,
bool useOutbox = false)
{
EventType = eventType;
EventData = eventData;
EventOrder = eventOrder;
UseOutbox = useOutbox;
}
}
}
Loading…
Cancel
Save