Refactor and improvements on the event bus

pull/594/head
Halil ibrahim Kalkan 6 years ago
parent 6d1ba3e1b3
commit 653c0edb7f

@ -37,6 +37,33 @@ namespace System.Collections.Generic
return true;
}
/// <summary>
/// Adds items to the collection which are not already in the collection.
/// </summary>
/// <param name="source">The collection</param>
/// <param name="items">Item to check and add</param>
/// <typeparam name="T">Type of the items in the collection</typeparam>
/// <returns>Returns the added items.</returns>
public static IEnumerable<T> AddIfNotContains<T>([NotNull] this ICollection<T> source, IEnumerable<T> items)
{
Check.NotNull(source, nameof(source));
var addedItems = new List<T>();
foreach (var item in items)
{
if (source.Contains(item))
{
continue;
}
source.Add(item);
addedItems.Add(item);
}
return addedItems;
}
/// <summary>
/// Adds an item to the collection if it's not already in the collection based on the given <paramref name="predicate"/>.
/// </summary>

@ -17,15 +17,15 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ITransientDependency
{
protected RabbitMqDistributedEventBusOptions Options { get; }
protected IChannelPool ChannelPool { get; }
protected IConnectionPool ConnectionPool { get; }
protected IRabbitMqSerializer Serializer { get; }
public RabbitMqDistributedEventBus(
IOptions<RabbitMqDistributedEventBusOptions> options,
IChannelPool channelPool,
IConnectionPool connectionPool,
IRabbitMqSerializer serializer)
{
ChannelPool = channelPool;
ConnectionPool = connectionPool;
Serializer = serializer;
Options = options.Value;
}
@ -57,18 +57,18 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
public override Task PublishAsync(Type eventType, object eventData)
{
var eventName = eventType.FullName; //TODO: Get eventname from an attribute if available
var eventName = EventNameAttribute.GetName(eventType);
var body = Serializer.Serialize(eventData);
using (var channelAccessor = ChannelPool.Acquire(Guid.NewGuid().ToString()))
using (var channel = ConnectionPool.Get().CreateModel())
{
//TODO: Other properties like durable?
channelAccessor.Channel.ExchangeDeclare(Options.ExchangeName, "");
channel.ExchangeDeclare(Options.ExchangeName, "");
var properties = channelAccessor.Channel.CreateBasicProperties();
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //persistent
channelAccessor.Channel.BasicPublish(
channel.BasicPublish(
exchange: Options.ExchangeName,
routingKey: eventName,
mandatory: true,

@ -1,8 +1,11 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Modularity;
using Volo.Abp.Reflection;
namespace Volo.Abp.EventBus
{
@ -15,22 +18,29 @@ namespace Volo.Abp.EventBus
private static void AddEventHandlers(IServiceCollection services)
{
var handlers = new List<Type>();
var localHandlers = new List<Type>();
var distributedHandlers = new List<Type>();
services.OnRegistred(context =>
{
if (context.ImplementationType.GetInterfaces().Any(i => typeof(IEventHandler).IsAssignableFrom(i)))
if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IEventHandler<>)))
{
handlers.Add(context.ImplementationType);
localHandlers.Add(context.ImplementationType);
}
else if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IDistributedEventHandler<>)))
{
distributedHandlers.Add(context.ImplementationType);
}
});
services.Configure<EventBusOptions>(options =>
services.Configure<LocalEventBusOptions>(options =>
{
foreach (var handler in handlers)
{
options.Handlers.AddIfNotContains(handler);
}
options.Handlers.AddIfNotContains(localHandlers);
});
services.Configure<DistributedEventBusOptions>(options =>
{
options.Handlers.AddIfNotContains(distributedHandlers);
});
}
}

@ -0,0 +1,14 @@
using Volo.Abp.Collections;
namespace Volo.Abp.EventBus.Distributed
{
public class DistributedEventBusOptions
{
public ITypeList<IEventHandler> Handlers { get; }
public DistributedEventBusOptions()
{
Handlers = new TypeList<IEventHandler>();
}
}
}

@ -121,9 +121,7 @@ namespace Volo.Abp.EventBus.Local
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IEventHandler<>)))
if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IEventHandler<>)))
{
var method = typeof(IEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)
@ -134,9 +132,7 @@ namespace Volo.Abp.EventBus.Local
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IDistributedEventHandler<>)))
else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
{
var method = typeof(IDistributedEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)

@ -0,0 +1,39 @@
using System;
using System.Linq;
using JetBrains.Annotations;
namespace Volo.Abp.EventBus
{
[AttributeUsage(AttributeTargets.Class)]
public class EventNameAttribute : Attribute, IEventNameProvider
{
public string Name { get; }
public EventNameAttribute([NotNull] string name)
{
Name = Check.NotNullOrWhiteSpace(name, nameof(name));
}
public static string GetName<TEvent>()
{
return GetName(typeof(TEvent));
}
public static string GetName([NotNull] Type eventType)
{
Check.NotNull(eventType, nameof(eventType));
return eventType
.GetCustomAttributes(true)
.OfType<IEventNameProvider>()
.FirstOrDefault()
?.Name
?? eventType.FullName;
}
}
public interface IEventNameProvider
{
string Name { get; }
}
}

@ -6,8 +6,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
@ -26,14 +24,14 @@ namespace Volo.Abp.EventBus.Local
/// </summary>
public ILogger<LocalEventBus> Logger { get; set; }
protected EventBusOptions Options { get; }
protected LocalEventBusOptions Options { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected IServiceProvider ServiceProvider { get; }
public LocalEventBus(
IOptions<EventBusOptions> options,
IOptions<LocalEventBusOptions> options,
IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;

@ -1,12 +1,12 @@
using Volo.Abp.Collections;
namespace Volo.Abp.EventBus
namespace Volo.Abp.EventBus.Local
{
public class EventBusOptions
public class LocalEventBusOptions
{
public ITypeList<IEventHandler> Handlers { get; }
public EventBusOptions()
public LocalEventBusOptions()
{
Handlers = new TypeList<IEventHandler>();
}
Loading…
Cancel
Save