pull/8829/head
liangshiwei 5 years ago
parent 3d2348134e
commit fbedc63bc3

@ -32,7 +32,7 @@ namespace Volo.Abp.EventBus.Kafka
var headers = context.GetProperty<Headers>(HeadersKey) ?? new Headers();
var index = 1;
var index = 0;
if (headers.Any(x => x.Key == RetryIndexKey))
{
index = Serializer.Deserialize<int>(headers.GetLastBytes(RetryIndexKey));

@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
@ -65,19 +66,16 @@ namespace Volo.Abp.EventBus.RabbitMq
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
AbpRabbitMqEventBusOptions.ExchangeName,
AbpRabbitMqEventBusOptions.ExchangeName + suffix,
type: "direct",
durable: true
),
new QueueDeclareConfiguration(
AbpRabbitMqEventBusOptions.ClientName,
AbpEventBusOptions.DeadLetterName ?? AbpRabbitMqEventBusOptions.ClientName + suffix,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{"x-dead-letter-exchange", AbpRabbitMqEventBusOptions.ExchangeName + suffix},
{"x-dead-letter-routing-key", AbpEventBusOptions.DeadLetterName ?? AbpRabbitMqEventBusOptions.ClientName + suffix}
}
autoDelete: false
),
AbpRabbitMqEventBusOptions.ConnectionName
);
@ -98,7 +96,10 @@ namespace Volo.Abp.EventBus.RabbitMq
var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
await TriggerHandlersAsync(eventType, eventData);
await TriggerHandlersAsync(eventType, eventData, errorContext =>
{
errorContext.SetProperty("headers", ea.BasicProperties);
});
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
@ -179,35 +180,12 @@ namespace Volo.Abp.EventBus.RabbitMq
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
public override Task PublishAsync(Type eventType, object eventData)
public override async Task PublishAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
channel.ExchangeDeclare(
AbpRabbitMqEventBusOptions.ExchangeName,
"direct",
durable: true
);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body
);
}
return Task.CompletedTask;
await PublishAsync(eventType, eventData, null);
}
public Task PublishAsync(Type eventType, object eventData, Dictionary<string, object> headers)
public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
@ -220,9 +198,12 @@ namespace Volo.Abp.EventBus.RabbitMq
durable: true
);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
properties.Headers = headers;
if (properties == null)
{
properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
properties.MessageId = Guid.NewGuid().ToString("N");
}
channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
@ -253,9 +234,11 @@ namespace Volo.Abp.EventBus.RabbitMq
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
foreach (var handlerFactory in
HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();

@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
@ -25,23 +27,20 @@ namespace Volo.Abp.EventBus.RabbitMq
await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond);
}
var headers = context.GetProperty<Dictionary<string, object>>(HeadersKey) ??
new Dictionary<string, object>();
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
var headers = properties.Headers ?? new Dictionary<string, object>();
var index = 1;
var index = 0;
if (headers.ContainsKey(RetryIndexKey))
{
index = (int) headers[RetryIndexKey];
headers[RetryIndexKey] = ++index;
}
else
{
headers[RetryIndexKey] = index;
}
headers["exceptions"] = context.Exceptions;
headers[RetryIndexKey] = ++index;
headers["exceptions"] = context.Exceptions.Select(x => x.ToString()).ToList();
properties.Headers = headers;
await context.EventBus.As<RabbitMqDistributedEventBus>().PublishAsync(context.EventType, context.EventData, headers);
await context.EventBus.As<RabbitMqDistributedEventBus>().PublishAsync(context.EventType, context.EventData, properties);
}
protected override Task MoveToDeadLetter(EventExecutionErrorContext context)
@ -63,14 +62,14 @@ namespace Volo.Abp.EventBus.RabbitMq
return false;
}
var headers = context.GetProperty<Dictionary<string, object>>(HeadersKey);
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
if (headers == null || !headers.ContainsKey(RetryIndexKey))
if (properties.Headers == null || !properties.Headers.ContainsKey(RetryIndexKey))
{
return true;
}
var index = (int) headers[RetryIndexKey];
var index = (int) properties.Headers[RetryIndexKey];
return Options.RetryStrategyOptions.MaxRetryAttempts > index;
}

@ -52,7 +52,7 @@ namespace Volo.Abp.EventBus
return Options.ErrorHandleSelector.Invoke(context.EventType);
}
return false;
return true;
}
protected virtual bool ShouldRetry(EventExecutionErrorContext context)

@ -6,6 +6,8 @@ namespace Volo.Abp.RabbitMQ
{
public string ExchangeName { get; }
public string DeadLetterExchangeName { get; }
public string Type { get; }
public bool Durable { get; set; }
@ -15,16 +17,18 @@ namespace Volo.Abp.RabbitMQ
public IDictionary<string, object> Arguments { get; }
public ExchangeDeclareConfiguration(
string exchangeName,
string type,
bool durable = false,
string exchangeName,
string deadLetterExchangeName,
string type,
bool durable = false,
bool autoDelete = false)
{
ExchangeName = exchangeName;
DeadLetterExchangeName = deadLetterExchangeName;
Type = type;
Durable = durable;
AutoDelete = autoDelete;
Arguments = new Dictionary<string, object>();
}
}
}
}

@ -8,6 +8,8 @@ namespace Volo.Abp.RabbitMQ
{
[NotNull] public string QueueName { get; }
[NotNull] public string DeadLetterQueueName { get; }
public bool Durable { get; set; }
public bool Exclusive { get; set; }
@ -18,12 +20,14 @@ namespace Volo.Abp.RabbitMQ
public QueueDeclareConfiguration(
[NotNull] string queueName,
[NotNull] string deadLetterQueueName,
bool durable = true,
bool exclusive = false,
bool autoDelete = false,
Dictionary<string, object> arguments = null)
{
QueueName = queueName;
DeadLetterQueueName = deadLetterQueueName;
Durable = durable;
Exclusive = exclusive;
AutoDelete = autoDelete;

@ -156,21 +156,26 @@ namespace Volo.Abp.RabbitMQ
arguments: Exchange.Arguments
);
if (Queue.Arguments.ContainsKey("x-dead-letter-exchange") &&
Queue.Arguments.ContainsKey("x-dead-letter-routing-key"))
if (!Exchange.DeadLetterExchangeName.IsNullOrWhiteSpace() &&
!Queue.DeadLetterQueueName.IsNullOrWhiteSpace())
{
Channel.ExchangeDeclare(
Exchange.Arguments["x-dead-letter-exchange"].ToString(),
Exchange.DeadLetterExchangeName,
Exchange.Type,
Exchange.Durable,
Exchange.AutoDelete
);
Channel.QueueDeclare(
Queue.Arguments["x-dead-letter-routing-key"].ToString(),
Queue.DeadLetterQueueName,
Queue.Durable,
Queue.Exclusive,
Queue.AutoDelete);
Queue.Arguments["x-dead-letter-exchange"] = Exchange.DeadLetterExchangeName;
Queue.Arguments["x-dead-letter-routing-key"] = Queue.DeadLetterQueueName;
Channel.QueueBind(Queue.DeadLetterQueueName, Exchange.DeadLetterExchangeName, Queue.DeadLetterQueueName);
}
Channel.QueueDeclare(

Loading…
Cancel
Save