Implement EventErrorHandler for kafka

pull/8829/head
liangshiwei 5 years ago
parent 759d908206
commit f6198eeb80

@ -20,6 +20,7 @@ namespace Volo.Abp.EventBus.Kafka
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
protected AbpEventBusOptions AbpEventBusOptions { get; }
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
@ -28,6 +29,7 @@ namespace Volo.Abp.EventBus.Kafka
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IKafkaMessageConsumer Consumer { get; private set; }
protected string DeadLetterTopicName { get; }
public KafkaDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
@ -37,14 +39,17 @@ namespace Volo.Abp.EventBus.Kafka
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IKafkaSerializer serializer,
IProducerPool producerPool,
IEventErrorHandler errorHandler)
IEventErrorHandler errorHandler,
IOptions<AbpEventBusOptions> abpEventBusOptions)
: base(serviceScopeFactory, currentTenant, errorHandler)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
AbpEventBusOptions = abpEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;
Serializer = serializer;
ProducerPool = producerPool;
DeadLetterTopicName = AbpEventBusOptions.DeadLetterQueue ?? AbpKafkaEventBusOptions.TopicName + "_error";
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
@ -56,8 +61,6 @@ namespace Volo.Abp.EventBus.Kafka
AbpKafkaEventBusOptions.TopicName,
AbpKafkaEventBusOptions.GroupId,
AbpKafkaEventBusOptions.ConnectionName);
Consumer.Consume();
Consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
@ -157,6 +160,16 @@ namespace Volo.Abp.EventBus.Kafka
}
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers)
{
await PublishAsync(AbpKafkaEventBusOptions.TopicName, eventType, eventData, headers);
}
public virtual async Task PublishToDeadLetterAsync(Type eventType, object eventData, Headers headers)
{
await PublishAsync(DeadLetterTopicName, eventType, eventData, headers);
}
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
@ -164,7 +177,7 @@ namespace Volo.Abp.EventBus.Kafka
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
await producer.ProduceAsync(
AbpKafkaEventBusOptions.TopicName,
topicName,
new Message<string, byte[]>
{
Key = eventName, Value = body, Headers = headers

@ -14,26 +14,14 @@ namespace Volo.Abp.EventBus.Kafka
protected IKafkaSerializer Serializer { get; }
protected KafkaDistributedEventBus EventBus { get; }
protected IProducerPool ProducerPool { get; }
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
protected string ErrorTopicName { get; }
public KafkaEventErrorHandler(
IOptions<AbpEventBusOptions> options,
IKafkaSerializer serializer,
KafkaDistributedEventBus eventBus,
IKafkaMessageConsumerFactory consumerFactory,
IProducerPool producerPool,
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions) : base(options)
KafkaDistributedEventBus eventBus) : base(options)
{
Serializer = serializer;
EventBus = eventBus;
ProducerPool = producerPool;
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
ErrorTopicName = options.Value.ErrorQueue ?? abpKafkaEventBusOptions.Value.TopicName + "_error";
consumerFactory.Create(ErrorTopicName, string.Empty, abpKafkaEventBusOptions.Value.ConnectionName);
}
protected override async Task Retry(EventExecutionErrorContext context)
@ -52,19 +40,12 @@ namespace Volo.Abp.EventBus.Kafka
await EventBus.PublishAsync(context.EventType, context.EventData, headers);
}
protected override async Task MoveToErrorQueue(EventExecutionErrorContext context)
protected override async Task MoveToDeadLetter(EventExecutionErrorContext context)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
var eventName = EventNameAttribute.GetNameOrDefault(context.EventType);
var body = Serializer.Serialize(context.EventData);
await producer.ProduceAsync(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = eventName, Value = body,
Headers = new Headers {{"exceptions", Serializer.Serialize(context.Exceptions)}}
});
await EventBus.PublishToDeadLetterAsync(context.EventType, context.EventData, new Headers
{
{"exceptions", Serializer.Serialize(context.Exceptions)}
});
}
protected override bool ShouldRetry(EventExecutionErrorContext context)
@ -75,14 +56,13 @@ namespace Volo.Abp.EventBus.Kafka
}
var headers = context.GetProperty<Headers>(HeadersKey);
var index = 1;
if (headers == null)
{
return true;
}
index = Serializer.Deserialize<int>(headers.GetLastBytes(RetryIndexKey));
var index = Serializer.Deserialize<int>(headers.GetLastBytes(RetryIndexKey));
return Options.RetryStrategyOptions.Count < index;
}

@ -8,7 +8,7 @@ namespace Volo.Abp.EventBus
public Func<Type, bool> ErrorHandleSelector { get; set; }
public string ErrorQueue { get; set; }
public string DeadLetterQueue { get; set; }
public AbpEventBusRetryStrategyOptions RetryStrategyOptions { get; set; }

