From f6198eeb806b1fa2e5c382773c8a5fe629850b3d Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Fri, 23 Apr 2021 22:19:54 +0800 Subject: [PATCH] Implement EventErrorHandler for kafka --- .../Kafka/KafkaDistributedEventBus.cs | 21 ++++++++--- .../EventBus/Kafka/KafkaEventErrorHandler.cs | 34 ++++-------------- .../Volo/Abp/EventBus/AbpEventBusOptions.cs | 2 +- .../Abp/EventBus/EventErrorHandlerBase.cs | 12 +++---- .../EventBus/Local/LocalEventErrorHandler.cs | 2 +- .../Volo/Abp/Kafka/IKafkaMessageConsumer.cs | 2 -- .../Abp/Kafka/IKafkaMessageConsumerFactory.cs | 2 ++ .../Volo/Abp/Kafka/KafkaMessageConsumer.cs | 36 ++++++++++++++----- .../Abp/Kafka/KafkaMessageConsumerFactory.cs | 1 + 9 files changed, 62 insertions(+), 50 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 46eb1c0827..b6dc3e9bb5 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -20,6 +20,7 @@ namespace Volo.Abp.EventBus.Kafka [ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))] public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency { + protected AbpEventBusOptions AbpEventBusOptions { get; } protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; } @@ -28,6 +29,7 @@ namespace Volo.Abp.EventBus.Kafka protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } protected IKafkaMessageConsumer Consumer { get; private set; } + protected string DeadLetterTopicName { get; } public KafkaDistributedEventBus( IServiceScopeFactory serviceScopeFactory, @@ -37,14 +39,17 @@ namespace Volo.Abp.EventBus.Kafka IOptions abpDistributedEventBusOptions, IKafkaSerializer serializer, IProducerPool producerPool, - IEventErrorHandler errorHandler) + IEventErrorHandler errorHandler, + IOptions abpEventBusOptions) : base(serviceScopeFactory, currentTenant, errorHandler) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; + AbpEventBusOptions = abpEventBusOptions.Value; MessageConsumerFactory = messageConsumerFactory; Serializer = serializer; ProducerPool = producerPool; + DeadLetterTopicName = AbpEventBusOptions.DeadLetterQueue ?? AbpKafkaEventBusOptions.TopicName + "_error"; HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); @@ -56,8 +61,6 @@ namespace Volo.Abp.EventBus.Kafka AbpKafkaEventBusOptions.TopicName, AbpKafkaEventBusOptions.GroupId, AbpKafkaEventBusOptions.ConnectionName); - - Consumer.Consume(); Consumer.OnMessageReceived(ProcessEventAsync); SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); @@ -157,6 +160,16 @@ namespace Volo.Abp.EventBus.Kafka } public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers) + { + await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers); + } + + public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers) + { + await PublishAsync(DeadLetterTopicName, eventType, eventData, headers); + } + + private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); @@ -164,7 +177,7 @@ namespace Volo.Abp.EventBus.Kafka var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); await producer.ProduceAsync( - AbpKafkaEventBusOptions.TopicName, + topicName, new Message { Key = eventName, Value = body, Headers = headers diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs index 65c8561149..2ee2a23515 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs @@ -14,26 +14,14 @@ namespace Volo.Abp.EventBus.Kafka protected IKafkaSerializer Serializer { get; } protected KafkaDistributedEventBus EventBus { get; } - protected IProducerPool ProducerPool { get; } - protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } - - protected string ErrorTopicName { get; } public KafkaEventErrorHandler( IOptions options, IKafkaSerializer serializer, - KafkaDistributedEventBus eventBus, - IKafkaMessageConsumerFactory consumerFactory, - IProducerPool producerPool, - IOptions abpKafkaEventBusOptions) : base(options) + KafkaDistributedEventBus eventBus) : base(options) { Serializer = serializer; EventBus = eventBus; - ProducerPool = producerPool; - AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; - - ErrorTopicName = options.Value.ErrorQueue ?? abpKafkaEventBusOptions.Value.TopicName + "_error"; - consumerFactory.Create(ErrorTopicName, string.Empty, abpKafkaEventBusOptions.Value.ConnectionName); } protected override async Task Retry(EventExecutionErrorContext context) @@ -52,19 +40,12 @@ namespace Volo.Abp.EventBus.Kafka await EventBus.PublishAsync(context.EventType, context.EventData, headers); } - protected override async Task MoveToErrorQueue(EventExecutionErrorContext context) + protected override async Task MoveToDeadLetter(EventExecutionErrorContext context) { - var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); - var eventName = EventNameAttribute.GetNameOrDefault(context.EventType); - var body = Serializer.Serialize(context.EventData); - - await producer.ProduceAsync( - AbpKafkaEventBusOptions.TopicName, - new Message - { - Key = eventName, Value = body, - Headers = new Headers {{"exceptions", Serializer.Serialize(context.Exceptions)}} - }); + await EventBus.PublishToDeadLetterAsync(context.EventType, context.EventData, new Headers + { + {"exceptions", Serializer.Serialize(context.Exceptions)} + }); } protected override bool ShouldRetry(EventExecutionErrorContext context) @@ -75,14 +56,13 @@ namespace Volo.Abp.EventBus.Kafka } var headers = context.GetProperty(HeadersKey); - var index = 1; if (headers == null) { return true; } - index = Serializer.Deserialize(headers.GetLastBytes(RetryIndexKey)); + var index = Serializer.Deserialize(headers.GetLastBytes(RetryIndexKey)); return Options.RetryStrategyOptions.Count < index; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs index 18d3290c25..ad3926b334 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs @@ -8,7 +8,7 @@ namespace Volo.Abp.EventBus public Func ErrorHandleSelector { get; set; } - public string ErrorQueue { get; set; } + public string DeadLetterQueue { get; set; } public AbpEventBusRetryStrategyOptions RetryStrategyOptions { get; set; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs index 14a27c4221..0bb6430e56 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs @@ -1,7 +1,5 @@ -using System.Collections.Generic; -using System.Threading.Tasks; +using System.Threading.Tasks; using Microsoft.Extensions.Options; -using Volo.Abp.Data; namespace Volo.Abp.EventBus { @@ -9,7 +7,7 @@ namespace Volo.Abp.EventBus { protected AbpEventBusOptions Options { get; } - public EventErrorHandlerBase(IOptions options) + protected EventErrorHandlerBase(IOptions options) { Options = options.Value; } @@ -27,12 +25,12 @@ namespace Volo.Abp.EventBus return; } - await MoveToErrorQueue(context); + await MoveToDeadLetter(context); } protected abstract Task Retry(EventExecutionErrorContext context); - protected abstract Task MoveToErrorQueue(EventExecutionErrorContext context); + protected abstract Task MoveToDeadLetter(EventExecutionErrorContext context); protected virtual bool ShouldHandle(EventExecutionErrorContext context) { @@ -51,7 +49,7 @@ namespace Volo.Abp.EventBus protected virtual bool ShouldRetry(EventExecutionErrorContext context) { - return Options.RetryStrategyOptions == null && false; + return Options.RetryStrategyOptions != null; } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs index db8964d380..5c3c2ba4f1 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs @@ -34,7 +34,7 @@ namespace Volo.Abp.EventBus.Local new LocalEventMessage(messageId, context.EventData, context.EventType)); } - protected override Task MoveToErrorQueue(EventExecutionErrorContext context) + protected override Task MoveToDeadLetter(EventExecutionErrorContext context) { if (context.Exceptions.Count == 1) { diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs index 721f0b5a9b..87872b31a2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs @@ -7,7 +7,5 @@ namespace Volo.Abp.Kafka public interface IKafkaMessageConsumer { void OnMessageReceived(Func, Task> callback); - - void Consume(); } } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs index 2b01b5a935..96ec753dc2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs @@ -8,11 +8,13 @@ /// not disposed until end of the application. /// /// + /// /// /// /// IKafkaMessageConsumer Create( string topicName, + string deadLetterTopicName, string groupId, string connectionName = null); } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs index f5c726063e..e519ebc87d 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Confluent.Kafka; @@ -36,6 +37,9 @@ namespace Volo.Abp.Kafka protected string TopicName { get; private set; } + + protected string DeadLetterTopicName { get; private set; } + public KafkaMessageConsumer( IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier, @@ -53,16 +57,20 @@ namespace Volo.Abp.Kafka public virtual void Initialize( [NotNull] string topicName, + [NotNull] string deadLetterTopicName, [NotNull] string groupId, string connectionName = null) { Check.NotNull(topicName, nameof(topicName)); + Check.NotNull(deadLetterTopicName, nameof(deadLetterTopicName)); Check.NotNull(groupId, nameof(groupId)); TopicName = topicName; + DeadLetterTopicName = deadLetterTopicName; ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName; GroupId = groupId; AsyncHelper.RunSync(CreateTopicAsync); + Consume(); } public virtual void OnMessageReceived(Func, Task> callback) @@ -74,22 +82,34 @@ namespace Volo.Abp.Kafka { using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build()) { - var topic = new TopicSpecification + var topics = new List { - Name = TopicName, - NumPartitions = 1, - ReplicationFactor = 1 + new() + { + Name = TopicName, + NumPartitions = 1, + ReplicationFactor = 1 + }, + new() + { + Name = DeadLetterTopicName, + NumPartitions = 1, + ReplicationFactor = 1 + } }; - Options.ConfigureTopic?.Invoke(topic); + topics.ForEach(topic => + { + Options.ConfigureTopic?.Invoke(topic); + }); try { - await adminClient.CreateTopicsAsync(new[] {topic}); + await adminClient.CreateTopicsAsync(topics); } catch (CreateTopicsException e) { - if(e.Results.First().Error.Code != ErrorCode.TopicAlreadyExists) + if(e.Results.Any(x => x.Error.Code != ErrorCode.TopicAlreadyExists)) { throw; } @@ -97,7 +117,7 @@ namespace Volo.Abp.Kafka } } - public virtual void Consume() + protected virtual void Consume() { Consumer = ConsumerPool.Get(GroupId, ConnectionName); diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs index fb08aa4144..68d1162b7f 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs @@ -16,6 +16,7 @@ namespace Volo.Abp.Kafka public IKafkaMessageConsumer Create( string topicName, + string deadLetterTopicName, string groupId, string connectionName = null) {