pull/8829/head
liangshiwei 5 years ago
parent e0e659ae09
commit 6bd2597b36

@ -80,7 +80,15 @@ namespace Volo.Abp.EventBus.Kafka
await TriggerHandlersAsync(eventType, eventData, errorContext =>
{
errorContext.SetProperty(KafkaEventErrorHandler.HeadersKey, message.Headers);
var retryAttempt = 0;
if (message.Headers.TryGetLastBytes(EventErrorHandlerBase.RetryAttemptKey, out var retryAttemptBytes))
{
retryAttempt = Serializer.Deserialize<int>(retryAttemptBytes);
}
errorContext.EventData = Serializer.Deserialize(message.Value, eventType);
errorContext.SetProperty(EventErrorHandlerBase.HeadersKey, message.Headers);
errorContext.SetProperty(EventErrorHandlerBase.RetryAttemptKey, retryAttempt);
});
}
@ -159,26 +167,28 @@ namespace Volo.Abp.EventBus.Kafka
public override async Task PublishAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}});
await PublishAsync(eventType, eventData, new Headers {{"messageId", Serializer.Serialize(Guid.NewGuid())}}, null);
}
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers)
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers);
await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers, headersArguments);
}
public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers)
public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
await PublishAsync(DeadLetterTopicName, eventType, eventData, headers);
await PublishAsync(DeadLetterTopicName, eventType, eventData, headers, headersArguments);
}
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
SetEventMessageHeaders(headers, headersArguments);
await producer.ProduceAsync(
topicName,
new Message<string, byte[]>
@ -187,6 +197,20 @@ namespace Volo.Abp.EventBus.Kafka
});
}
private void SetEventMessageHeaders(Headers headers, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
}
foreach (var header in headersArguments)
{
headers.Remove(header.Key);
headers.Add(header.Key, Serializer.Serialize(header.Value));
}
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(

@ -1,26 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Kafka;
namespace Volo.Abp.EventBus.Kafka
{
public class KafkaEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
public const string HeadersKey = "headers";
public const string RetryAttemptKey = "retryAttempt";
protected IKafkaSerializer Serializer { get; }
public KafkaEventErrorHandler(
IOptions<AbpEventBusOptions> options,
IKafkaSerializer serializer) : base(options)
IOptions<AbpEventBusOptions> options) : base(options)
{
Serializer = serializer;
}
protected override async Task Retry(EventExecutionErrorContext context)
@ -30,44 +23,22 @@ namespace Volo.Abp.EventBus.Kafka
await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond);
}
var headers = context.GetProperty(HeadersKey).As<Headers>();
var retryAttempt = 0;
if (headers.Any(x => x.Key == RetryAttemptKey))
{
retryAttempt = Serializer.Deserialize<int>(headers.GetLastBytes(RetryAttemptKey));
}
context.TryGetRetryAttempt(out var retryAttempt);
headers.Remove(RetryAttemptKey);
headers.Add(RetryAttemptKey, Serializer.Serialize(++retryAttempt));
await context.EventBus.As<KafkaDistributedEventBus>().PublishAsync(context.EventType, context.EventData, headers);
await context.EventBus.As<KafkaDistributedEventBus>().PublishAsync(
context.EventType,
context.EventData,
context.GetProperty(HeadersKey).As<Headers>(),
new Dictionary<string, object> {{RetryAttemptKey, ++retryAttempt}});
}
protected override async Task MoveToDeadLetter(EventExecutionErrorContext context)
{
var headers = context.GetProperty(HeadersKey).As<Headers>();
headers.Add("exceptions", Serializer.Serialize(context.Exceptions.Select(x => x.ToString()).ToList()));
await context.EventBus.As<KafkaDistributedEventBus>().PublishToDeadLetterAsync(context.EventType, context.EventData, headers);
}
protected override bool ShouldRetry(EventExecutionErrorContext context)
{
if (!base.ShouldRetry(context))
{
return false;
}
var headers = context.GetProperty(HeadersKey).As<Headers>();
if (headers.All(x => x.Key != RetryAttemptKey))
{
return true;
}
var retryAttempt = Serializer.Deserialize<int>(headers.GetLastBytes(RetryAttemptKey));
return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt;
await context.EventBus.As<KafkaDistributedEventBus>().PublishToDeadLetterAsync(
context.EventType,
context.EventData,
context.GetProperty(HeadersKey).As<Headers>(),
new Dictionary<string, object> {{"exceptions", context.Exceptions.Select(x => x.ToString()).ToList()}});
}
}
}

