pull/8829/head
liangshiwei 5 years ago
parent fbedc63bc3
commit 1ecc16d161

@ -9,7 +9,6 @@ using Microsoft.Extensions.Options;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
@ -49,7 +48,8 @@ namespace Volo.Abp.EventBus.Kafka
MessageConsumerFactory = messageConsumerFactory;
Serializer = serializer;
ProducerPool = producerPool;
DeadLetterTopicName = AbpEventBusOptions.DeadLetterName ?? AbpKafkaEventBusOptions.TopicName + "_dead_letter";
DeadLetterTopicName =
AbpEventBusOptions.DeadLetterName ?? AbpKafkaEventBusOptions.TopicName + "_dead_letter";
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
@ -59,6 +59,7 @@ namespace Volo.Abp.EventBus.Kafka
{
Consumer = MessageConsumerFactory.Create(
AbpKafkaEventBusOptions.TopicName,
DeadLetterTopicName,
AbpKafkaEventBusOptions.GroupId,
AbpKafkaEventBusOptions.ConnectionName);
Consumer.OnMessageReceived(ProcessEventAsync);
@ -75,7 +76,7 @@ namespace Volo.Abp.EventBus.Kafka
return;
}
var eventMessage = Serializer.Deserialize<LocalEventMessage>(message.Value);
var eventMessage = Serializer.Deserialize(message.Value, eventType);
await TriggerHandlersAsync(eventType, eventMessage,
context => { context.SetProperty(KafkaEventErrorHandler.HeadersKey, message.Headers); });
@ -156,7 +157,7 @@ namespace Volo.Abp.EventBus.Kafka
public override async Task PublishAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, null);
await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}});
}
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers)

@ -12,7 +12,7 @@ namespace Volo.Abp.EventBus.Kafka
public class KafkaEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
public const string HeadersKey = "headers";
public const string RetryIndexKey = "retryIndex";
public const string RetryAttemptKey = "retryAttempt";
protected IKafkaSerializer Serializer { get; }
@ -30,26 +30,25 @@ namespace Volo.Abp.EventBus.Kafka
await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond);
}
var headers = context.GetProperty<Headers>(HeadersKey) ?? new Headers();
var headers = context.GetProperty(HeadersKey).As<Headers>();
var index = 0;
if (headers.Any(x => x.Key == RetryIndexKey))
var retryAttempt = 0;
if (headers.Any(x => x.Key == RetryAttemptKey))
{
index = Serializer.Deserialize<int>(headers.GetLastBytes(RetryIndexKey));
retryAttempt = Serializer.Deserialize<int>(headers.GetLastBytes(RetryAttemptKey));
}
headers.Remove(RetryIndexKey);
headers.Add(RetryIndexKey, Serializer.Serialize(++index));
headers.Remove(RetryAttemptKey);
headers.Add(RetryAttemptKey, Serializer.Serialize(++retryAttempt));
await context.EventBus.As<KafkaDistributedEventBus>().PublishAsync(context.EventType, context.EventData, headers);
}
protected override async Task MoveToDeadLetter(EventExecutionErrorContext context)
{
await context.EventBus.As<KafkaDistributedEventBus>().PublishToDeadLetterAsync(context.EventType, context.EventData, new Headers
{
{"exceptions", Serializer.Serialize(context.Exceptions)}
});
var headers = context.GetProperty(HeadersKey).As<Headers>();
headers.Add("exceptions", Serializer.Serialize(context.Exceptions.Select(x => x.ToString()).ToList()));
await context.EventBus.As<KafkaDistributedEventBus>().PublishToDeadLetterAsync(context.EventType, context.EventData, headers);
}
protected override bool ShouldRetry(EventExecutionErrorContext context)
@ -59,16 +58,16 @@ namespace Volo.Abp.EventBus.Kafka
return false;
}
var headers = context.GetProperty<Headers>(HeadersKey);
var headers = context.GetProperty(HeadersKey).As<Headers>();
if (headers == null)
if (headers.All(x => x.Key != RetryAttemptKey))
{
return true;
}
var index = Serializer.Deserialize<int>(headers.GetLastBytes(RetryIndexKey));
var retryAttempt = Serializer.Deserialize<int>(headers.GetLastBytes(RetryAttemptKey));
return Options.RetryStrategyOptions.MaxRetryAttempts > index;
return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt;
}
}
}

