Batch publish events from outbox to the kafka event bus

pull/11243/head
liangshiwei 4 years ago
parent 1d42954313
commit 5ee5919463

@ -77,12 +77,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return;
}
string messageId = null;
if (message.Headers.TryGetLastBytes("messageId", out var messageIdBytes))
{
messageId = System.Text.Encoding.UTF8.GetString(messageIdBytes);
}
var messageId = message.GetMessageId();
if (await AddToInboxAsync(messageId, eventName, eventType, message.Value))
{
@ -196,9 +191,38 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
);
}
public override Task<MultipleOutgoingEventPublishResult> PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
public async override Task<MultipleOutgoingEventPublishResult> PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
throw new NotImplementedException();
var pendingConfirms = new ConcurrentDictionary<string, Guid>();
var outgoingEventArray = outgoingEvents.ToArray();
var tasks = new List<Task>();
foreach (var outgoingEvent in outgoingEventArray)
{
var messageId = outgoingEvent.Id.ToString("N");
pendingConfirms.TryAdd(messageId, outgoingEvent.Id);
var task = PublishAsync(
AbpKafkaEventBusOptions.TopicName,
outgoingEvent.EventName,
outgoingEvent.EventData,
new Headers { { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} },
null
);
tasks.Add(task.ContinueWith(t =>
{
if (!t.IsFaulted)
{
var message = t.Result.Message;
pendingConfirms.TryRemove(message.GetMessageId(), out _);
}
}));
}
await Task.WhenAll(tasks);
return new MultipleOutgoingEventPublishResult(outgoingEventArray.Where(x => !pendingConfirms.Select(p => p.Value).Contains(x.Id)).ToList());
}
public async override Task ProcessFromInboxAsync(
@ -244,13 +268,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return PublishAsync(topicName, eventName, body, headers, headersArguments);
}
private async Task PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary<string, object> headersArguments)
private Task<DeliveryResult<string, byte[]>> PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary<string, object> headersArguments)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
SetEventMessageHeaders(headers, headersArguments);
await producer.ProduceAsync(
return producer.ProduceAsync(
topicName,
new Message<string, byte[]>
{

@ -0,0 +1,18 @@
using Confluent.Kafka;
namespace Volo.Abp.EventBus.Kafka;
public static class MessageExtensions
{
public static string GetMessageId<TKey, TValue>(this Message<TKey, TValue> message)
{
string messageId = null;
if (message.Headers.TryGetLastBytes("messageId", out var messageIdBytes))
{
messageId = System.Text.Encoding.UTF8.GetString(messageIdBytes);
}
return messageId;
}
}

@ -98,7 +98,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
break;
}
//Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox.");
Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox.");
foreach (var waitingEvent in waitingEvents)
{
@ -113,7 +113,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
await uow.CompleteAsync();
}
//Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}");
Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}");
}
}
}

@ -78,7 +78,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
if (waitingEvents.Count < 1000)
if (waitingEvents.Count <= 0)
{
break;
}

Loading…
Cancel
Save