Refactor rabbitmq eventbus

pull/660/head
Halil ibrahim Kalkan 7 years ago
parent 1ebe284f41
commit 9322bf03b3

@ -26,9 +26,10 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
protected DistributedEventBusOptions DistributedEventBusOptions { get; }
protected IConnectionPool ConnectionPool { get; }
protected IRabbitMqSerializer Serializer { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } //TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }
protected IRabbitMqMessageConsumer Consumer { get; }
@ -39,10 +40,10 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
IHybridServiceScopeFactory serviceScopeFactory,
IOptions<DistributedEventBusOptions> distributedEventBusOptions,
IRabbitMqMessageConsumerFactory messageConsumerFactory)
: base(serviceScopeFactory)
{
ConnectionPool = connectionPool;
Serializer = serializer;
ServiceScopeFactory = serviceScopeFactory;
MessageConsumerFactory = messageConsumerFactory;
DistributedEventBusOptions = distributedEventBusOptions.Value;
RabbitMqDistributedEventBusOptions = options.Value;
@ -50,8 +51,6 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
Subscribe(DistributedEventBusOptions.Handlers);
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
RabbitMqDistributedEventBusOptions.ExchangeName,
@ -67,27 +66,8 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
);
Consumer.OnMessageReceived(ProcessEventAsync);
}
protected virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
}
}
}
SubscribeHandlers(DistributedEventBusOptions.Handlers);
}
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
@ -186,11 +166,14 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
using (var channel = ConnectionPool.Get(RabbitMqDistributedEventBusOptions.ConnectionName).CreateModel())
{
//TODO: Other properties like durable?
channel.ExchangeDeclare(RabbitMqDistributedEventBusOptions.ExchangeName, "");
channel.ExchangeDeclare(
RabbitMqDistributedEventBusOptions.ExchangeName,
"direct"
//TODO: Other properties like durable?
);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //persistent
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
channel.BasicPublish(
exchange: RabbitMqDistributedEventBusOptions.ExchangeName,

@ -5,6 +5,8 @@ using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Reflection;
@ -12,6 +14,13 @@ namespace Volo.Abp.EventBus
{
public abstract class EventBusBase : IEventBus
{
protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
protected EventBusBase(IHybridServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
}
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
@ -122,6 +131,27 @@ namespace Volo.Abp.EventBus
}
}
protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
}
}
}
}
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType);
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)

@ -5,9 +5,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
@ -28,39 +26,16 @@ namespace Volo.Abp.EventBus.Local
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
public LocalEventBus(
IOptions<LocalEventBusOptions> options,
IHybridServiceScopeFactory serviceScopeFactory)
: base(serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
Subscribe(Options.Handlers);
}
public virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
}
}
}
SubscribeHandlers(Options.Handlers);
}
/// <inheritdoc/>

@ -0,0 +1,12 @@
namespace Volo.Abp.RabbitMQ
{
public static class RabbitMqConsts
{
public static class DeliveryModes
{
public const int NonPersistent = 1;
public const int Persistent = 2;
}
}
}
Loading…
Cancel
Save