diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index aeeccef062..0ca0eaebbd 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -9,7 +9,6 @@ using Microsoft.Extensions.Options; using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; -using Volo.Abp.EventBus.Local; using Volo.Abp.Kafka; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; @@ -49,7 +48,8 @@ namespace Volo.Abp.EventBus.Kafka MessageConsumerFactory = messageConsumerFactory; Serializer = serializer; ProducerPool = producerPool; - DeadLetterTopicName = AbpEventBusOptions.DeadLetterName ?? AbpKafkaEventBusOptions.TopicName + "_dead_letter"; + DeadLetterTopicName = + AbpEventBusOptions.DeadLetterName ?? AbpKafkaEventBusOptions.TopicName + "_dead_letter"; HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); @@ -59,6 +59,7 @@ namespace Volo.Abp.EventBus.Kafka { Consumer = MessageConsumerFactory.Create( AbpKafkaEventBusOptions.TopicName, + DeadLetterTopicName, AbpKafkaEventBusOptions.GroupId, AbpKafkaEventBusOptions.ConnectionName); Consumer.OnMessageReceived(ProcessEventAsync); @@ -75,7 +76,7 @@ namespace Volo.Abp.EventBus.Kafka return; } - var eventMessage = Serializer.Deserialize(message.Value); + var eventMessage = Serializer.Deserialize(message.Value, eventType); await TriggerHandlersAsync(eventType, eventMessage, context => { context.SetProperty(KafkaEventErrorHandler.HeadersKey, message.Headers); }); @@ -156,7 +157,7 @@ namespace Volo.Abp.EventBus.Kafka public override async Task PublishAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData, null); + await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}); } public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs index 6bb1307a12..d5e4452618 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs @@ -12,7 +12,7 @@ namespace Volo.Abp.EventBus.Kafka public class KafkaEventErrorHandler : EventErrorHandlerBase, ISingletonDependency { public const string HeadersKey = "headers"; - public const string RetryIndexKey = "retryIndex"; + public const string RetryAttemptKey = "retryAttempt"; protected IKafkaSerializer Serializer { get; } @@ -30,26 +30,25 @@ namespace Volo.Abp.EventBus.Kafka await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond); } - var headers = context.GetProperty(HeadersKey) ?? new Headers(); + var headers = context.GetProperty(HeadersKey).As(); - var index = 0; - if (headers.Any(x => x.Key == RetryIndexKey)) + var retryAttempt = 0; + if (headers.Any(x => x.Key == RetryAttemptKey)) { - index = Serializer.Deserialize(headers.GetLastBytes(RetryIndexKey)); + retryAttempt = Serializer.Deserialize(headers.GetLastBytes(RetryAttemptKey)); } - headers.Remove(RetryIndexKey); - headers.Add(RetryIndexKey, Serializer.Serialize(++index)); + headers.Remove(RetryAttemptKey); + headers.Add(RetryAttemptKey, Serializer.Serialize(++retryAttempt)); await context.EventBus.As().PublishAsync(context.EventType, context.EventData, headers); } protected override async Task MoveToDeadLetter(EventExecutionErrorContext context) { - await context.EventBus.As().PublishToDeadLetterAsync(context.EventType, context.EventData, new Headers - { - {"exceptions", Serializer.Serialize(context.Exceptions)} - }); + var headers = context.GetProperty(HeadersKey).As(); + headers.Add("exceptions", Serializer.Serialize(context.Exceptions.Select(x => x.ToString()).ToList())); + await context.EventBus.As().PublishToDeadLetterAsync(context.EventType, context.EventData, headers); } protected override bool ShouldRetry(EventExecutionErrorContext context) @@ -59,16 +58,16 @@ namespace Volo.Abp.EventBus.Kafka return false; } - var headers = context.GetProperty(HeadersKey); + var headers = context.GetProperty(HeadersKey).As(); - if (headers == null) + if (headers.All(x => x.Key != RetryAttemptKey)) { return true; } - var index = Serializer.Deserialize(headers.GetLastBytes(RetryIndexKey)); + var retryAttempt = Serializer.Deserialize(headers.GetLastBytes(RetryAttemptKey)); - return Options.RetryStrategyOptions.MaxRetryAttempts > index; + return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt; } } } diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs index 01ec6a3888..03a675d4f2 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs @@ -12,7 +12,7 @@ namespace Volo.Abp.EventBus.RabbitMq public class RabbitMqEventErrorHandler : EventErrorHandlerBase, ISingletonDependency { public const string HeadersKey = "headers"; - public const string RetryIndexKey = "retryIndex"; + public const string RetryAttemptKey = "retryAttempt"; public RabbitMqEventErrorHandler( IOptions options) @@ -30,13 +30,13 @@ namespace Volo.Abp.EventBus.RabbitMq var properties = context.GetProperty(HeadersKey).As(); var headers = properties.Headers ?? new Dictionary(); - var index = 0; - if (headers.ContainsKey(RetryIndexKey)) + var retryAttempt = 0; + if (headers.ContainsKey(RetryAttemptKey)) { - index = (int) headers[RetryIndexKey]; + retryAttempt = (int) headers[RetryAttemptKey]; } - headers[RetryIndexKey] = ++index; + headers[RetryAttemptKey] = ++retryAttempt; headers["exceptions"] = context.Exceptions.Select(x => x.ToString()).ToList(); properties.Headers = headers; @@ -64,14 +64,14 @@ namespace Volo.Abp.EventBus.RabbitMq var properties = context.GetProperty(HeadersKey).As(); - if (properties.Headers == null || !properties.Headers.ContainsKey(RetryIndexKey)) + if (properties.Headers == null || !properties.Headers.ContainsKey(RetryAttemptKey)) { return true; } - var index = (int) properties.Headers[RetryIndexKey]; + var retryAttempt = (int) properties.Headers[RetryAttemptKey]; - return Options.RetryStrategyOptions.MaxRetryAttempts > index; + return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt; } } } diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs index 83b3f84340..a13f964d3f 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs @@ -13,26 +13,27 @@ namespace Volo.Abp.EventBus.Rebus public override void ConfigureServices(ServiceConfigurationContext context) { var abpEventBusOptions = context.Services.ExecutePreConfiguredActions(); + var options = context.Services.ExecutePreConfiguredActions();; context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>)); Configure(rebusOptions => { context.Services.ExecutePreConfiguredActions(rebusOptions); + }); - context.Services.AddRebus(configure => + context.Services.AddRebus(configure => + { + if (abpEventBusOptions.RetryStrategyOptions != null) { - if (abpEventBusOptions.RetryStrategyOptions != null) - { - configure.Options(b => - b.SimpleRetryStrategy( - errorQueueAddress: abpEventBusOptions.DeadLetterName ?? rebusOptions.InputQueueName + "_dead_letter", - maxDeliveryAttempts: abpEventBusOptions.RetryStrategyOptions.MaxRetryAttempts)); - } + configure.Options(b => + b.SimpleRetryStrategy( + errorQueueAddress: abpEventBusOptions.DeadLetterName ?? options.InputQueueName + "_dead_letter", + maxDeliveryAttempts: abpEventBusOptions.RetryStrategyOptions.MaxRetryAttempts)); + } - rebusOptions.Configurer?.Invoke(configure); - return configure; - }); + options.Configurer?.Invoke(configure); + return configure; }); } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs index 279765efdb..d1bad00533 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -87,7 +87,6 @@ namespace Volo.Abp.Kafka { await CreateTopicAsync(); Consume(); - Timer.Stop(); } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs index 68d1162b7f..4a22fd04f6 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs @@ -21,7 +21,7 @@ namespace Volo.Abp.Kafka string connectionName = null) { var consumer = ServiceScope.ServiceProvider.GetRequiredService(); - consumer.Initialize(topicName, groupId, connectionName); + consumer.Initialize(topicName, deadLetterTopicName, groupId, connectionName); return consumer; } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs index 18af1b758e..6af3f8364f 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs @@ -6,7 +6,7 @@ namespace Volo.Abp.RabbitMQ { public string ExchangeName { get; } - public string DeadLetterExchangeName { get; } + public string DeadLetterExchangeName { get; set; } public string Type { get; } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs index 4fc0aff1ef..f1fcb07255 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs @@ -8,7 +8,7 @@ namespace Volo.Abp.RabbitMQ { [NotNull] public string QueueName { get; } - [NotNull] public string DeadLetterQueueName { get; } + public string DeadLetterQueueName { get; set; } public bool Durable { get; set; } @@ -20,7 +20,7 @@ namespace Volo.Abp.RabbitMQ public QueueDeclareConfiguration( [NotNull] string queueName, - [NotNull] string deadLetterQueueName, + string deadLetterQueueName, bool durable = true, bool exclusive = false, bool autoDelete = false, diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index 100a79cc11..671445e00d 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -6,6 +6,7 @@ using RabbitMQ.Client.Events; using System; using System.Collections.Concurrent; using System.Threading.Tasks; +using RabbitMQ.Client.Exceptions; using Volo.Abp.DependencyInjection; using Volo.Abp.ExceptionHandling; using Volo.Abp.Threading; @@ -178,7 +179,7 @@ namespace Volo.Abp.RabbitMQ Channel.QueueBind(Queue.DeadLetterQueueName, Exchange.DeadLetterExchangeName, Queue.DeadLetterQueueName); } - Channel.QueueDeclare( + var result = Channel.QueueDeclare( queue: Queue.QueueName, durable: Queue.Durable, exclusive: Queue.Exclusive, @@ -197,6 +198,17 @@ namespace Volo.Abp.RabbitMQ } catch (Exception ex) { + if (ex is OperationInterruptedException operationInterruptedException && + operationInterruptedException.ShutdownReason.ReplyCode == 406 && + operationInterruptedException.Message.Contains("arg 'x-dead-letter-exchange'")) + { + Exchange.DeadLetterExchangeName = null; + Queue.DeadLetterQueueName = null; + Queue.Arguments.Remove("x-dead-letter-exchange"); + Queue.Arguments.Remove("x-dead-letter-routing-key"); + Logger.LogWarning("Unable to bind the dead letter queue to an existing queue. You can delete the queue or add policy. See: https://www.rabbitmq.com/parameters.html"); + } + Logger.LogException(ex, LogLevel.Warning); await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); }