Implement Distribute Events Bus with Azure

Implement a new instance of the IDistributedEventBus that uses the
AzureServiceBus module.

Added a service that will replace the existing instance in the service
registry for the distributed event bus with the Azure instance.

Added configuration options for the Azure Event Bus.
pull/8375/head
Gideon de Swardt 5 years ago
parent 5abd71f6c7
commit 08e7aedbd9

@ -385,6 +385,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Mvc.UI.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AzureServiceBus", "src\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj", "{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Azure", "src\Volo.Abp.EventBus.Azure\Volo.Abp.EventBus.Azure.csproj", "{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -1147,6 +1149,10 @@ Global
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.Build.0 = Release|Any CPU
{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -1341,6 +1347,7 @@ Global
{B4B6B7DE-9798-4007-B1DF-7EE7929E392A} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{E9CE58DB-0789-4D18-8B63-474F7D7B14B4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{FB27F78E-F10E-4810-9B8E-BCD67DCFC8A2} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5}

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj" />
</ItemGroup>
</Project>

@ -0,0 +1,11 @@
namespace Volo.Abp.EventBus.Azure
{
public class AbpAzureEventBusOptions
{
public string ConnectionName { get; set; }
public string SubscriberName { get; set; }
public string TopicName { get; set; }
}
}

@ -0,0 +1,28 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.AzureServiceBus;
using Volo.Abp.Modularity;
namespace Volo.Abp.EventBus.Azure
{
[DependsOn(
typeof(AbpEventBusModule),
typeof(AbpAzureServiceBusModule)
)]
public class AbpEventBusAzureModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpAzureEventBusOptions>(configuration.GetSection("Azure:EventBus"));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<AzureDistributedEventBus>()
.Initialize();
}
}
}

@ -0,0 +1,189 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.AzureServiceBus;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.EventBus.Azure
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))]
public class AzureDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
private readonly AbpAzureEventBusOptions _options;
private readonly AbpDistributedEventBusOptions _distributedEventBusOptions;
private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory;
private readonly IPublisherPool _publisherPool;
private readonly IAzureServiceBusSerializer _serializer;
private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;
private readonly ConcurrentDictionary<string, Type> _eventTypes;
private IAzureServiceBusMessageConsumer _consumer;
public AzureDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IAzureServiceBusSerializer serializer,
IAzureServiceBusMessageConsumerFactory messageConsumerFactory,
IPublisherPool publisherPool)
: base(serviceScopeFactory, currentTenant)
{
_options = abpAzureEventBusOptions.Value;
_distributedEventBusOptions = abpDistributedEventBusOptions.Value;
_serializer = serializer;
_messageConsumerFactory = messageConsumerFactory;
_publisherPool = publisherPool;
_handlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
_eventTypes = new ConcurrentDictionary<string, Type>();
}
public void Initialize()
{
_consumer = _messageConsumerFactory.CreateMessageConsumer(
_options.TopicName,
_options.SubscriberName,
_options.ConnectionName);
_consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(_distributedEventBusOptions.Handlers);
}
private async Task ProcessEventAsync(ServiceBusReceivedMessage message)
{
var eventName = message.Subject;
var eventType = _eventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}
var eventData = _serializer.Deserialize(message.Body, eventType);
await TriggerHandlersAsync(eventType, eventData);
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
if (singleInstanceFactory == null)
{
return false;
}
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
if (actionHandler == null)
{
return false;
}
return actionHandler.Action == action;
});
});
}
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});
}
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories => factories.Remove(factory));
}
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = _serializer.Serialize(eventData);
var message = new ServiceBusMessage(body)
{
Subject = eventName
};
var publisher = await _publisherPool.GetAsync(
_options.TopicName,
_options.ConnectionName);
await publisher.SendMessageAsync(message);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
return _handlerFactories
.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
.Select(handlerFactory =>
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value))
.ToArray();
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return _handlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
_eventTypes[eventName] = type;
return new List<IEventHandlerFactory>();
}
);
}
}
}
Loading…
Cancel
Save