@ -6,7 +6,6 @@ using System.Threading.Tasks;
using Confluent.Kafka ;
using Microsoft.Extensions.DependencyInjection ;
using Microsoft.Extensions.Options ;
using Volo.Abp.Data ;
using Volo.Abp.DependencyInjection ;
using Volo.Abp.EventBus.Distributed ;
using Volo.Abp.Guids ;
@ -22,7 +21,6 @@ namespace Volo.Abp.EventBus.Kafka
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : DistributedEventBusBase , ISingletonDependency
{
protected AbpEventBusOptions AbpEventBusOptions { get ; }
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get ; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get ; }
protected IKafkaSerializer Serializer { get ; }
@ -30,7 +28,6 @@ namespace Volo.Abp.EventBus.Kafka
protected ConcurrentDictionary < Type , List < IEventHandlerFactory > > HandlerFactories { get ; }
protected ConcurrentDictionary < string , Type > EventTypes { get ; }
protected IKafkaMessageConsumer Consumer { get ; private set ; }
protected string DeadLetterTopicName { get ; }
public KafkaDistributedEventBus (
IServiceScopeFactory serviceScopeFactory ,
@ -41,26 +38,20 @@ namespace Volo.Abp.EventBus.Kafka
IOptions < AbpDistributedEventBusOptions > abpDistributedEventBusOptions ,
IKafkaSerializer serializer ,
IProducerPool producerPool ,
IEventErrorHandler errorHandler ,
IOptions < AbpEventBusOptions > abpEventBusOptions ,
IGuidGenerator guidGenerator ,
IClock clock )
: base (
serviceScopeFactory ,
currentTenant ,
unitOfWorkManager ,
errorHandler ,
abpDistributedEventBusOptions ,
guidGenerator ,
clock )
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions . Value ;
AbpEventBusOptions = abpEventBusOptions . Value ;
MessageConsumerFactory = messageConsumerFactory ;
Serializer = serializer ;
ProducerPool = producerPool ;
DeadLetterTopicName =
AbpEventBusOptions . DeadLetterName ? ? AbpKafkaEventBusOptions . TopicName + "_dead_letter" ;
HandlerFactories = new ConcurrentDictionary < Type , List < IEventHandlerFactory > > ( ) ;
EventTypes = new ConcurrentDictionary < string , Type > ( ) ;
@ -70,7 +61,6 @@ namespace Volo.Abp.EventBus.Kafka
{
Consumer = MessageConsumerFactory . Create (
AbpKafkaEventBusOptions . TopicName ,
DeadLetterTopicName ,
AbpKafkaEventBusOptions . GroupId ,
AbpKafkaEventBusOptions . ConnectionName ) ;
Consumer . OnMessageReceived ( ProcessEventAsync ) ;
@ -88,12 +78,12 @@ namespace Volo.Abp.EventBus.Kafka
}
string messageId = null ;
if ( message . Headers . TryGetLastBytes ( "messageId" , out var messageIdBytes ) )
{
messageId = System . Text . Encoding . UTF8 . GetString ( messageIdBytes ) ;
}
if ( await AddToInboxAsync ( messageId , eventName , eventType , message . Value ) )
{
return ;
@ -101,18 +91,7 @@ namespace Volo.Abp.EventBus.Kafka
var eventData = Serializer . Deserialize ( message . Value , eventType ) ;
await TriggerHandlersAsync ( eventType , eventData , errorContext = >
{
var retryAttempt = 0 ;
if ( message . Headers . TryGetLastBytes ( EventErrorHandlerBase . RetryAttemptKey , out var retryAttemptBytes ) )
{
retryAttempt = Serializer . Deserialize < int > ( retryAttemptBytes ) ;
}
errorContext . EventData = Serializer . Deserialize ( message . Value , eventType ) ;
errorContext . SetProperty ( EventErrorHandlerBase . HeadersKey , message . Headers ) ;
errorContext . SetProperty ( EventErrorHandlerBase . RetryAttemptKey , retryAttempt ) ;
} ) ;
await TriggerHandlersAsync ( eventType , eventData ) ;
}
public override IDisposable Subscribe ( Type eventType , IEventHandlerFactory factory )
@ -226,7 +205,7 @@ namespace Volo.Abp.EventBus.Kafka
{
return ;
}
var eventData = Serializer . Deserialize ( incomingEvent . EventData , eventType ) ;
var exceptions = new List < Exception > ( ) ;
await TriggerHandlersAsync ( eventType , eventData , exceptions , inboxConfig ) ;
@ -252,11 +231,6 @@ namespace Volo.Abp.EventBus.Kafka
) ;
}
public virtual async Task PublishToDeadLetterAsync ( Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
await PublishAsync ( DeadLetterTopicName , eventType , eventData , headers , headersArguments ) ;
}
private Task PublishAsync ( string topicName , Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
var eventName = EventNameAttribute . GetNameOrDefault ( eventType ) ;
@ -264,7 +238,7 @@ namespace Volo.Abp.EventBus.Kafka
return PublishAsync ( topicName , eventName , body , headers , headersArguments ) ;
}
private async Task PublishAsync ( string topicName , string eventName , byte [ ] body , Headers headers , Dictionary < string , object > headersArguments )
{
var producer = ProducerPool . Get ( AbpKafkaEventBusOptions . ConnectionName ) ;