Merge pull request #7630 from abpframework/liangshiwei/kafka

Add re-queue feature to kafka
pull/7631/head
Halil İbrahim Kalkan 5 years ago committed by GitHub
commit 9b5a75fc49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,11 +9,13 @@ namespace Volo.Abp.Kafka
public KafkaConnections Connections { get; }
public Action<ProducerConfig> ConfigureProducer { get; set; }
public Action<ConsumerConfig> ConfigureConsumer { get; set; }
public Action<TopicSpecification> ConfigureTopic { get; set; }
public bool ReQueue { get; set; } = true;
public AbpKafkaOptions()
{
Connections = new KafkaConnections();

@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
@ -20,6 +19,8 @@ namespace Volo.Abp.Kafka
protected IConsumerPool ConsumerPool { get; }
protected IProducerPool ProducerPool { get; }
protected IExceptionNotifier ExceptionNotifier { get; }
protected AbpKafkaOptions Options { get; }
@ -37,10 +38,12 @@ namespace Volo.Abp.Kafka
public KafkaMessageConsumer(
IConsumerPool consumerPool,
IExceptionNotifier exceptionNotifier,
IOptions<AbpKafkaOptions> options)
IOptions<AbpKafkaOptions> options,
IProducerPool producerPool)
{
ConsumerPool = consumerPool;
ExceptionNotifier = exceptionNotifier;
ProducerPool = producerPool;
Options = options.Value;
Logger = NullLogger<KafkaMessageConsumer>.Instance;
@ -132,14 +135,29 @@ namespace Volo.Abp.Kafka
{
await callback(consumeResult.Message);
}
Consumer.Commit(consumeResult);
}
catch (Exception ex)
{
await RequeueAsync(consumeResult);
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
finally
{
Consumer.Commit(consumeResult);
}
}
protected virtual async Task RequeueAsync(ConsumeResult<string, byte[]> consumeResult)
{
if (!Options.ReQueue)
{
return;
}
var producer = ProducerPool.Get(ConnectionName);
await producer.ProduceAsync(consumeResult.Topic, consumeResult.Message);
}
public virtual void Dispose()

Loading…
Cancel
Save