Add AbpRebusEventHandlerStep

pull/11432/head
liangshiwei 3 years ago
parent 5fca6daa4b
commit 69a82a9f4b

@ -1,5 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Rebus.Handlers;
using Rebus.Pipeline;
using Rebus.Pipeline.Receive;
using Rebus.ServiceProvider;
using Volo.Abp.Modularity;
@ -21,6 +23,17 @@ public class AbpEventBusRebusModule : AbpModule
context.Services.AddRebus(configure =>
{
configure.Options(options =>
{
options.Decorate<IPipeline>(d =>
{
var step = new AbpRebusEventHandlerStep();
var pipeline = d.Get<IPipeline>();
return new PipelineStepInjector(pipeline).OnReceive(step, PipelineRelativePosition.After, typeof(ActivateHandlersStep));
});
});
preActions.Configure().Configurer?.Invoke(configure);
return configure;
});
@ -35,4 +48,4 @@ public class AbpEventBusRebusModule : AbpModule
.GetRequiredService<RebusDistributedEventBus>()
.Initialize();
}
}
}

@ -20,24 +20,13 @@ public class AbpRebusEventBusOptions
}
private Action<RebusConfigurer> _configurer;
[NotNull]
public Func<IBus, Type, object, Task> Publish {
get => _publish;
set => _publish = Check.NotNull(value, nameof(value));
}
private Func<IBus, Type, object, Task> _publish;
public Func<IBus, Type, object, Task> Publish { get; set; }
public AbpRebusEventBusOptions()
{
_publish = DefaultPublish;
_configurer = DefaultConfigure;
}
private async Task DefaultPublish(IBus bus, Type eventType, object eventData)
{
await bus.Advanced.Routing.Send(InputQueueName, eventData);
}
private void DefaultConfigure(RebusConfigurer configure)
{
configure.Subscriptions(s => s.StoreInMemory());

@ -0,0 +1,22 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Rebus.Messages;
using Rebus.Pipeline;
using Rebus.Pipeline.Receive;
namespace Volo.Abp.EventBus.Rebus;
public class AbpRebusEventHandlerStep : IIncomingStep
{
public Task Process(IncomingStepContext context, Func<Task> next)
{
var message = context.Load<Message>();
var handlerInvokers = context.Load<HandlerInvokers>().ToList();
handlerInvokers.RemoveAll(x => x.Handler.GetType() == typeof(RebusDistributedEventHandlerAdapter<object>));
context.Save(new HandlerInvokers(message, handlerInvokers));
return next();
}
}

@ -148,9 +148,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
await TriggerHandlersAsync(eventType, eventData);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
if (AbpRebusEventBusOptions.Publish != null)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
return;
}
await Rebus.Publish(eventData);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)

Loading…
Cancel
Save