diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs index 567c303704..6bb1307a12 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs @@ -32,7 +32,7 @@ namespace Volo.Abp.EventBus.Kafka var headers = context.GetProperty(HeadersKey) ?? new Headers(); - var index = 1; + var index = 0; if (headers.Any(x => x.Key == RetryIndexKey)) { index = Serializer.Deserialize(headers.GetLastBytes(RetryIndexKey)); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index aa1aacc485..ad672b8b7d 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.MultiTenancy; @@ -65,19 +66,16 @@ namespace Volo.Abp.EventBus.RabbitMq Consumer = MessageConsumerFactory.Create( new ExchangeDeclareConfiguration( AbpRabbitMqEventBusOptions.ExchangeName, + AbpRabbitMqEventBusOptions.ExchangeName + suffix, type: "direct", durable: true ), new QueueDeclareConfiguration( AbpRabbitMqEventBusOptions.ClientName, + AbpEventBusOptions.DeadLetterName ?? AbpRabbitMqEventBusOptions.ClientName + suffix, durable: true, exclusive: false, - autoDelete: false, - arguments: new Dictionary - { - {"x-dead-letter-exchange", AbpRabbitMqEventBusOptions.ExchangeName + suffix}, - {"x-dead-letter-routing-key", AbpEventBusOptions.DeadLetterName ?? AbpRabbitMqEventBusOptions.ClientName + suffix} - } + autoDelete: false ), AbpRabbitMqEventBusOptions.ConnectionName ); @@ -98,7 +96,10 @@ namespace Volo.Abp.EventBus.RabbitMq var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - await TriggerHandlersAsync(eventType, eventData); + await TriggerHandlersAsync(eventType, eventData, errorContext => + { + errorContext.SetProperty("headers", ea.BasicProperties); + }); } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class @@ -179,35 +180,12 @@ namespace Volo.Abp.EventBus.RabbitMq GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - public override Task PublishAsync(Type eventType, object eventData) + public override async Task PublishAsync(Type eventType, object eventData) { - var eventName = EventNameAttribute.GetNameOrDefault(eventType); - var body = Serializer.Serialize(eventData); - - using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) - { - channel.ExchangeDeclare( - AbpRabbitMqEventBusOptions.ExchangeName, - "direct", - durable: true - ); - - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; - - channel.BasicPublish( - exchange: AbpRabbitMqEventBusOptions.ExchangeName, - routingKey: eventName, - mandatory: true, - basicProperties: properties, - body: body - ); - } - - return Task.CompletedTask; + await PublishAsync(eventType, eventData, null); } - public Task PublishAsync(Type eventType, object eventData, Dictionary headers) + public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); @@ -220,9 +198,12 @@ namespace Volo.Abp.EventBus.RabbitMq durable: true ); - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; - properties.Headers = headers; + if (properties == null) + { + properties = channel.CreateBasicProperties(); + properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; + properties.MessageId = Guid.NewGuid().ToString("N"); + } channel.BasicPublish( exchange: AbpRabbitMqEventBusOptions.ExchangeName, @@ -253,9 +234,11 @@ namespace Volo.Abp.EventBus.RabbitMq { var handlerFactoryList = new List(); - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + foreach (var handlerFactory in + HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) { - handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); } return handlerFactoryList.ToArray(); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs index b9b0ac6497..01ec6a3888 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Options; +using RabbitMQ.Client; using Volo.Abp.Data; using Volo.Abp.DependencyInjection; @@ -25,23 +27,20 @@ namespace Volo.Abp.EventBus.RabbitMq await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond); } - var headers = context.GetProperty>(HeadersKey) ?? - new Dictionary(); + var properties = context.GetProperty(HeadersKey).As(); + var headers = properties.Headers ?? new Dictionary(); - var index = 1; + var index = 0; if (headers.ContainsKey(RetryIndexKey)) { index = (int) headers[RetryIndexKey]; - headers[RetryIndexKey] = ++index; - } - else - { - headers[RetryIndexKey] = index; } - headers["exceptions"] = context.Exceptions; + headers[RetryIndexKey] = ++index; + headers["exceptions"] = context.Exceptions.Select(x => x.ToString()).ToList(); + properties.Headers = headers; - await context.EventBus.As().PublishAsync(context.EventType, context.EventData, headers); + await context.EventBus.As().PublishAsync(context.EventType, context.EventData, properties); } protected override Task MoveToDeadLetter(EventExecutionErrorContext context) @@ -63,14 +62,14 @@ namespace Volo.Abp.EventBus.RabbitMq return false; } - var headers = context.GetProperty>(HeadersKey); + var properties = context.GetProperty(HeadersKey).As(); - if (headers == null || !headers.ContainsKey(RetryIndexKey)) + if (properties.Headers == null || !properties.Headers.ContainsKey(RetryIndexKey)) { return true; } - var index = (int) headers[RetryIndexKey]; + var index = (int) properties.Headers[RetryIndexKey]; return Options.RetryStrategyOptions.MaxRetryAttempts > index; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs index b5d22389d3..32240eb0e5 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs @@ -52,7 +52,7 @@ namespace Volo.Abp.EventBus return Options.ErrorHandleSelector.Invoke(context.EventType); } - return false; + return true; } protected virtual bool ShouldRetry(EventExecutionErrorContext context) diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs index 67f234aec4..18af1b758e 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs @@ -6,6 +6,8 @@ namespace Volo.Abp.RabbitMQ { public string ExchangeName { get; } + public string DeadLetterExchangeName { get; } + public string Type { get; } public bool Durable { get; set; } @@ -15,16 +17,18 @@ namespace Volo.Abp.RabbitMQ public IDictionary Arguments { get; } public ExchangeDeclareConfiguration( - string exchangeName, - string type, - bool durable = false, + string exchangeName, + string deadLetterExchangeName, + string type, + bool durable = false, bool autoDelete = false) { ExchangeName = exchangeName; + DeadLetterExchangeName = deadLetterExchangeName; Type = type; Durable = durable; AutoDelete = autoDelete; Arguments = new Dictionary(); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs index 17a5c2dfbf..4fc0aff1ef 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs @@ -8,6 +8,8 @@ namespace Volo.Abp.RabbitMQ { [NotNull] public string QueueName { get; } + [NotNull] public string DeadLetterQueueName { get; } + public bool Durable { get; set; } public bool Exclusive { get; set; } @@ -18,12 +20,14 @@ namespace Volo.Abp.RabbitMQ public QueueDeclareConfiguration( [NotNull] string queueName, + [NotNull] string deadLetterQueueName, bool durable = true, bool exclusive = false, bool autoDelete = false, Dictionary arguments = null) { QueueName = queueName; + DeadLetterQueueName = deadLetterQueueName; Durable = durable; Exclusive = exclusive; AutoDelete = autoDelete; diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index b5a165157f..100a79cc11 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -156,21 +156,26 @@ namespace Volo.Abp.RabbitMQ arguments: Exchange.Arguments ); - if (Queue.Arguments.ContainsKey("x-dead-letter-exchange") && - Queue.Arguments.ContainsKey("x-dead-letter-routing-key")) + if (!Exchange.DeadLetterExchangeName.IsNullOrWhiteSpace() && + !Queue.DeadLetterQueueName.IsNullOrWhiteSpace()) { Channel.ExchangeDeclare( - Exchange.Arguments["x-dead-letter-exchange"].ToString(), + Exchange.DeadLetterExchangeName, Exchange.Type, Exchange.Durable, Exchange.AutoDelete ); Channel.QueueDeclare( - Queue.Arguments["x-dead-letter-routing-key"].ToString(), + Queue.DeadLetterQueueName, Queue.Durable, Queue.Exclusive, Queue.AutoDelete); + + Queue.Arguments["x-dead-letter-exchange"] = Exchange.DeadLetterExchangeName; + Queue.Arguments["x-dead-letter-routing-key"] = Queue.DeadLetterQueueName; + + Channel.QueueBind(Queue.DeadLetterQueueName, Exchange.DeadLetterExchangeName, Queue.DeadLetterQueueName); } Channel.QueueDeclare(