Merge pull request #7632 from abpframework/auto-merge/rel-4-2/153

Merge branch dev with rel-4.2
pull/7637/head
liangshiwei 5 years ago committed by GitHub
commit 298a981505
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,5 +23,7 @@
public bool MvcUi { get; set; }
public bool BlazorUi { get; set; }
public bool IsFreeToActiveLicenseOwners { get; set; }
}
}

@ -9,11 +9,13 @@ namespace Volo.Abp.Kafka
public KafkaConnections Connections { get; }
public Action<ProducerConfig> ConfigureProducer { get; set; }
public Action<ConsumerConfig> ConfigureConsumer { get; set; }
public Action<TopicSpecification> ConfigureTopic { get; set; }
public bool ReQueue { get; set; } = true;
public AbpKafkaOptions()
{
Connections = new KafkaConnections();

@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
@ -20,6 +19,8 @@ namespace Volo.Abp.Kafka
protected IConsumerPool ConsumerPool { get; }
protected IProducerPool ProducerPool { get; }
protected IExceptionNotifier ExceptionNotifier { get; }
protected AbpKafkaOptions Options { get; }
@ -37,10 +38,12 @@ namespace Volo.Abp.Kafka
public KafkaMessageConsumer(
IConsumerPool consumerPool,
IExceptionNotifier exceptionNotifier,
IOptions<AbpKafkaOptions> options)
IOptions<AbpKafkaOptions> options,
IProducerPool producerPool)
{
ConsumerPool = consumerPool;
ExceptionNotifier = exceptionNotifier;
ProducerPool = producerPool;
Options = options.Value;
Logger = NullLogger<KafkaMessageConsumer>.Instance;
@ -132,14 +135,29 @@ namespace Volo.Abp.Kafka
{
await callback(consumeResult.Message);
}
Consumer.Commit(consumeResult);
}
catch (Exception ex)
{
await RequeueAsync(consumeResult);
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
finally
{
Consumer.Commit(consumeResult);
}
}
protected virtual async Task RequeueAsync(ConsumeResult<string, byte[]> consumeResult)
{
if (!Options.ReQueue)
{
return;
}
var producer = ProducerPool.Get(ConnectionName);
await producer.ProduceAsync(consumeResult.Topic, consumeResult.Message);
}
public virtual void Dispose()

@ -191,7 +191,16 @@ namespace Volo.Abp.RabbitMQ
}
catch (Exception ex)
{
Channel.BasicNack(basicDeliverEventArgs.DeliveryTag, multiple: false, requeue: true);
try
{
Channel.BasicNack(
basicDeliverEventArgs.DeliveryTag,
multiple: false,
requeue: true
);
}
catch { }
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}

Loading…
Cancel
Save