@ -98,7 +98,16 @@ namespace Volo.Abp.EventBus.RabbitMq
await TriggerHandlersAsync(eventType, eventData, errorContext =>
{
errorContext.SetProperty("headers", ea.BasicProperties);
var retryAttempt = 0;
if (ea.BasicProperties.Headers != null &&
ea.BasicProperties.Headers.ContainsKey(EventErrorHandlerBase.RetryAttemptKey))
{
retryAttempt = (int)ea.BasicProperties.Headers[EventErrorHandlerBase.RetryAttemptKey];
}
errorContext.EventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
errorContext.SetProperty(EventErrorHandlerBase.HeadersKey, ea.BasicProperties);
errorContext.SetProperty(EventErrorHandlerBase.RetryAttemptKey, retryAttempt);
});
}
@ -185,8 +194,9 @@ namespace Volo.Abp.EventBus.RabbitMq
await PublishAsync(eventType, eventData, null);
}
public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties)
public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties, Dictionary<string, object> headersArguments = null)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
@ -205,6 +215,8 @@ namespace Volo.Abp.EventBus.RabbitMq
properties.MessageId = Guid.NewGuid().ToString("N");
}
SetEventMessageHeaders(properties, headersArguments);
channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
@ -217,6 +229,21 @@ namespace Volo.Abp.EventBus.RabbitMq
return Task.CompletedTask;
}
private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
}
properties.Headers ??= new Dictionary<string, object>();
foreach (var header in headersArguments)
{
properties.Headers[header.Key] = header.Value;
}
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(

@ -11,9 +11,6 @@ namespace Volo.Abp.EventBus.RabbitMq
{
public class RabbitMqEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
public const string HeadersKey = "headers";
public const string RetryAttemptKey = "retryAttempt";
public RabbitMqEventErrorHandler(
IOptions<AbpEventBusOptions> options)
: base(options)
@ -27,51 +24,24 @@ namespace Volo.Abp.EventBus.RabbitMq
await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond);
}
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
var headers = properties.Headers ?? new Dictionary<string, object>();
var retryAttempt = 0;
if (headers.ContainsKey(RetryAttemptKey))
{
retryAttempt = (int) headers[RetryAttemptKey];
}
headers[RetryAttemptKey] = ++retryAttempt;
headers["exceptions"] = context.Exceptions.Select(x => x.ToString()).ToList();
properties.Headers = headers;
await context.EventBus.As<RabbitMqDistributedEventBus>().PublishAsync(context.EventType, context.EventData, properties);
context.TryGetRetryAttempt(out var retryAttempt);
await context.EventBus.As<RabbitMqDistributedEventBus>().PublishAsync(
context.EventType,
context.EventData,
context.GetProperty(HeadersKey).As<IBasicProperties>(),
new Dictionary<string, object>
{
{RetryAttemptKey, ++retryAttempt},
{"exceptions", context.Exceptions.Select(x => x.ToString()).ToList()}
});
}
protected override Task MoveToDeadLetter(EventExecutionErrorContext context)
{
if (context.Exceptions.Count == 1)
{
context.Exceptions[0].ReThrow();
}
throw new AggregateException(
"More than one error has occurred while triggering the event: " + context.EventType,
context.Exceptions);
}
protected override bool ShouldRetry(EventExecutionErrorContext context)
{
if (!base.ShouldRetry(context))
{
return false;
}
var properties = context.GetProperty(HeadersKey).As<IBasicProperties>();
if (properties.Headers == null || !properties.Headers.ContainsKey(RetryAttemptKey))
{
return true;
}
var retryAttempt = (int) properties.Headers[RetryAttemptKey];
ThrowOriginalExceptions(context);
return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt;
return Task.CompletedTask;
}
}
}

@ -1,10 +1,12 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.EventBus.Rebus
{
/// <summary>
/// Rebus will automatic retries and error handling: https://github.com/rebus-org/Rebus/wiki/Automatic-retries-and-error-handling
/// </summary>
public class RebusEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
public RebusEventErrorHandler(
@ -15,30 +17,16 @@ namespace Volo.Abp.EventBus.Rebus
protected override Task Retry(EventExecutionErrorContext context)
{
Throw(context);
ThrowOriginalExceptions(context);
return Task.CompletedTask;
}
protected override Task MoveToDeadLetter(EventExecutionErrorContext context)
{
Throw(context);
ThrowOriginalExceptions(context);
return Task.CompletedTask;
}
private void Throw(EventExecutionErrorContext context)
{
// Rebus will automatic retries and error handling: https://github.com/rebus-org/Rebus/wiki/Automatic-retries-and-error-handling
if (context.Exceptions.Count == 1)
{
context.Exceptions[0].ReThrow();
}
throw new AggregateException(
"More than one error has occurred while triggering the event: " + context.EventType,
context.Exceptions);
}
}
}

@ -16,6 +16,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus.Abstractions\Volo.Abp.EventBus.Abstractions.csproj" />
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />
</ItemGroup>

