@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Confluent.Kafka ;
using Microsoft.Extensions.DependencyInjection ;
using Microsoft.Extensions.Options ;
using Volo.Abp.Data ;
using Volo.Abp.DependencyInjection ;
using Volo.Abp.EventBus.Distributed ;
using Volo.Abp.Kafka ;
@ -18,6 +19,7 @@ namespace Volo.Abp.EventBus.Kafka
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : EventBusBase , IDistributedEventBus , ISingletonDependency
{
protected AbpEventBusOptions AbpEventBusOptions { get ; }
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get ; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get ; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get ; }
@ -26,6 +28,7 @@ namespace Volo.Abp.EventBus.Kafka
protected ConcurrentDictionary < Type , List < IEventHandlerFactory > > HandlerFactories { get ; }
protected ConcurrentDictionary < string , Type > EventTypes { get ; }
protected IKafkaMessageConsumer Consumer { get ; private set ; }
protected string DeadLetterTopicName { get ; }
public KafkaDistributedEventBus (
IServiceScopeFactory serviceScopeFactory ,
@ -34,14 +37,19 @@ namespace Volo.Abp.EventBus.Kafka
IKafkaMessageConsumerFactory messageConsumerFactory ,
IOptions < AbpDistributedEventBusOptions > abpDistributedEventBusOptions ,
IKafkaSerializer serializer ,
IProducerPool producerPool )
: base ( serviceScopeFactory , currentTenant )
IProducerPool producerPool ,
IEventErrorHandler errorHandler ,
IOptions < AbpEventBusOptions > abpEventBusOptions )
: base ( serviceScopeFactory , currentTenant , errorHandler )
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions . Value ;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions . Value ;
AbpEventBusOptions = abpEventBusOptions . Value ;
MessageConsumerFactory = messageConsumerFactory ;
Serializer = serializer ;
ProducerPool = producerPool ;
DeadLetterTopicName =
AbpEventBusOptions . DeadLetterName ? ? AbpKafkaEventBusOptions . TopicName + "_dead_letter" ;
HandlerFactories = new ConcurrentDictionary < Type , List < IEventHandlerFactory > > ( ) ;
EventTypes = new ConcurrentDictionary < string , Type > ( ) ;
@ -51,9 +59,9 @@ namespace Volo.Abp.EventBus.Kafka
{
Consumer = MessageConsumerFactory . Create (
AbpKafkaEventBusOptions . TopicName ,
DeadLetterTopicName ,
AbpKafkaEventBusOptions . GroupId ,
AbpKafkaEventBusOptions . ConnectionName ) ;
Consumer . OnMessageReceived ( ProcessEventAsync ) ;
SubscribeHandlers ( AbpDistributedEventBusOptions . Handlers ) ;
@ -70,7 +78,18 @@ namespace Volo.Abp.EventBus.Kafka
var eventData = Serializer . Deserialize ( message . Value , eventType ) ;
await TriggerHandlersAsync ( eventType , eventData ) ;
await TriggerHandlersAsync ( eventType , eventData , errorContext = >
{
var retryAttempt = 0 ;
if ( message . Headers . TryGetLastBytes ( EventErrorHandlerBase . RetryAttemptKey , out var retryAttemptBytes ) )
{
retryAttempt = Serializer . Deserialize < int > ( retryAttemptBytes ) ;
}
errorContext . EventData = Serializer . Deserialize ( message . Value , eventType ) ;
errorContext . SetProperty ( EventErrorHandlerBase . HeadersKey , message . Headers ) ;
errorContext . SetProperty ( EventErrorHandlerBase . RetryAttemptKey , retryAttempt ) ;
} ) ;
}
public IDisposable Subscribe < TEvent > ( IDistributedEventHandler < TEvent > handler ) where TEvent : class
@ -147,20 +166,51 @@ namespace Volo.Abp.EventBus.Kafka
}
public override async Task PublishAsync ( Type eventType , object eventData )
{
await PublishAsync ( eventType , eventData , new Headers { { "messageId" , Serializer . Serialize ( Guid . NewGuid ( ) ) } } , null ) ;
}
public virtual async Task PublishAsync ( Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
await PublishAsync ( AbpKafkaEventBusOptions . TopicName , eventType , eventData , headers , headersArguments ) ;
}
public virtual async Task PublishToDeadLetterAsync ( Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
await PublishAsync ( DeadLetterTopicName , eventType , eventData , headers , headersArguments ) ;
}
private async Task PublishAsync ( string topicName , Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
var eventName = EventNameAttribute . GetNameOrDefault ( eventType ) ;
var body = Serializer . Serialize ( eventData ) ;
var producer = ProducerPool . Get ( AbpKafkaEventBusOptions . ConnectionName ) ;
SetEventMessageHeaders ( headers , headersArguments ) ;
await producer . ProduceAsync (
AbpKafkaEventBusOptions . TopicName ,
topicName,
new Message < string , byte [ ] >
{
Key = eventName , Value = body
Key = eventName , Value = body , Headers = headers
} ) ;
}
private void SetEventMessageHeaders ( Headers headers , Dictionary < string , object > headersArguments )
{
if ( headersArguments = = null )
{
return ;
}
foreach ( var header in headersArguments )
{
headers . Remove ( header . Key ) ;
headers . Add ( header . Key , Serializer . Serialize ( header . Value ) ) ;
}
}
private List < IEventHandlerFactory > GetOrCreateHandlerFactories ( Type eventType )
{
return HandlerFactories . GetOrAdd (