From 616258197e95019ed76a2b37050b430bcf6e7e53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20=C4=B0brahim=20Kalkan?= Date: Tue, 7 Sep 2021 17:05:50 +0300 Subject: [PATCH] Added outbox parameter and dbcontext abstraction --- .../Abp/EntityFrameworkCore/AbpDbContext.cs | 2 +- ...ntOutboxDbContextModelBuilderExtensions.cs | 21 ++++++ .../DistributedEvents/IHasEventOutbox.cs | 9 +++ .../DistributedEvents/OutgoingEventRecord.cs | 29 ++++++++ .../Kafka/KafkaDistributedEventBus.cs | 25 ++++--- .../RabbitMq/RabbitMqDistributedEventBus.cs | 7 +- .../Rebus/RebusDistributedEventBus.cs | 7 +- .../Distributed/DistributedEventBusBase.cs | 69 +++++++++++++++++++ .../Distributed/IDistributedEventBus.cs | 13 ++++ .../Distributed/LocalDistributedEventBus.cs | 10 +++ .../Distributed/NullDistributedEventBus.cs | 10 +++ .../Volo/Abp/EventBus/EventBusBase.cs | 8 ++- .../Abp/EventBus/UnitOfWorkEventPublisher.cs | 13 +++- .../MemoryDb/MemoryDbRepository.cs | 3 +- .../Repositories/MongoDB/MongoDbRepository.cs | 3 +- .../Volo/Abp/Uow/UnitOfWorkEventRecord.cs | 6 +- 16 files changed, 207 insertions(+), 28 deletions(-) create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventOutboxDbContextModelBuilderExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IHasEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs create mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpDbContext.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpDbContext.cs index b86f9b83b1..1fd8273e67 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpDbContext.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpDbContext.cs @@ -201,7 +201,7 @@ namespace Volo.Abp.EntityFrameworkCore foreach (var distributedEvent in changeReport.DistributedEvents) { UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent( - new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder) + new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder, useOutbox: true) ); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventOutboxDbContextModelBuilderExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventOutboxDbContextModelBuilderExtensions.cs new file mode 100644 index 0000000000..5443968343 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventOutboxDbContextModelBuilderExtensions.cs @@ -0,0 +1,21 @@ +using JetBrains.Annotations; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Data; +using Volo.Abp.EntityFrameworkCore.Modeling; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class EventOutboxDbContextModelBuilderExtensions + { + public static void ConfigureEventOutbox([NotNull] this ModelBuilder builder) + { + builder.Entity(b => + { + b.ToTable(AbpCommonDbProperties.DbTablePrefix + "EventOutbox", AbpCommonDbProperties.DbSchema); + b.ConfigureByConvention(); + b.Property(x => x.EventName).IsRequired().HasMaxLength(OutgoingEventRecord.MaxEventNameLength); + b.Property(x => x.EventData).IsRequired(); + }); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IHasEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IHasEventOutbox.cs new file mode 100644 index 0000000000..34c712c1a1 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IHasEventOutbox.cs @@ -0,0 +1,9 @@ +using Microsoft.EntityFrameworkCore; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IHasEventOutbox + { + DbSet OutgoingEventRecords { get; set; } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs new file mode 100644 index 0000000000..9ebee6894a --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -0,0 +1,29 @@ +using System; +using Volo.Abp.Data; +using Volo.Abp.Domain.Entities; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OutgoingEventRecord : BasicAggregateRoot, IHasExtraProperties + { + public static int MaxEventNameLength { get; set; } = 256; + + public ExtraPropertyDictionary ExtraProperties { get; protected set; } + + public string EventName { get; set; } + public byte[] EventData { get; set; } + + protected OutgoingEventRecord() + { + ExtraProperties = new ExtraPropertyDictionary(); + this.SetDefaultsForExtraProperties(); + } + + public OutgoingEventRecord(Guid id) + : base(id) + { + ExtraProperties = new ExtraPropertyDictionary(); + this.SetDefaultsForExtraProperties(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 0be3a326b6..c3050adb3b 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -18,7 +18,7 @@ namespace Volo.Abp.EventBus.Kafka { [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))] - public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency { protected AbpEventBusOptions AbpEventBusOptions { get; } protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } @@ -94,11 +94,6 @@ namespace Volo.Abp.EventBus.Kafka }); } - public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class - { - return Subscribe(typeof(TEvent), handler); - } - public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); @@ -169,7 +164,15 @@ namespace Volo.Abp.EventBus.Kafka protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null); + await PublishAsync( + eventType, + eventData, + new Headers + { + { "messageId", Serializer.Serialize(Guid.NewGuid()) } + }, + null + ); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -179,7 +182,13 @@ namespace Volo.Abp.EventBus.Kafka public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) { - await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments); + await PublishAsync( + AbpKafkaEventBusOptions.TopicName, + eventType, + eventData, + headers, + headersArguments + ); } public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index f5e3682a7a..9891b5cf12 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -23,7 +23,7 @@ namespace Volo.Abp.EventBus.RabbitMq */ [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))] - public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency { protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } @@ -113,11 +113,6 @@ namespace Volo.Abp.EventBus.RabbitMq }); } - public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class - { - return Subscribe(typeof(TEvent), handler); - } - public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 0206b5fefc..3f07cf7150 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -16,7 +16,7 @@ namespace Volo.Abp.EventBus.Rebus { [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(RebusDistributedEventBus))] - public class RebusDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDependency { protected IBus Rebus { get; } @@ -122,11 +122,6 @@ namespace Volo.Abp.EventBus.Rebus Rebus.Unsubscribe(eventType); } - public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class - { - return Subscribe(typeof(TEvent), handler); - } - protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs new file mode 100644 index 0000000000..94176d9f50 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -0,0 +1,69 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Uow; + +namespace Volo.Abp.EventBus.Distributed +{ + public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus + { + protected DistributedEventBusBase( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, + IEventErrorHandler errorHandler + ) : base( + serviceScopeFactory, + currentTenant, + unitOfWorkManager, + errorHandler) + { + } + + public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class + { + return Subscribe(typeof(TEvent), handler); + } + + public Task PublishAsync( + TEvent eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true) + where TEvent : class + { + return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); + } + + public async Task PublishAsync( + Type eventType, + object eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true) + { + if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) + { + AddToUnitOfWork( + UnitOfWorkManager.Current, + new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox) + ); + return; + } + + if (useOutbox) + { + if (await AddToOutboxAsync(eventType, eventData)) + { + return; + } + } + + await PublishToEventBusAsync(eventType, eventData); + } + + private async Task AddToOutboxAsync(Type eventType, object eventData) + { + return false; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs index e406f7069b..80af122242 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed { @@ -12,5 +13,17 @@ namespace Volo.Abp.EventBus.Distributed /// Object to handle the event IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class; + + Task PublishAsync( + TEvent eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true) + where TEvent : class; + + Task PublishAsync( + Type eventType, + object eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true); } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 3ee3700a0d..d5dd961802 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -132,5 +132,15 @@ namespace Volo.Abp.EventBus.Distributed { return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); } + + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class + { + return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + } + + public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + { + return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs index 60be7a147c..97004fbeac 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs @@ -86,5 +86,15 @@ namespace Volo.Abp.EventBus.Distributed { return Task.CompletedTask; } + + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class + { + return Task.CompletedTask; + } + + public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + { + return Task.CompletedTask; + } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index f29a169cb5..a739bf814d 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -92,13 +92,17 @@ namespace Volo.Abp.EventBus public abstract void UnsubscribeAll(Type eventType); /// - public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class + public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) + where TEvent : class { return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); } /// - public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) + public async Task PublishAsync( + Type eventType, + object eventData, + bool onUnitOfWorkComplete = true) { if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) { diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs index 65e271afa8..ea6a66a699 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/UnitOfWorkEventPublisher.cs @@ -25,7 +25,11 @@ namespace Volo.Abp.EventBus { foreach (var localEvent in localEvents) { - await _localEventBus.PublishAsync(localEvent.EventType, localEvent.EventData, onUnitOfWorkComplete: false); + await _localEventBus.PublishAsync( + localEvent.EventType, + localEvent.EventData, + onUnitOfWorkComplete: false + ); } } @@ -33,7 +37,12 @@ namespace Volo.Abp.EventBus { foreach (var distributedEvent in distributedEvents) { - await _distributedEventBus.PublishAsync(distributedEvent.EventType, distributedEvent.EventData, onUnitOfWorkComplete: false); + await _distributedEventBus.PublishAsync( + distributedEvent.EventType, + distributedEvent.EventData, + onUnitOfWorkComplete: false, + useOutbox: distributedEvent.UseOutbox + ); } } } diff --git a/framework/src/Volo.Abp.MemoryDb/Volo/Abp/Domain/Repositories/MemoryDb/MemoryDbRepository.cs b/framework/src/Volo.Abp.MemoryDb/Volo/Abp/Domain/Repositories/MemoryDb/MemoryDbRepository.cs index cf3479de67..3193e15ad1 100644 --- a/framework/src/Volo.Abp.MemoryDb/Volo/Abp/Domain/Repositories/MemoryDb/MemoryDbRepository.cs +++ b/framework/src/Volo.Abp.MemoryDb/Volo/Abp/Domain/Repositories/MemoryDb/MemoryDbRepository.cs @@ -100,7 +100,8 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb new UnitOfWorkEventRecord( distributedEvent.EventData.GetType(), distributedEvent.EventData, - distributedEvent.EventOrder + distributedEvent.EventOrder, + useOutbox: true ) ); } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/Domain/Repositories/MongoDB/MongoDbRepository.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/Domain/Repositories/MongoDB/MongoDbRepository.cs index a40117e0ff..db8448db23 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/Domain/Repositories/MongoDB/MongoDbRepository.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/Domain/Repositories/MongoDB/MongoDbRepository.cs @@ -674,7 +674,8 @@ namespace Volo.Abp.Domain.Repositories.MongoDB new UnitOfWorkEventRecord( distributedEvent.EventData.GetType(), distributedEvent.EventData, - distributedEvent.EventOrder + distributedEvent.EventOrder, + useOutbox: true ) ); } diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWorkEventRecord.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWorkEventRecord.cs index 11ae4920cd..dd917d6fe2 100644 --- a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWorkEventRecord.cs +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWorkEventRecord.cs @@ -11,6 +11,8 @@ namespace Volo.Abp.Uow public long EventOrder { get; } + public bool UseOutbox { get; } + /// /// Extra properties can be used if needed. /// @@ -19,11 +21,13 @@ namespace Volo.Abp.Uow public UnitOfWorkEventRecord( Type eventType, object eventData, - long eventOrder) + long eventOrder, + bool useOutbox = false) { EventType = eventType; EventData = eventData; EventOrder = eventOrder; + UseOutbox = useOutbox; } } } \ No newline at end of file