@ -4,6 +4,7 @@ using System.Collections.Generic;
using Volo.Abp.EventBus.Abstractions;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Json;
using Volo.Abp.Modularity;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Reflection;
@ -12,7 +13,8 @@ namespace Volo.Abp.EventBus
{
[DependsOn(
typeof(AbpEventBusAbstractionsModule),
typeof(AbpMultiTenancyModule))]
typeof(AbpMultiTenancyModule),
typeof(AbpJsonModule))]
public class AbpEventBusModule : AbpModule
{
public override void PreConfigureServices(ServiceConfigurationContext context)

@ -8,7 +8,6 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Collections;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Reflection;
@ -104,7 +103,7 @@ namespace Volo.Abp.EventBus
if (exceptions.Any())
{
var context = new EventExecutionErrorContext(exceptions, eventData, eventType, this);
var context = new EventExecutionErrorContext(exceptions, eventType, this);
onErrorAction?.Invoke(context);
await ErrorHandler.Handle(context);
}

@ -6,6 +6,9 @@ namespace Volo.Abp.EventBus
{
public abstract class EventErrorHandlerBase : IEventErrorHandler
{
public const string HeadersKey = "headers";
public const string RetryAttemptKey = "retryAttempt";
protected AbpEventBusOptions Options { get; }
protected EventErrorHandlerBase(IOptions<AbpEventBusOptions> options)
@ -17,14 +20,7 @@ namespace Volo.Abp.EventBus
{
if (!ShouldHandle(context))
{
if (context.Exceptions.Count == 1)
{
context.Exceptions[0].ReThrow();
}
throw new AggregateException(
"More than one error has occurred while triggering the event: " + context.EventType,
context.Exceptions);
ThrowOriginalExceptions(context);
}
if (ShouldRetry(context))
@ -47,17 +43,34 @@ namespace Volo.Abp.EventBus
return false;
}
if (Options.ErrorHandleSelector != null)
return Options.ErrorHandleSelector == null || Options.ErrorHandleSelector.Invoke(context.EventType);
}
protected virtual bool ShouldRetry(EventExecutionErrorContext context)
{
if (Options.RetryStrategyOptions == null)
{
return false;
}
if (!context.TryGetRetryAttempt(out var retryAttempt))
{
return Options.ErrorHandleSelector.Invoke(context.EventType);
return false;
}
return true;
return Options.RetryStrategyOptions.MaxRetryAttempts > retryAttempt;
}
protected virtual bool ShouldRetry(EventExecutionErrorContext context)
protected virtual void ThrowOriginalExceptions(EventExecutionErrorContext context)
{
return Options.RetryStrategyOptions != null;
if (context.Exceptions.Count == 1)
{
context.Exceptions[0].ReThrow();
}
throw new AggregateException(
"More than one error has occurred while triggering the event: " + context.EventType,
context.Exceptions);
}
}
}

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using Volo.Abp.Data;
using Volo.Abp.ObjectExtending;
namespace Volo.Abp.EventBus
@ -8,18 +9,30 @@ namespace Volo.Abp.EventBus
{
public IReadOnlyList<Exception> Exceptions { get; }
public object EventData { get; }
public object EventData { get; set; }
public Type EventType { get; }
public IEventBus EventBus { get; }
public EventExecutionErrorContext(List<Exception> exceptions, object eventData, Type eventType, IEventBus eventBus)
public EventExecutionErrorContext(List<Exception> exceptions, Type eventType, IEventBus eventBus)
{
Exceptions = exceptions;
EventData = eventData;
EventType = eventType;
EventBus = eventBus;
}
public bool TryGetRetryAttempt(out int retryAttempt)
{
retryAttempt = 0;
if (!this.HasProperty(EventErrorHandlerBase.RetryAttemptKey))
{
return false;
}
retryAttempt = this.GetProperty<int>(EventErrorHandlerBase.RetryAttemptKey);
return true;
}
}
}

@ -11,6 +11,7 @@ using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Json;
namespace Volo.Abp.EventBus.Local
{
@ -29,13 +30,17 @@ namespace Volo.Abp.EventBus.Local
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected IJsonSerializer Serializer { get; }
public LocalEventBus(
IOptions<AbpLocalEventBusOptions> options,
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IEventErrorHandler errorHandler)
IEventErrorHandler errorHandler,
IJsonSerializer serializer)
: base(serviceScopeFactory, currentTenant, errorHandler)
{
Serializer = serializer;
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;
@ -126,9 +131,11 @@ namespace Volo.Abp.EventBus.Local
public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
{
var rawEventData = Serializer.Serialize(localEventMessage.EventData);
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData, errorContext =>
{
errorContext.SetProperty("messageId", localEventMessage.MessageId);
errorContext.EventData = Serializer.Deserialize(localEventMessage.EventType, rawEventData);
errorContext.SetProperty(nameof(LocalEventMessage.MessageId), localEventMessage.MessageId);
});
}

