From 69a82a9f4b8e36c802f2b665cbdd67da26d9f055 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Mon, 24 Jan 2022 21:56:46 +0800 Subject: [PATCH] Add AbpRebusEventHandlerStep --- .../EventBus/Rebus/AbpEventBusRebusModule.cs | 15 ++++++++++++- .../EventBus/Rebus/AbpRebusEventBusOptions.cs | 13 +---------- .../Rebus/AbpRebusEventHandlerStep.cs | 22 +++++++++++++++++++ .../Rebus/RebusDistributedEventBus.cs | 10 +++++++-- 4 files changed, 45 insertions(+), 15 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventHandlerStep.cs diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs index 657c34d2be..e65d86f3c5 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs @@ -1,5 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using Rebus.Handlers; +using Rebus.Pipeline; +using Rebus.Pipeline.Receive; using Rebus.ServiceProvider; using Volo.Abp.Modularity; @@ -21,6 +23,17 @@ public class AbpEventBusRebusModule : AbpModule context.Services.AddRebus(configure => { + configure.Options(options => + { + options.Decorate(d => + { + var step = new AbpRebusEventHandlerStep(); + var pipeline = d.Get(); + + return new PipelineStepInjector(pipeline).OnReceive(step, PipelineRelativePosition.After, typeof(ActivateHandlersStep)); + }); + }); + preActions.Configure().Configurer?.Invoke(configure); return configure; }); @@ -35,4 +48,4 @@ public class AbpEventBusRebusModule : AbpModule .GetRequiredService() .Initialize(); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs index 5238cdd8bb..4c39b9bf1d 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs @@ -20,24 +20,13 @@ public class AbpRebusEventBusOptions } private Action _configurer; - [NotNull] - public Func Publish { - get => _publish; - set => _publish = Check.NotNull(value, nameof(value)); - } - private Func _publish; + public Func Publish { get; set; } public AbpRebusEventBusOptions() { - _publish = DefaultPublish; _configurer = DefaultConfigure; } - private async Task DefaultPublish(IBus bus, Type eventType, object eventData) - { - await bus.Advanced.Routing.Send(InputQueueName, eventData); - } - private void DefaultConfigure(RebusConfigurer configure) { configure.Subscriptions(s => s.StoreInMemory()); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventHandlerStep.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventHandlerStep.cs new file mode 100644 index 0000000000..c0c655b596 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventHandlerStep.cs @@ -0,0 +1,22 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Rebus.Messages; +using Rebus.Pipeline; +using Rebus.Pipeline.Receive; + +namespace Volo.Abp.EventBus.Rebus; + +public class AbpRebusEventHandlerStep : IIncomingStep +{ + public Task Process(IncomingStepContext context, Func next) + { + var message = context.Load(); + var handlerInvokers = context.Load().ToList(); + + handlerInvokers.RemoveAll(x => x.Handler.GetType() == typeof(RebusDistributedEventHandlerAdapter)); + context.Save(new HandlerInvokers(message, handlerInvokers)); + + return next(); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index d637f3fe89..cf6c1d6b0f 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -148,9 +148,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen await TriggerHandlersAsync(eventType, eventData); } - protected override async Task PublishToEventBusAsync(Type eventType, object eventData) + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); + if (AbpRebusEventBusOptions.Publish != null) + { + await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); + return; + } + + await Rebus.Publish(eventData); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)