From 5ee59194637c187ecfee34aab7163c801e182044 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Sun, 9 Jan 2022 20:24:39 +0800 Subject: [PATCH] Batch publish events from outbox to the kafka event bus --- .../Kafka/KafkaDistributedEventBus.cs | 44 ++++++++++++++----- .../Abp/EventBus/Kafka/MessageExtensions.cs | 18 ++++++++ .../EventBus/Distributed/InboxProcessor.cs | 4 +- .../Abp/EventBus/Distributed/OutboxSender.cs | 2 +- 4 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 23ae188170..aef7662095 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -77,12 +77,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return; } - string messageId = null; - - if (message.Headers.TryGetLastBytes("messageId", out var messageIdBytes)) - { - messageId = System.Text.Encoding.UTF8.GetString(messageIdBytes); - } + var messageId = message.GetMessageId(); if (await AddToInboxAsync(messageId, eventName, eventType, message.Value)) { @@ -196,9 +191,38 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen ); } - public override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { - throw new NotImplementedException(); + var pendingConfirms = new ConcurrentDictionary(); + var outgoingEventArray = outgoingEvents.ToArray(); + + var tasks = new List(); + foreach (var outgoingEvent in outgoingEventArray) + { + var messageId = outgoingEvent.Id.ToString("N"); + pendingConfirms.TryAdd(messageId, outgoingEvent.Id); + + var task = PublishAsync( + AbpKafkaEventBusOptions.TopicName, + outgoingEvent.EventName, + outgoingEvent.EventData, + new Headers { { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} }, + null + ); + + tasks.Add(task.ContinueWith(t => + { + if (!t.IsFaulted) + { + var message = t.Result.Message; + pendingConfirms.TryRemove(message.GetMessageId(), out _); + } + })); + } + + await Task.WhenAll(tasks); + + return new MultipleOutgoingEventPublishResult(outgoingEventArray.Where(x => !pendingConfirms.Select(p => p.Value).Contains(x.Id)).ToList()); } public async override Task ProcessFromInboxAsync( @@ -244,13 +268,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return PublishAsync(topicName, eventName, body, headers, headersArguments); } - private async Task PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary headersArguments) + private Task> PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary headersArguments) { var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); SetEventMessageHeaders(headers, headersArguments); - await producer.ProduceAsync( + return producer.ProduceAsync( topicName, new Message { diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs new file mode 100644 index 0000000000..17a80ec87c --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs @@ -0,0 +1,18 @@ +using Confluent.Kafka; + +namespace Volo.Abp.EventBus.Kafka; + +public static class MessageExtensions +{ + public static string GetMessageId(this Message message) + { + string messageId = null; + + if (message.Headers.TryGetLastBytes("messageId", out var messageIdBytes)) + { + messageId = System.Text.Encoding.UTF8.GetString(messageIdBytes); + } + + return messageId; + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 0d22245d13..50c1a4e009 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -98,7 +98,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency break; } - //Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox."); + Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox."); foreach (var waitingEvent in waitingEvents) { @@ -113,7 +113,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency await uow.CompleteAsync(); } - //Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); + Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs index d42110e4ea..17dec0a810 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs @@ -78,7 +78,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency while (true) { var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); - if (waitingEvents.Count < 1000) + if (waitingEvents.Count <= 0) { break; }