@ -7,6 +7,7 @@ using Volo.Abp.DependencyInjection;
namespace Volo.Abp.EventBus.Local
{
[ExposeServices(typeof(LocalEventErrorHandler), typeof(IEventErrorHandler))]
public class LocalEventErrorHandler : EventErrorHandlerBase, ISingletonDependency
{
protected Dictionary<Guid, int> RetryTracking { get; }
@ -25,7 +26,10 @@ namespace Volo.Abp.EventBus.Local
await Task.Delay(Options.RetryStrategyOptions.IntervalMillisecond);
}
var messageId = context.GetProperty<Guid>("messageId");
var messageId = context.GetProperty<Guid>(nameof(LocalEventMessage.MessageId));
context.TryGetRetryAttempt(out var retryAttempt);
RetryTracking[messageId] = ++retryAttempt;
await context.EventBus.As<LocalEventBus>().PublishAsync(new LocalEventMessage(messageId, context.EventData, context.EventType));
@ -34,36 +38,23 @@ namespace Volo.Abp.EventBus.Local
protected override Task MoveToDeadLetter(EventExecutionErrorContext context)
{
if (context.Exceptions.Count == 1)
{
context.Exceptions[0].ReThrow();
}
ThrowOriginalExceptions(context);
throw new AggregateException(
"More than one error has occurred while triggering the event: " + context.EventType,
context.Exceptions);
return Task.CompletedTask;
}
protected override bool ShouldRetry(EventExecutionErrorContext context)
{
if (!base.ShouldRetry(context))
{
return false;
}
var messageId = context.GetProperty<Guid>(nameof(LocalEventMessage.MessageId));
context.SetProperty(RetryAttemptKey, RetryTracking.GetOrDefault(messageId));
var messageId = context.GetProperty<Guid>("messageId");
var index = RetryTracking.GetOrDefault(messageId);
if (Options.RetryStrategyOptions.MaxRetryAttempts <= index)
if (base.ShouldRetry(context))
{
RetryTracking.Remove(messageId);
return false;
return true;
}
RetryTracking[messageId] = ++index;
return true;
RetryTracking.Remove(messageId);
return false;
}
}
}

@ -10,11 +10,10 @@ namespace Volo.Abp.EventBus.Local
[Fact]
public async Task Should_Not_Handle_Exception()
{
MySimpleEventData data = null;
var retryAttempt = 0;
LocalEventBus.Subscribe<MySimpleEventData>(eventData =>
{
++eventData.Value;
data = eventData;
retryAttempt++;
throw new Exception("This exception is intentionally thrown!");
});
@ -23,20 +22,21 @@ namespace Volo.Abp.EventBus.Local
await LocalEventBus.PublishAsync(new MySimpleEventData(1));
});
data.Value.ShouldBe(2);
retryAttempt.ShouldBe(1);
appException.Message.ShouldBe("This exception is intentionally thrown!");
}
[Fact]
public async Task Should_Handle_Exception()
{
MyExceptionHandleEventData data = null;
var retryAttempt = 0;
LocalEventBus.Subscribe<MyExceptionHandleEventData>(eventData =>
{
++eventData.RetryAttempts;
data = eventData;
eventData.Value.ShouldBe(0);
if (eventData.RetryAttempts < 2)
retryAttempt++;
eventData.Value++;
if (retryAttempt < 2)
{
throw new Exception("This exception is intentionally thrown!");
}
@ -46,17 +46,20 @@ namespace Volo.Abp.EventBus.Local
});
await LocalEventBus.PublishAsync(new MyExceptionHandleEventData(0));
data.RetryAttempts.ShouldBe(2);
retryAttempt.ShouldBe(2);
}
[Fact]
public async Task Should_Throw_Exception_After_Error_Handle()
{
MyExceptionHandleEventData data = null;
var retryAttempt = 0;
LocalEventBus.Subscribe<MyExceptionHandleEventData>(eventData =>
{
++eventData.RetryAttempts;
data = eventData;
eventData.Value.ShouldBe(0);
retryAttempt++;
eventData.Value++;
throw new Exception("This exception is intentionally thrown!");
});
@ -65,7 +68,7 @@ namespace Volo.Abp.EventBus.Local
await LocalEventBus.PublishAsync(new MyExceptionHandleEventData(0));
});
data.RetryAttempts.ShouldBe(4);
retryAttempt.ShouldBe(4);
appException.Message.ShouldBe("This exception is intentionally thrown!");
}
}

@ -2,11 +2,11 @@
{
public class MyExceptionHandleEventData
{
public int RetryAttempts { get; set; }
public int Value { get; set; }
public MyExceptionHandleEventData(int retryAttempts)
public MyExceptionHandleEventData(int value)
{
RetryAttempts = retryAttempts;
Value = value;
}
}
}

Loading…
Cancel
Save