Created EventBusBase

pull/594/head
Halil ibrahim Kalkan 6 years ago
parent f440446254
commit 6d1ba3e1b3

@ -1,17 +1,20 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.RabbitMQ;
using RabbitMQ.Client;
using Volo.Abp.EventBus.Local;
namespace Volo.Abp.EventBus.Distributed.RabbitMq
{
/* Inspired from the implementation of "eShopOnContainers"
* TODO: Implement Retry system
/* TODO: Implement Retry system
* TODO: Should be improved
*/
public class RabbitMqDistributedEventBus : IDistributedEventBus, ITransientDependency
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))]
public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ITransientDependency
{
protected RabbitMqDistributedEventBusOptions Options { get; }
protected IChannelPool ChannelPool { get; }
@ -26,79 +29,33 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
Serializer = serializer;
Options = options.Value;
}
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
throw new NotImplementedException();
}
public IDisposable Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
throw new NotImplementedException();
}
public IDisposable Subscribe<TEvent, THandler>() where TEvent : class where THandler : IEventHandler, new()
{
throw new NotImplementedException();
}
public IDisposable Subscribe(Type eventType, IEventHandler handler)
{
throw new NotImplementedException();
}
public IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
throw new NotImplementedException();
}
public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
throw new NotImplementedException();
}
public void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
throw new NotImplementedException();
}
public void Unsubscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
throw new NotImplementedException();
}
public void Unsubscribe(Type eventType, IEventHandler handler)
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
throw new NotImplementedException();
}
public void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
throw new NotImplementedException();
}
public void Unsubscribe(Type eventType, IEventHandlerFactory factory)
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
throw new NotImplementedException();
}
public void UnsubscribeAll<TEvent>() where TEvent : class
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
throw new NotImplementedException();
}
public void UnsubscribeAll(Type eventType)
public override void UnsubscribeAll(Type eventType)
{
throw new NotImplementedException();
}
public Task PublishAsync<TEvent>(TEvent eventData)
where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData);
}
public Task PublishAsync(Type eventType, object eventData)
public override Task PublishAsync(Type eventType, object eventData)
{
var eventName = eventType.FullName; //TODO: Get eventname from an attribute if available
var body = Serializer.Serialize(eventData);
@ -112,7 +69,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
properties.DeliveryMode = 2; //persistent
channelAccessor.Channel.BasicPublish(
exchange: Options.ExchangeName,
exchange: Options.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
@ -122,5 +79,10 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
return Task.CompletedTask;
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
throw new NotImplementedException();
}
}
}

@ -2,6 +2,8 @@
{
public class RabbitMqDistributedEventBusOptions
{
public string ClientName { get; set; }
public string ExchangeName { get; set; }
}
}

@ -0,0 +1,212 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Reflection;
namespace Volo.Abp.EventBus.Local
{
public abstract class EventBusBase : IEventBus
{
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
return Subscribe(typeof(TEvent), new ActionEventHandler<TEvent>(action));
}
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent, THandler>()
where TEvent : class
where THandler : IEventHandler, new()
{
return Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());
}
/// <inheritdoc/>
public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
return Subscribe(typeof(TEvent), factory);
}
public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);
public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;
/// <inheritdoc/>
public virtual void Unsubscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
Unsubscribe(typeof(TEvent), handler);
}
public abstract void Unsubscribe(Type eventType, IEventHandler handler);
/// <inheritdoc/>
public virtual void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
Unsubscribe(typeof(TEvent), factory);
}
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
/// <inheritdoc/>
public virtual void UnsubscribeAll<TEvent>() where TEvent : class
{
UnsubscribeAll(typeof(TEvent));
}
/// <inheritdoc/>
public abstract void UnsubscribeAll(Type eventType);
/// <inheritdoc/>
public virtual Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData);
}
/// <inheritdoc/>
public abstract Task PublishAsync(Type eventType, object eventData);
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions)
{
await new SynchronizationContextRemover();
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
}
}
//Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument
if (eventType.GetTypeInfo().IsGenericType &&
eventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
{
var genericArg = eventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
if (baseArg != null)
{
var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
await PublishAsync(baseEventType, baseEventData);
}
}
}
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType);
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
try
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IEventHandler<>)))
{
var method = typeof(IEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)
.GetMethod(
nameof(IEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IDistributedEventHandler<>)))
{
var method = typeof(IDistributedEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)
.GetMethod(
nameof(IDistributedEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else
{
throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
}
}
catch (TargetInvocationException ex)
{
exceptions.Add(ex.InnerException);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}
}
protected class EventTypeWithEventHandlerFactories
{
public Type EventType { get; }
public List<IEventHandlerFactory> EventHandlerFactories { get; }
public EventTypeWithEventHandlerFactories(Type eventType, List<IEventHandlerFactory> eventHandlerFactories)
{
EventType = eventType;
EventHandlerFactories = eventHandlerFactories;
}
}
// Reference from
// https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/
protected struct SynchronizationContextRemover : INotifyCompletion
{
public bool IsCompleted
{
get { return SynchronizationContext.Current == null; }
}
public void OnCompleted(Action continuation)
{
var prevContext = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(null);
continuation();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevContext);
}
}
public SynchronizationContextRemover GetAwaiter()
{
return this;
}
public void GetResult()
{
}
}
}
}