@ -12,7 +12,7 @@ namespace Volo.Abp.EventBus.RabbitMq
public class RabbitMqEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
public const string HeadersKey = "headers";
public const string RetryIndexKey = "retryIndex";
public const string RetryAttemptKey = "retryAttempt";
public RabbitMqEventErrorHandler(
IOptions<AbpEventBusOptions> options)
@ -30,13 +30,13 @@ namespace Volo.Abp.EventBus.RabbitMq
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
var headers = properties.Headers ?? new Dictionary<string, object>();
var index = 0;
if (headers.ContainsKey(RetryIndexKey))
var retryAttempt = 0;
if (headers.ContainsKey(RetryAttemptKey))
{
index = (int) headers[RetryIndexKey];
retryAttempt = (int) headers[RetryAttemptKey];
}
headers[RetryIndexKey] = ++index;
headers[RetryAttemptKey] = ++retryAttempt;
headers["exceptions"] = context.Exceptions.Select(x => x.ToString()).ToList();
properties.Headers = headers;
@ -64,14 +64,14 @@ namespace Volo.Abp.EventBus.RabbitMq
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
if (properties.Headers == null || !properties.Headers.ContainsKey(RetryIndexKey))
if (properties.Headers == null || !properties.Headers.ContainsKey(RetryAttemptKey))
{
return true;
}
var index = (int) properties.Headers[RetryIndexKey];
var retryAttempt = (int) properties.Headers[RetryAttemptKey];
return Options.RetryStrategyOptions.MaxRetryAttempts > index;
return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt;
}
}
}

@ -13,26 +13,27 @@ namespace Volo.Abp.EventBus.Rebus
public override void ConfigureServices(ServiceConfigurationContext context)
{
var abpEventBusOptions = context.Services.ExecutePreConfiguredActions<AbpEventBusOptions>();
var options = context.Services.ExecutePreConfiguredActions<AbpRebusEventBusOptions>();;
context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>));
Configure<AbpRebusEventBusOptions>(rebusOptions =>
{
context.Services.ExecutePreConfiguredActions(rebusOptions);
});
context.Services.AddRebus(configure =>
context.Services.AddRebus(configure =>
{
if (abpEventBusOptions.RetryStrategyOptions != null)
{
if (abpEventBusOptions.RetryStrategyOptions != null)
{
configure.Options(b =>
b.SimpleRetryStrategy(
errorQueueAddress: abpEventBusOptions.DeadLetterName ?? rebusOptions.InputQueueName + "_dead_letter",
maxDeliveryAttempts: abpEventBusOptions.RetryStrategyOptions.MaxRetryAttempts));
}
configure.Options(b =>
b.SimpleRetryStrategy(
errorQueueAddress: abpEventBusOptions.DeadLetterName ?? options.InputQueueName + "_dead_letter",
maxDeliveryAttempts: abpEventBusOptions.RetryStrategyOptions.MaxRetryAttempts));
}
rebusOptions.Configurer?.Invoke(configure);
return configure;
});
options.Configurer?.Invoke(configure);
return configure;
});
}

@ -87,7 +87,6 @@ namespace Volo.Abp.Kafka
{
await CreateTopicAsync();
Consume();
Timer.Stop();
}

@ -21,7 +21,7 @@ namespace Volo.Abp.Kafka
string connectionName = null)
{
var consumer = ServiceScope.ServiceProvider.GetRequiredService<KafkaMessageConsumer>();
consumer.Initialize(topicName, groupId, connectionName);
consumer.Initialize(topicName, deadLetterTopicName, groupId, connectionName);
return consumer;
}

@ -6,7 +6,7 @@ namespace Volo.Abp.RabbitMQ
{
public string ExchangeName { get; }
public string DeadLetterExchangeName { get; }
public string DeadLetterExchangeName { get; set; }
public string Type { get; }

@ -8,7 +8,7 @@ namespace Volo.Abp.RabbitMQ
{
[NotNull] public string QueueName { get; }
[NotNull] public string DeadLetterQueueName { get; }
public string DeadLetterQueueName { get; set; }
public bool Durable { get; set; }
@ -20,7 +20,7 @@ namespace Volo.Abp.RabbitMQ
public QueueDeclareConfiguration(
[NotNull] string queueName,
[NotNull] string deadLetterQueueName,
string deadLetterQueueName,
bool durable = true,
bool exclusive = false,
bool autoDelete = false,

@ -6,6 +6,7 @@ using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Threading;
@ -178,7 +179,7 @@ namespace Volo.Abp.RabbitMQ
Channel.QueueBind(Queue.DeadLetterQueueName, Exchange.DeadLetterExchangeName, Queue.DeadLetterQueueName);
}
Channel.QueueDeclare(
var result = Channel.QueueDeclare(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
@ -197,6 +198,17 @@ namespace Volo.Abp.RabbitMQ
}
catch (Exception ex)
{
if (ex is OperationInterruptedException operationInterruptedException &&
operationInterruptedException.ShutdownReason.ReplyCode == 406 &&
operationInterruptedException.Message.Contains("arg 'x-dead-letter-exchange'"))
{
Exchange.DeadLetterExchangeName = null;
Queue.DeadLetterQueueName = null;
Queue.Arguments.Remove("x-dead-letter-exchange");
Queue.Arguments.Remove("x-dead-letter-routing-key");
Logger.LogWarning("Unable to bind the dead letter queue to an existing queue. You can delete the queue or add policy. See: https://www.rabbitmq.com/parameters.html");
}
Logger.LogException(ex, LogLevel.Warning);
await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
}

Loading…
Cancel
Save