@ -1,7 +1,5 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus
{
@ -9,7 +7,7 @@ namespace Volo.Abp.EventBus
{
protected AbpEventBusOptions Options { get; }
public EventErrorHandlerBase(IOptions<AbpEventBusOptions> options)
protected EventErrorHandlerBase(IOptions<AbpEventBusOptions> options)
{
Options = options.Value;
}
@ -27,12 +25,12 @@ namespace Volo.Abp.EventBus
return;
}
await MoveToErrorQueue(context);
await MoveToDeadLetter(context);
}
protected abstract Task Retry(EventExecutionErrorContext context);
protected abstract Task MoveToErrorQueue(EventExecutionErrorContext context);
protected abstract Task MoveToDeadLetter(EventExecutionErrorContext context);
protected virtual bool ShouldHandle(EventExecutionErrorContext context)
{
@ -51,7 +49,7 @@ namespace Volo.Abp.EventBus
protected virtual bool ShouldRetry(EventExecutionErrorContext context)
{
return Options.RetryStrategyOptions == null && false;
return Options.RetryStrategyOptions != null;
}
}
}

@ -34,7 +34,7 @@ namespace Volo.Abp.EventBus.Local
new LocalEventMessage(messageId, context.EventData, context.EventType));
}
protected override Task MoveToErrorQueue(EventExecutionErrorContext context)
protected override Task MoveToDeadLetter(EventExecutionErrorContext context)
{
if (context.Exceptions.Count == 1)
{

@ -7,7 +7,5 @@ namespace Volo.Abp.Kafka
public interface IKafkaMessageConsumer
{
void OnMessageReceived(Func<Message<string, byte[]>, Task> callback);
void Consume();
}
}

@ -8,11 +8,13 @@
/// not disposed until end of the application.
/// </summary>
/// <param name="topicName"></param>
/// <param name="deadLetterTopicName"></param>
/// <param name="groupId"></param>
/// <param name="connectionName"></param>
/// <returns></returns>
IKafkaMessageConsumer Create(
string topicName,
string deadLetterTopicName,
string groupId,
string connectionName = null);
}

@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
@ -36,6 +37,9 @@ namespace Volo.Abp.Kafka
protected string TopicName { get; private set; }
protected string DeadLetterTopicName { get; private set; }
public KafkaMessageConsumer(
IConsumerPool consumerPool,
IExceptionNotifier exceptionNotifier,
@ -53,16 +57,20 @@ namespace Volo.Abp.Kafka
public virtual void Initialize(
[NotNull] string topicName,
[NotNull] string deadLetterTopicName,
[NotNull] string groupId,
string connectionName = null)
{
Check.NotNull(topicName, nameof(topicName));
Check.NotNull(deadLetterTopicName, nameof(deadLetterTopicName));
Check.NotNull(groupId, nameof(groupId));
TopicName = topicName;
DeadLetterTopicName = deadLetterTopicName;
ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName;
GroupId = groupId;
AsyncHelper.RunSync(CreateTopicAsync);
Consume();
}
public virtual void OnMessageReceived(Func<Message<string, byte[]>, Task> callback)
@ -74,22 +82,34 @@ namespace Volo.Abp.Kafka
{
using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build())
{
var topic = new TopicSpecification
var topics = new List<TopicSpecification>
{
Name = TopicName,
NumPartitions = 1,
ReplicationFactor = 1
new()
{
Name = TopicName,
NumPartitions = 1,
ReplicationFactor = 1
},
new()
{
Name = DeadLetterTopicName,
NumPartitions = 1,
ReplicationFactor = 1
}
};
Options.ConfigureTopic?.Invoke(topic);
topics.ForEach(topic =>
{
Options.ConfigureTopic?.Invoke(topic);
});
try
{
await adminClient.CreateTopicsAsync(new[] {topic});
await adminClient.CreateTopicsAsync(topics);
}
catch (CreateTopicsException e)
{
if(e.Results.First().Error.Code != ErrorCode.TopicAlreadyExists)
if(e.Results.Any(x => x.Error.Code != ErrorCode.TopicAlreadyExists))
{
throw;
}
@ -97,7 +117,7 @@ namespace Volo.Abp.Kafka
}
}
public virtual void Consume()
protected virtual void Consume()
{
Consumer = ConsumerPool.Get(GroupId, ConnectionName);

@ -16,6 +16,7 @@ namespace Volo.Abp.Kafka
public IKafkaMessageConsumer Create(
string topicName,
string deadLetterTopicName,
string groupId,
string connectionName = null)
{

Loading…
Cancel
Save