@ -11,8 +11,6 @@ using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Reflection;
using Volo.Abp.Threading;
namespace Volo.Abp.EventBus.Local
@ -21,7 +19,7 @@ namespace Volo.Abp.EventBus.Local
/// Implements EventBus as Singleton pattern.
/// </summary>
[ExposeServices(typeof(ILocalEventBus), typeof(LocalEventBus))]
public class LocalEventBus : ILocalEventBus, ISingletonDependency
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
/// <summary>
/// Reference to the Logger.
@ -68,39 +66,7 @@ namespace Volo.Abp.EventBus.Local
}
/// <inheritdoc/>
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
return Subscribe(typeof(TEvent), new ActionEventHandler<TEvent>(action));
}
/// <inheritdoc/>
public IDisposable Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
/// <inheritdoc/>
public IDisposable Subscribe<TEvent, THandler>()
where TEvent : class
where THandler : IEventHandler, new()
{
return Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());
}
/// <inheritdoc/>
public IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}
/// <inheritdoc/>
public IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
return Subscribe(typeof(TEvent), factory);
}
/// <inheritdoc/>
public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories => factories.Add(factory));
@ -109,7 +75,7 @@ namespace Volo.Abp.EventBus.Local
}
/// <inheritdoc/>
public void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
@ -137,13 +103,7 @@ namespace Volo.Abp.EventBus.Local
}
/// <inheritdoc/>
public void Unsubscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
Unsubscribe(typeof(TEvent), handler);
}
/// <inheritdoc/>
public void Unsubscribe(Type eventType, IEventHandler handler)
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
@ -157,67 +117,22 @@ namespace Volo.Abp.EventBus.Local
}
/// <inheritdoc/>
public void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
Unsubscribe(typeof(TEvent), factory);
}
/// <inheritdoc/>
public void Unsubscribe(Type eventType, IEventHandlerFactory factory)
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public void UnsubscribeAll<TEvent>() where TEvent : class
{
UnsubscribeAll(typeof(TEvent));
}
/// <inheritdoc/>
public void UnsubscribeAll(Type eventType)
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
/// <inheritdoc/>
public Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
public override async Task PublishAsync(Type eventType, object eventData)
{
return PublishAsync(typeof(TEvent), eventData);
}
/// <inheritdoc/>
public async Task PublishAsync(Type eventType, object eventData)
{
//TODO: Aggregate all exceptions (including the recursive call)!
var exceptions = new List<Exception>();
await new SynchronizationContextRemover();
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerAsyncHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions);
}
}
//Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument
if (eventType.GetTypeInfo().IsGenericType &&
eventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
{
var genericArg = eventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
if (baseArg != null)
{
var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
await PublishAsync(baseEventType, baseEventData);
}
}
await TriggerHandlersAsync(eventType, eventData, exceptions);
if (exceptions.Any())
{
@ -230,57 +145,7 @@ namespace Volo.Abp.EventBus.Local
}
}
private async Task TriggerAsyncHandlingException(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
try
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IEventHandler<>)))
{
var method = typeof(IEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)
.GetMethod(
nameof(IEventHandler<object>.HandleEventAsync),
new[] {eventType}
);
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else if (ReflectionHelper.IsAssignableToGenericType(
handlerType,
typeof(IDistributedEventHandler<>)))
{
var method = typeof(IDistributedEventHandler<>) //TODO: to a static field
.MakeGenericType(eventType)
.GetMethod(
nameof(IDistributedEventHandler<object>.HandleEventAsync),
new[] {eventType}
);
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else
{
throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
}
}
catch (TargetInvocationException ex)
{
exceptions.Add(ex.InnerException);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}
}
public virtual IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
@ -313,51 +178,5 @@ namespace Volo.Abp.EventBus.Local
return false;
}
public class EventTypeWithEventHandlerFactories
{
public Type EventType { get; }
public List<IEventHandlerFactory> EventHandlerFactories { get; }
public EventTypeWithEventHandlerFactories(Type eventType, List<IEventHandlerFactory> eventHandlerFactories)
{
EventType = eventType;
EventHandlerFactories = eventHandlerFactories;
}
}
// Reference from
// https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/
private struct SynchronizationContextRemover : INotifyCompletion
{
public bool IsCompleted
{
get { return SynchronizationContext.Current == null; }
}
public void OnCompleted(Action continuation)
{
var prevContext = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(null);
continuation();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevContext);
}
}
public SynchronizationContextRemover GetAwaiter()
{
return this;
}
public void GetResult()
{
}
}
}
}
Loading…
Cancel
Save