From 002b539c45ea37ce77cb5a22e21b0d6b952ba34f Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Sun, 7 Feb 2021 22:32:11 +0800 Subject: [PATCH] Add re-queue feature to kafka --- .../Volo/Abp/Kafka/AbpKafkaOptions.cs | 4 ++- .../Volo/Abp/Kafka/KafkaMessageConsumer.cs | 26 ++++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs index 1769d8a076..26d15ce818 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs @@ -9,11 +9,13 @@ namespace Volo.Abp.Kafka public KafkaConnections Connections { get; } public Action ConfigureProducer { get; set; } - + public Action ConfigureConsumer { get; set; } public Action ConfigureTopic { get; set; } + public bool ReQueue { get; set; } = true; + public AbpKafkaOptions() { Connections = new KafkaConnections(); diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs index 8ae81a2a26..a1c3fefd74 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Linq; using System.Threading.Tasks; using Confluent.Kafka; using Confluent.Kafka.Admin; @@ -20,6 +19,8 @@ namespace Volo.Abp.Kafka protected IConsumerPool ConsumerPool { get; } + protected IProducerPool ProducerPool { get; } + protected IExceptionNotifier ExceptionNotifier { get; } protected AbpKafkaOptions Options { get; } @@ -37,10 +38,12 @@ namespace Volo.Abp.Kafka public KafkaMessageConsumer( IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier, - IOptions options) + IOptions options, + IProducerPool producerPool) { ConsumerPool = consumerPool; ExceptionNotifier = exceptionNotifier; + ProducerPool = producerPool; Options = options.Value; Logger = NullLogger.Instance; @@ -132,14 +135,29 @@ namespace Volo.Abp.Kafka { await callback(consumeResult.Message); } - - Consumer.Commit(consumeResult); } catch (Exception ex) { + await RequeueAsync(consumeResult); + Logger.LogException(ex); await ExceptionNotifier.NotifyAsync(ex); } + finally + { + Consumer.Commit(consumeResult); + } + } + + protected virtual async Task RequeueAsync(ConsumeResult consumeResult) + { + if (!Options.ReQueue) + { + return; + } + + var producer = ProducerPool.Get(ConnectionName); + await producer.ProduceAsync(consumeResult.Topic, consumeResult.Message); } public virtual void Dispose()