diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index fd6d773952..0b228d676c 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Kafka; @@ -18,6 +19,7 @@ namespace Volo.Abp.EventBus.Kafka [ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))] public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency { + protected AbpEventBusOptions AbpEventBusOptions { get; } protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; } @@ -26,6 +28,7 @@ namespace Volo.Abp.EventBus.Kafka protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } protected IKafkaMessageConsumer Consumer { get; private set; } + protected string DeadLetterTopicName { get; } public KafkaDistributedEventBus( IServiceScopeFactory serviceScopeFactory, @@ -34,14 +37,19 @@ namespace Volo.Abp.EventBus.Kafka IKafkaMessageConsumerFactory messageConsumerFactory, IOptions abpDistributedEventBusOptions, IKafkaSerializer serializer, - IProducerPool producerPool) - : base(serviceScopeFactory, currentTenant) + IProducerPool producerPool, + IEventErrorHandler errorHandler, + IOptions abpEventBusOptions) + : base(serviceScopeFactory, currentTenant, errorHandler) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; + AbpEventBusOptions = abpEventBusOptions.Value; MessageConsumerFactory = messageConsumerFactory; Serializer = serializer; ProducerPool = producerPool; + DeadLetterTopicName = + AbpEventBusOptions.DeadLetterName ?? AbpKafkaEventBusOptions.TopicName + "_dead_letter"; HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); @@ -51,9 +59,9 @@ namespace Volo.Abp.EventBus.Kafka { Consumer = MessageConsumerFactory.Create( AbpKafkaEventBusOptions.TopicName, + DeadLetterTopicName, AbpKafkaEventBusOptions.GroupId, AbpKafkaEventBusOptions.ConnectionName); - Consumer.OnMessageReceived(ProcessEventAsync); SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); @@ -70,7 +78,18 @@ namespace Volo.Abp.EventBus.Kafka var eventData = Serializer.Deserialize(message.Value, eventType); - await TriggerHandlersAsync(eventType, eventData); + await TriggerHandlersAsync(eventType, eventData, errorContext => + { + var retryAttempt = 0; + if (message.Headers.TryGetLastBytes(EventErrorHandlerBase.RetryAttemptKey, out var retryAttemptBytes)) + { + retryAttempt = Serializer.Deserialize(retryAttemptBytes); + } + + errorContext.EventData = Serializer.Deserialize(message.Value, eventType); + errorContext.SetProperty(EventErrorHandlerBase.HeadersKey, message.Headers); + errorContext.SetProperty(EventErrorHandlerBase.RetryAttemptKey, retryAttempt); + }); } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class @@ -147,20 +166,51 @@ namespace Volo.Abp.EventBus.Kafka } public override async Task PublishAsync(Type eventType, object eventData) + { + await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null); + } + + public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) + { + await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments); + } + + public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) + { + await PublishAsync(DeadLetterTopicName, eventType, eventData, headers, headersArguments); + } + + private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary headersArguments) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); + SetEventMessageHeaders(headers, headersArguments); + await producer.ProduceAsync( - AbpKafkaEventBusOptions.TopicName, + topicName, new Message { - Key = eventName, Value = body + Key = eventName, Value = body, Headers = headers }); } + private void SetEventMessageHeaders(Headers headers, Dictionary headersArguments) + { + if (headersArguments == null) + { + return; + } + + foreach (var header in headersArguments) + { + headers.Remove(header.Key); + headers.Add(header.Key, Serializer.Serialize(header.Value)); + } + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs new file mode 100644 index 0000000000..aee21f75a9 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaEventErrorHandler.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.Data; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.EventBus.Kafka +{ + public class KafkaEventErrorHandler : EventErrorHandlerBase, ISingletonDependency + { + protected ILogger Logger { get; set; } + + public KafkaEventErrorHandler( + IOptions options) : base(options) + { + Logger = NullLogger.Instance; + } + + protected override async Task RetryAsync(EventExecutionErrorContext context) + { + if (Options.RetryStrategyOptions.IntervalMillisecond > 0) + { + await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond); + } + + context.TryGetRetryAttempt(out var retryAttempt); + + await context.EventBus.As().PublishAsync( + context.EventType, + context.EventData, + context.GetProperty(HeadersKey).As(), + new Dictionary {{RetryAttemptKey, ++retryAttempt}}); + } + + protected override async Task MoveToDeadLetterAsync(EventExecutionErrorContext context) + { + Logger.LogException( + context.Exceptions.Count == 1 ? context.Exceptions.First() : new AggregateException(context.Exceptions), + LogLevel.Error); + + await context.EventBus.As().PublishToDeadLetterAsync( + context.EventType, + context.EventData, + context.GetProperty(HeadersKey).As(), + new Dictionary {{"exceptions", context.Exceptions.Select(x => x.ToString()).ToList()}}); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index e8c7fc6c5b..541fce1334 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.MultiTenancy; @@ -25,6 +26,7 @@ namespace Volo.Abp.EventBus.RabbitMq { protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } + protected AbpEventBusOptions AbpEventBusOptions { get; } protected IConnectionPool ConnectionPool { get; } protected IRabbitMqSerializer Serializer { get; } @@ -41,12 +43,15 @@ namespace Volo.Abp.EventBus.RabbitMq IServiceScopeFactory serviceScopeFactory, IOptions distributedEventBusOptions, IRabbitMqMessageConsumerFactory messageConsumerFactory, - ICurrentTenant currentTenant) - : base(serviceScopeFactory, currentTenant) + ICurrentTenant currentTenant, + IEventErrorHandler errorHandler, + IOptions abpEventBusOptions) + : base(serviceScopeFactory, currentTenant, errorHandler) { ConnectionPool = connectionPool; Serializer = serializer; MessageConsumerFactory = messageConsumerFactory; + AbpEventBusOptions = abpEventBusOptions.Value; AbpDistributedEventBusOptions = distributedEventBusOptions.Value; AbpRabbitMqEventBusOptions = options.Value; @@ -56,17 +61,21 @@ namespace Volo.Abp.EventBus.RabbitMq public void Initialize() { + const string suffix = "_dead_letter"; + Consumer = MessageConsumerFactory.Create( new ExchangeDeclareConfiguration( AbpRabbitMqEventBusOptions.ExchangeName, type: "direct", - durable: true + durable: true, + deadLetterExchangeName: AbpRabbitMqEventBusOptions.ExchangeName + suffix ), new QueueDeclareConfiguration( AbpRabbitMqEventBusOptions.ClientName, durable: true, exclusive: false, - autoDelete: false + autoDelete: false, + AbpEventBusOptions.DeadLetterName ?? AbpRabbitMqEventBusOptions.ClientName + suffix ), AbpRabbitMqEventBusOptions.ConnectionName ); @@ -87,7 +96,19 @@ namespace Volo.Abp.EventBus.RabbitMq var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - await TriggerHandlersAsync(eventType, eventData); + await TriggerHandlersAsync(eventType, eventData, errorContext => + { + var retryAttempt = 0; + if (ea.BasicProperties.Headers != null && + ea.BasicProperties.Headers.ContainsKey(EventErrorHandlerBase.RetryAttemptKey)) + { + retryAttempt = (int)ea.BasicProperties.Headers[EventErrorHandlerBase.RetryAttemptKey]; + } + + errorContext.EventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); + errorContext.SetProperty(EventErrorHandlerBase.HeadersKey, ea.BasicProperties); + errorContext.SetProperty(EventErrorHandlerBase.RetryAttemptKey, retryAttempt); + }); } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class @@ -168,8 +189,14 @@ namespace Volo.Abp.EventBus.RabbitMq GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - public override Task PublishAsync(Type eventType, object eventData) + public override async Task PublishAsync(Type eventType, object eventData) + { + await PublishAsync(eventType, eventData, null); + } + + public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties, Dictionary headersArguments = null) { + var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); @@ -181,11 +208,17 @@ namespace Volo.Abp.EventBus.RabbitMq durable: true ); - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; + if (properties == null) + { + properties = channel.CreateBasicProperties(); + properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; + properties.MessageId = Guid.NewGuid().ToString("N"); + } + + SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish( - exchange: AbpRabbitMqEventBusOptions.ExchangeName, + exchange: AbpRabbitMqEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, @@ -196,6 +229,21 @@ namespace Volo.Abp.EventBus.RabbitMq return Task.CompletedTask; } + private void SetEventMessageHeaders(IBasicProperties properties, Dictionary headersArguments) + { + if (headersArguments == null) + { + return; + } + + properties.Headers ??= new Dictionary(); + + foreach (var header in headersArguments) + { + properties.Headers[header.Key] = header.Value; + } + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( @@ -213,9 +261,11 @@ namespace Volo.Abp.EventBus.RabbitMq { var handlerFactoryList = new List(); - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + foreach (var handlerFactory in + HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) { - handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); } return handlerFactoryList.ToArray(); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs new file mode 100644 index 0000000000..e8848bee72 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqEventErrorHandler.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using Volo.Abp.Data; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.EventBus.RabbitMq +{ + public class RabbitMqEventErrorHandler : EventErrorHandlerBase, ISingletonDependency + { + public RabbitMqEventErrorHandler( + IOptions options) + : base(options) + { + } + + protected override async Task RetryAsync(EventExecutionErrorContext context) + { + if (Options.RetryStrategyOptions.IntervalMillisecond > 0) + { + await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond); + } + + context.TryGetRetryAttempt(out var retryAttempt); + + await context.EventBus.As().PublishAsync( + context.EventType, + context.EventData, + context.GetProperty(HeadersKey).As(), + new Dictionary + { + {RetryAttemptKey, ++retryAttempt}, + {"exceptions", context.Exceptions.Select(x => x.ToString()).ToList()} + }); + } + + protected override Task MoveToDeadLetterAsync(EventExecutionErrorContext context) + { + ThrowOriginalExceptions(context); + + return Task.CompletedTask; + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs index 9ebd91a6a5..a13f964d3f 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using Rebus.Handlers; +using Rebus.Retry.Simple; using Rebus.ServiceProvider; using Volo.Abp.Modularity; @@ -11,23 +12,29 @@ namespace Volo.Abp.EventBus.Rebus { public override void ConfigureServices(ServiceConfigurationContext context) { - var options = context.Services.ExecutePreConfiguredActions(); + var abpEventBusOptions = context.Services.ExecutePreConfiguredActions(); + var options = context.Services.ExecutePreConfiguredActions();; context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>)); Configure(rebusOptions => { - rebusOptions.Configurer = options.Configurer; - rebusOptions.Publish = options.Publish; - rebusOptions.InputQueueName = options.InputQueueName; + context.Services.ExecutePreConfiguredActions(rebusOptions); }); - context.Services.AddRebus(configurer => + context.Services.AddRebus(configure => { - options.Configurer?.Invoke(configurer); - return configurer; - }); + if (abpEventBusOptions.RetryStrategyOptions != null) + { + configure.Options(b => + b.SimpleRetryStrategy( + errorQueueAddress: abpEventBusOptions.DeadLetterName ?? options.InputQueueName + "_dead_letter", + maxDeliveryAttempts: abpEventBusOptions.RetryStrategyOptions.MaxRetryAttempts)); + } + options.Configurer?.Invoke(configure); + return configure; + }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs index 8aaee7bd63..b6343204c0 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs @@ -32,7 +32,7 @@ namespace Volo.Abp.EventBus.Rebus public AbpRebusEventBusOptions() { _publish = DefaultPublish; - _configurer = DefaultConfigurer; + _configurer = DefaultConfigure; } private async Task DefaultPublish(IBus bus, Type eventType, object eventData) @@ -40,10 +40,10 @@ namespace Volo.Abp.EventBus.Rebus await bus.Advanced.Routing.Send(InputQueueName, eventData); } - private void DefaultConfigurer(RebusConfigurer configurer) + private void DefaultConfigure(RebusConfigurer configure) { - configurer.Subscriptions(s => s.StoreInMemory()); - configurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), InputQueueName)); + configure.Subscriptions(s => s.StoreInMemory()); + configure.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), InputQueueName)); } } } diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index ce3549646e..97bedaf8fb 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -30,8 +30,9 @@ namespace Volo.Abp.EventBus.Rebus ICurrentTenant currentTenant, IBus rebus, IOptions abpDistributedEventBusOptions, - IOptions abpEventBusRebusOptions) : - base(serviceScopeFactory, currentTenant) + IOptions abpEventBusRebusOptions, + IEventErrorHandler errorHandler) : + base(serviceScopeFactory, currentTenant, errorHandler) { Rebus = rebus; AbpRebusEventBusOptions = abpEventBusRebusOptions.Value; diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusEventErrorHandler.cs new file mode 100644 index 0000000000..8fe6a53dbb --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusEventErrorHandler.cs @@ -0,0 +1,32 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.EventBus.Rebus +{ + /// + /// Rebus will automatic retries and error handling: https://github.com/rebus-org/Rebus/wiki/Automatic-retries-and-error-handling + /// + public class RebusEventErrorHandler : EventErrorHandlerBase, ISingletonDependency + { + public RebusEventErrorHandler( + IOptions options) + : base(options) + { + } + + protected override Task RetryAsync(EventExecutionErrorContext context) + { + ThrowOriginalExceptions(context); + + return Task.CompletedTask; + } + + protected override Task MoveToDeadLetterAsync(EventExecutionErrorContext context) + { + ThrowOriginalExceptions(context); + + return Task.CompletedTask; + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj b/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj index 221454bed3..261b10d8ae 100644 --- a/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj +++ b/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj @@ -16,6 +16,7 @@ + diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs index eb9c17b129..329c26c93f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using Volo.Abp.EventBus.Abstractions; using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Local; +using Volo.Abp.Json; using Volo.Abp.Modularity; using Volo.Abp.MultiTenancy; using Volo.Abp.Reflection; @@ -12,7 +13,8 @@ namespace Volo.Abp.EventBus { [DependsOn( typeof(AbpEventBusAbstractionsModule), - typeof(AbpMultiTenancyModule))] + typeof(AbpMultiTenancyModule), + typeof(AbpJsonModule))] public class AbpEventBusModule : AbpModule { public override void PreConfigureServices(ServiceConfigurationContext context) @@ -20,6 +22,14 @@ namespace Volo.Abp.EventBus AddEventHandlers(context.Services); } + public override void ConfigureServices(ServiceConfigurationContext context) + { + Configure(options => + { + context.Services.ExecutePreConfiguredActions(options); + }); + } + private static void AddEventHandlers(IServiceCollection services) { var localHandlers = new List(); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs new file mode 100644 index 0000000000..39631c7e18 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusOptions.cs @@ -0,0 +1,22 @@ +using System; + +namespace Volo.Abp.EventBus +{ + public class AbpEventBusOptions + { + public bool EnabledErrorHandle { get; set; } + + public Func ErrorHandleSelector { get; set; } + + public string DeadLetterName { get; set; } + + public AbpEventBusRetryStrategyOptions RetryStrategyOptions { get; set; } + + public void UseRetryStrategy(Action action = null) + { + EnabledErrorHandle = true; + RetryStrategyOptions = new AbpEventBusRetryStrategyOptions(); + action?.Invoke(RetryStrategyOptions); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusRetryStrategyOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusRetryStrategyOptions.cs new file mode 100644 index 0000000000..4b5b722e96 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusRetryStrategyOptions.cs @@ -0,0 +1,9 @@ +namespace Volo.Abp.EventBus +{ + public class AbpEventBusRetryStrategyOptions + { + public int IntervalMillisecond { get; set; } = 3000; + + public int MaxRetryAttempts { get; set; } = 3; + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index 80d4134db2..78d226538e 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -19,10 +19,16 @@ namespace Volo.Abp.EventBus protected ICurrentTenant CurrentTenant { get; } - protected EventBusBase(IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant) + protected IEventErrorHandler ErrorHandler { get; } + + protected EventBusBase( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IEventErrorHandler errorHandler) { ServiceScopeFactory = serviceScopeFactory; CurrentTenant = currentTenant; + ErrorHandler = errorHandler; } /// @@ -89,7 +95,7 @@ namespace Volo.Abp.EventBus /// public abstract Task PublishAsync(Type eventType, object eventData); - public virtual async Task TriggerHandlersAsync(Type eventType, object eventData) + public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, Action onErrorAction = null) { var exceptions = new List(); @@ -97,16 +103,13 @@ namespace Volo.Abp.EventBus if (exceptions.Any()) { - if (exceptions.Count == 1) - { - exceptions[0].ReThrow(); - } - - throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions); + var context = new EventExecutionErrorContext(exceptions, eventType, this); + onErrorAction?.Invoke(context); + await ErrorHandler.HandleAsync(context); } } - protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List exceptions) + protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData , List exceptions) { await new SynchronizationContextRemover(); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs new file mode 100644 index 0000000000..ba4527fc70 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventErrorHandlerBase.cs @@ -0,0 +1,76 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; + +namespace Volo.Abp.EventBus +{ + public abstract class EventErrorHandlerBase : IEventErrorHandler + { + public const string HeadersKey = "headers"; + public const string RetryAttemptKey = "retryAttempt"; + + protected AbpEventBusOptions Options { get; } + + protected EventErrorHandlerBase(IOptions options) + { + Options = options.Value; + } + + public virtual async Task HandleAsync(EventExecutionErrorContext context) + { + if (!await ShouldHandleAsync(context)) + { + ThrowOriginalExceptions(context); + } + + if (await ShouldRetryAsync(context)) + { + await RetryAsync(context); + return; + } + + await MoveToDeadLetterAsync(context); + } + + protected abstract Task RetryAsync(EventExecutionErrorContext context); + + protected abstract Task MoveToDeadLetterAsync(EventExecutionErrorContext context); + + protected virtual Task ShouldHandleAsync(EventExecutionErrorContext context) + { + if (!Options.EnabledErrorHandle) + { + return Task.FromResult(false); + } + + return Task.FromResult(Options.ErrorHandleSelector == null || Options.ErrorHandleSelector.Invoke(context.EventType)); + } + + protected virtual Task ShouldRetryAsync(EventExecutionErrorContext context) + { + if (Options.RetryStrategyOptions == null) + { + return Task.FromResult(false); + } + + if (!context.TryGetRetryAttempt(out var retryAttempt)) + { + return Task.FromResult(false); + } + + return Task.FromResult(Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt); + } + + protected virtual void ThrowOriginalExceptions(EventExecutionErrorContext context) + { + if (context.Exceptions.Count == 1) + { + context.Exceptions[0].ReThrow(); + } + + throw new AggregateException( + "More than one error has occurred while triggering the event: " + context.EventType, + context.Exceptions); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventExecutionErrorContext.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventExecutionErrorContext.cs new file mode 100644 index 0000000000..e192f61cbe --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventExecutionErrorContext.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using Volo.Abp.Data; +using Volo.Abp.ObjectExtending; + +namespace Volo.Abp.EventBus +{ + public class EventExecutionErrorContext : ExtensibleObject + { + public IReadOnlyList Exceptions { get; } + + public object EventData { get; set; } + + public Type EventType { get; } + + public IEventBus EventBus { get; } + + public EventExecutionErrorContext(List exceptions, Type eventType, IEventBus eventBus) + { + Exceptions = exceptions; + EventType = eventType; + EventBus = eventBus; + } + + public bool TryGetRetryAttempt(out int retryAttempt) + { + retryAttempt = 0; + if (!this.HasProperty(EventErrorHandlerBase.RetryAttemptKey)) + { + return false; + } + + retryAttempt = this.GetProperty(EventErrorHandlerBase.RetryAttemptKey); + return true; + + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventErrorHandler.cs new file mode 100644 index 0000000000..f1b4a40f15 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventErrorHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Volo.Abp.EventBus +{ + public interface IEventErrorHandler + { + Task HandleAsync(EventExecutionErrorContext context); + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 8c7bef6f2d..16246a892f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -7,9 +7,11 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Json; namespace Volo.Abp.EventBus.Local { @@ -28,12 +30,17 @@ namespace Volo.Abp.EventBus.Local protected ConcurrentDictionary> HandlerFactories { get; } + protected IJsonSerializer Serializer { get; } + public LocalEventBus( IOptions options, IServiceScopeFactory serviceScopeFactory, - ICurrentTenant currentTenant) - : base(serviceScopeFactory, currentTenant) + ICurrentTenant currentTenant, + IEventErrorHandler errorHandler, + IJsonSerializer serializer) + : base(serviceScopeFactory, currentTenant, errorHandler) { + Serializer = serializer; Options = options.Value; Logger = NullLogger.Instance; @@ -119,19 +126,17 @@ namespace Volo.Abp.EventBus.Local public override async Task PublishAsync(Type eventType, object eventData) { - var exceptions = new List(); - - await TriggerHandlersAsync(eventType, eventData, exceptions); + await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType)); + } - if (exceptions.Any()) + public virtual async Task PublishAsync(LocalEventMessage localEventMessage) + { + var rawEventData = Serializer.Serialize(localEventMessage.EventData); + await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData, errorContext => { - if (exceptions.Count == 1) - { - exceptions[0].ReThrow(); - } - - throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions); - } + errorContext.EventData = Serializer.Deserialize(localEventMessage.EventType, rawEventData); + errorContext.SetProperty(nameof(LocalEventMessage.MessageId), localEventMessage.MessageId); + }); } protected override IEnumerable GetHandlerFactories(Type eventType) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs new file mode 100644 index 0000000000..c08bca019f --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventErrorHandler.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Volo.Abp.Data; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.EventBus.Local +{ + [ExposeServices(typeof(LocalEventErrorHandler), typeof(IEventErrorHandler))] + public class LocalEventErrorHandler : EventErrorHandlerBase, ISingletonDependency + { + protected Dictionary RetryTracking { get; } + + public LocalEventErrorHandler( + IOptions options) + : base(options) + { + RetryTracking = new Dictionary(); + } + + protected override async Task RetryAsync(EventExecutionErrorContext context) + { + if (Options.RetryStrategyOptions.IntervalMillisecond > 0) + { + await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond); + } + + var messageId = context.GetProperty(nameof(LocalEventMessage.MessageId)); + + context.TryGetRetryAttempt(out var retryAttempt); + RetryTracking[messageId] = ++retryAttempt; + + await context.EventBus.As().PublishAsync(new LocalEventMessage(messageId, context.EventData, context.EventType)); + + RetryTracking.Remove(messageId); + } + + protected override Task MoveToDeadLetterAsync(EventExecutionErrorContext context) + { + ThrowOriginalExceptions(context); + + return Task.CompletedTask; + } + + protected override async Task ShouldRetryAsync(EventExecutionErrorContext context) + { + var messageId = context.GetProperty(nameof(LocalEventMessage.MessageId)); + context.SetProperty(RetryAttemptKey, RetryTracking.GetOrDefault(messageId)); + + if (await base.ShouldRetryAsync(context)) + { + return true; + } + + RetryTracking.Remove(messageId); + return false; + } + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventMessage.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventMessage.cs new file mode 100644 index 0000000000..550de897bd --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventMessage.cs @@ -0,0 +1,20 @@ +using System; + +namespace Volo.Abp.EventBus.Local +{ + public class LocalEventMessage + { + public Guid MessageId { get; } + + public object EventData { get; } + + public Type EventType { get; } + + public LocalEventMessage(Guid messageId, object eventData, Type eventType) + { + MessageId = messageId; + EventData = eventData; + EventType = eventType; + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs index 26d15ce818..f6679aec3c 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs @@ -14,8 +14,6 @@ namespace Volo.Abp.Kafka public Action ConfigureTopic { get; set; } - public bool ReQueue { get; set; } = true; - public AbpKafkaOptions() { Connections = new KafkaConnections(); diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs index 2b01b5a935..96ec753dc2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs @@ -8,11 +8,13 @@ /// not disposed until end of the application. /// /// + /// /// /// /// IKafkaMessageConsumer Create( string topicName, + string deadLetterTopicName, string groupId, string connectionName = null); } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs index 58e718831c..a283eb7e50 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs @@ -7,5 +7,7 @@ namespace Volo.Abp.Kafka byte[] Serialize(object obj); object Deserialize(byte[] value, Type type); + + T Deserialize(byte[] value); } } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs index 3b8f022012..d1bad00533 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Confluent.Kafka; @@ -26,6 +27,8 @@ namespace Volo.Abp.Kafka protected AbpKafkaOptions Options { get; } + protected AbpAsyncTimer Timer { get; } + protected ConcurrentBag, Task>> Callbacks { get; } protected IConsumer Consumer { get; private set; } @@ -36,34 +39,43 @@ namespace Volo.Abp.Kafka protected string TopicName { get; private set; } + protected string DeadLetterTopicName { get; private set; } + public KafkaMessageConsumer( IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier, IOptions options, - IProducerPool producerPool) + IProducerPool producerPool, + AbpAsyncTimer timer) { ConsumerPool = consumerPool; ExceptionNotifier = exceptionNotifier; ProducerPool = producerPool; + Timer = timer; Options = options.Value; Logger = NullLogger.Instance; Callbacks = new ConcurrentBag, Task>>(); + + Timer.Period = 5000; //5 sec. + Timer.Elapsed = Timer_Elapsed; + Timer.RunOnStart = true; } public virtual void Initialize( [NotNull] string topicName, + [NotNull] string deadLetterTopicName, [NotNull] string groupId, string connectionName = null) { Check.NotNull(topicName, nameof(topicName)); + Check.NotNull(deadLetterTopicName, nameof(deadLetterTopicName)); Check.NotNull(groupId, nameof(groupId)); TopicName = topicName; + DeadLetterTopicName = deadLetterTopicName; ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName; GroupId = groupId; - - AsyncHelper.RunSync(CreateTopicAsync); - Consume(); + Timer.Start(); } public virtual void OnMessageReceived(Func, Task> callback) @@ -71,26 +83,45 @@ namespace Volo.Abp.Kafka Callbacks.Add(callback); } + protected virtual async Task Timer_Elapsed(AbpAsyncTimer timer) + { + await CreateTopicAsync(); + Consume(); + Timer.Stop(); + } + protected virtual async Task CreateTopicAsync() { using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build()) { - var topic = new TopicSpecification + var topics = new List { - Name = TopicName, - NumPartitions = 1, - ReplicationFactor = 1 + new() + { + Name = TopicName, + NumPartitions = 1, + ReplicationFactor = 1 + }, + new() + { + Name = DeadLetterTopicName, + NumPartitions = 1, + ReplicationFactor = 1 + } }; - Options.ConfigureTopic?.Invoke(topic); + topics.ForEach(topic => + { + Options.ConfigureTopic?.Invoke(topic); + }); try { - await adminClient.CreateTopicsAsync(new[] {topic}); + await adminClient.CreateTopicsAsync(topics); } catch (CreateTopicsException e) { - if(e.Results.First().Error.Code != ErrorCode.TopicAlreadyExists) + if(e.Results.Any(x => x.Error.Code != ErrorCode.TopicAlreadyExists)) { throw; } @@ -139,8 +170,6 @@ namespace Volo.Abp.Kafka } catch (Exception ex) { - await RequeueAsync(consumeResult); - Logger.LogException(ex); await ExceptionNotifier.NotifyAsync(ex); } @@ -150,17 +179,6 @@ namespace Volo.Abp.Kafka } } - protected virtual async Task RequeueAsync(ConsumeResult consumeResult) - { - if (!Options.ReQueue) - { - return; - } - - var producer = ProducerPool.Get(ConnectionName); - await producer.ProduceAsync(consumeResult.Topic, consumeResult.Message); - } - public virtual void Dispose() { if (Consumer == null) diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs index fb08aa4144..4a22fd04f6 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs @@ -16,11 +16,12 @@ namespace Volo.Abp.Kafka public IKafkaMessageConsumer Create( string topicName, + string deadLetterTopicName, string groupId, string connectionName = null) { var consumer = ServiceScope.ServiceProvider.GetRequiredService(); - consumer.Initialize(topicName, groupId, connectionName); + consumer.Initialize(topicName, deadLetterTopicName, groupId, connectionName); return consumer; } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs index a04125f8a6..a8a199c140 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs @@ -23,5 +23,10 @@ namespace Volo.Abp.Kafka { return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); } + + public T Deserialize(byte[] value) + { + return _jsonSerializer.Deserialize(Encoding.UTF8.GetString(value)); + } } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs index 67f234aec4..b9e762abbe 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs @@ -6,6 +6,8 @@ namespace Volo.Abp.RabbitMQ { public string ExchangeName { get; } + public string DeadLetterExchangeName { get; set; } + public string Type { get; } public bool Durable { get; set; } @@ -15,16 +17,18 @@ namespace Volo.Abp.RabbitMQ public IDictionary Arguments { get; } public ExchangeDeclareConfiguration( - string exchangeName, - string type, - bool durable = false, - bool autoDelete = false) + string exchangeName, + string type, + bool durable = false, + bool autoDelete = false, + string deadLetterExchangeName = null) { ExchangeName = exchangeName; + DeadLetterExchangeName = deadLetterExchangeName; Type = type; Durable = durable; AutoDelete = autoDelete; Arguments = new Dictionary(); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqSerializer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqSerializer.cs index 771d1a05a6..2de5cd2191 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqSerializer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqSerializer.cs @@ -7,5 +7,7 @@ namespace Volo.Abp.RabbitMQ byte[] Serialize(object obj); object Deserialize(byte[] value, Type type); + + T Deserialize(byte[] value); } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs index 8cc07b7bb9..b84f08ec42 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs @@ -6,8 +6,9 @@ namespace Volo.Abp.RabbitMQ { public class QueueDeclareConfiguration { - [NotNull] - public string QueueName { get; } + [NotNull] public string QueueName { get; } + + public string DeadLetterQueueName { get; set; } public bool Durable { get; set; } @@ -18,12 +19,14 @@ namespace Volo.Abp.RabbitMQ public IDictionary Arguments { get; } public QueueDeclareConfiguration( - [NotNull] string queueName, - bool durable = true, - bool exclusive = false, - bool autoDelete = false) + [NotNull] string queueName, + bool durable = true, + bool exclusive = false, + bool autoDelete = false, + string deadLetterQueueName = null) { QueueName = queueName; + DeadLetterQueueName = deadLetterQueueName; Durable = durable; Exclusive = exclusive; AutoDelete = autoDelete; @@ -41,4 +44,4 @@ namespace Volo.Abp.RabbitMQ ); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index 894f68d097..671445e00d 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -6,6 +6,7 @@ using RabbitMQ.Client.Events; using System; using System.Collections.Concurrent; using System.Threading.Tasks; +using RabbitMQ.Client.Exceptions; using Volo.Abp.DependencyInjection; using Volo.Abp.ExceptionHandling; using Volo.Abp.Threading; @@ -147,6 +148,7 @@ namespace Volo.Abp.RabbitMQ Channel = ConnectionPool .Get(ConnectionName) .CreateModel(); + Channel.ExchangeDeclare( exchange: Exchange.ExchangeName, type: Exchange.Type, @@ -155,7 +157,29 @@ namespace Volo.Abp.RabbitMQ arguments: Exchange.Arguments ); - Channel.QueueDeclare( + if (!Exchange.DeadLetterExchangeName.IsNullOrWhiteSpace() && + !Queue.DeadLetterQueueName.IsNullOrWhiteSpace()) + { + Channel.ExchangeDeclare( + Exchange.DeadLetterExchangeName, + Exchange.Type, + Exchange.Durable, + Exchange.AutoDelete + ); + + Channel.QueueDeclare( + Queue.DeadLetterQueueName, + Queue.Durable, + Queue.Exclusive, + Queue.AutoDelete); + + Queue.Arguments["x-dead-letter-exchange"] = Exchange.DeadLetterExchangeName; + Queue.Arguments["x-dead-letter-routing-key"] = Queue.DeadLetterQueueName; + + Channel.QueueBind(Queue.DeadLetterQueueName, Exchange.DeadLetterExchangeName, Queue.DeadLetterQueueName); + } + + var result = Channel.QueueDeclare( queue: Queue.QueueName, durable: Queue.Durable, exclusive: Queue.Exclusive, @@ -174,6 +198,17 @@ namespace Volo.Abp.RabbitMQ } catch (Exception ex) { + if (ex is OperationInterruptedException operationInterruptedException && + operationInterruptedException.ShutdownReason.ReplyCode == 406 && + operationInterruptedException.Message.Contains("arg 'x-dead-letter-exchange'")) + { + Exchange.DeadLetterExchangeName = null; + Queue.DeadLetterQueueName = null; + Queue.Arguments.Remove("x-dead-letter-exchange"); + Queue.Arguments.Remove("x-dead-letter-routing-key"); + Logger.LogWarning("Unable to bind the dead letter queue to an existing queue. You can delete the queue or add policy. See: https://www.rabbitmq.com/parameters.html"); + } + Logger.LogException(ex, LogLevel.Warning); await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); } @@ -194,14 +229,10 @@ namespace Volo.Abp.RabbitMQ { try { - Channel.BasicNack( - basicDeliverEventArgs.DeliveryTag, - multiple: false, - requeue: true - ); + Channel.BasicReject(basicDeliverEventArgs.DeliveryTag, false); } catch { } - + Logger.LogException(ex); await ExceptionNotifier.NotifyAsync(ex); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/Utf8JsonRabbitMqSerializer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/Utf8JsonRabbitMqSerializer.cs index c179193fdd..cc815f686a 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/Utf8JsonRabbitMqSerializer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/Utf8JsonRabbitMqSerializer.cs @@ -23,5 +23,10 @@ namespace Volo.Abp.RabbitMQ { return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); } + + public T Deserialize(byte[] value) + { + return _jsonSerializer.Deserialize(Encoding.UTF8.GetString(value)); + } } -} \ No newline at end of file +} diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventBusTestModule.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventBusTestModule.cs index f514f37018..f260fecbea 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventBusTestModule.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/EventBusTestModule.cs @@ -5,6 +5,17 @@ namespace Volo.Abp.EventBus [DependsOn(typeof(AbpEventBusModule))] public class EventBusTestModule : AbpModule { + public override void PreConfigureServices(ServiceConfigurationContext context) + { + PreConfigure(options => + { + options.UseRetryStrategy(retryStrategyOptions => + { + retryStrategyOptions.IntervalMillisecond = 0; + }); + options.ErrorHandleSelector = type => type == typeof(MyExceptionHandleEventData); + }); + } } -} \ No newline at end of file +} diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/EventBus_Exception_Handler_Tests.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/EventBus_Exception_Handler_Tests.cs new file mode 100644 index 0000000000..d6b61ee863 --- /dev/null +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/EventBus_Exception_Handler_Tests.cs @@ -0,0 +1,75 @@ +using System; +using System.Threading.Tasks; +using Shouldly; +using Xunit; + +namespace Volo.Abp.EventBus.Local +{ + public class EventBus_Exception_Handler_Tests : EventBusTestBase + { + [Fact] + public async Task Should_Not_Handle_Exception() + { + var retryAttempt = 0; + LocalEventBus.Subscribe(eventData => + { + retryAttempt++; + throw new Exception("This exception is intentionally thrown!"); + }); + + var appException = await Assert.ThrowsAsync(async () => + { + await LocalEventBus.PublishAsync(new MySimpleEventData(1)); + }); + + retryAttempt.ShouldBe(1); + appException.Message.ShouldBe("This exception is intentionally thrown!"); + } + + [Fact] + public async Task Should_Handle_Exception() + { + var retryAttempt = 0; + LocalEventBus.Subscribe(eventData => + { + eventData.Value.ShouldBe(0); + + retryAttempt++; + eventData.Value++; + if (retryAttempt < 2) + { + throw new Exception("This exception is intentionally thrown!"); + } + + return Task.CompletedTask; + + }); + + await LocalEventBus.PublishAsync(new MyExceptionHandleEventData(0)); + retryAttempt.ShouldBe(2); + } + + [Fact] + public async Task Should_Throw_Exception_After_Error_Handle() + { + var retryAttempt = 0; + LocalEventBus.Subscribe(eventData => + { + eventData.Value.ShouldBe(0); + + retryAttempt++; + eventData.Value++; + + throw new Exception("This exception is intentionally thrown!"); + }); + + var appException = await Assert.ThrowsAsync(async () => + { + await LocalEventBus.PublishAsync(new MyExceptionHandleEventData(0)); + }); + + retryAttempt.ShouldBe(4); + appException.Message.ShouldBe("This exception is intentionally thrown!"); + } + } +} diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/MyExceptionHandleEventData.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/MyExceptionHandleEventData.cs new file mode 100644 index 0000000000..f490d58211 --- /dev/null +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/MyExceptionHandleEventData.cs @@ -0,0 +1,12 @@ +namespace Volo.Abp.EventBus +{ + public class MyExceptionHandleEventData + { + public int Value { get; set; } + + public MyExceptionHandleEventData(int value) + { + Value = value; + } + } +}