Batch publish events from outbox to the rebus event bus

pull/11243/head
liangshiwei 4 years ago
parent f36c44fc78
commit 9ec54780d4

@ -20,24 +20,13 @@ public class AbpRebusEventBusOptions
}
private Action<RebusConfigurer> _configurer;
[NotNull]
public Func<IBus, Type, object, Task> Publish {
get => _publish;
set => _publish = Check.NotNull(value, nameof(value));
}
private Func<IBus, Type, object, Task> _publish;
public Func<IBus, Type, object, Task> Publish { get; set; }
public AbpRebusEventBusOptions()
{
_publish = DefaultPublish;
_configurer = DefaultConfigure;
}
private async Task DefaultPublish(IBus bus, Type eventType, object eventData)
{
await bus.Advanced.Routing.Send(InputQueueName, eventData);
}
private void DefaultConfigure(RebusConfigurer configure)
{
configure.Subscriptions(s => s.StoreInMemory());

@ -148,7 +148,18 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
await PublishAsync(eventType, eventData);
}
protected virtual async Task PublishAsync(Type eventType, object eventData)
{
if (AbpRebusEventBusOptions.Publish != null)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
return;
}
await Rebus.Advanced.Routing.Send(AbpRebusEventBusOptions.InputQueueName, eventData);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
@ -210,9 +221,16 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return PublishToEventBusAsync(eventType, eventData);
}
public override Task<MultipleOutgoingEventPublishResult> PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
public async override Task<MultipleOutgoingEventPublishResult> PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
throw new NotImplementedException();
var outgoingEventArray = outgoingEvents.ToArray();
foreach (var outgoingEvent in outgoingEventArray)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
return new MultipleOutgoingEventPublishResult(outgoingEventArray);
}
public async override Task ProcessFromInboxAsync(

Loading…
Cancel
Save