Implemented channel pooling.

pull/395/head
Halil ibrahim Kalkan 7 years ago
parent 3c87b72d75
commit ce444f8734

@ -111,7 +111,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
return Task.CompletedTask;
}
ChannelAccessor = ChannelPool.Acquire(QueueName);
ChannelAccessor = ChannelPool.Acquire(QueueName + ".JobQueue");
var queueOptions = RabbitMqOptions.Queues.GetOrDefault(QueueName)
?? new QueueOptions(QueueName);

@ -16,7 +16,10 @@ namespace Volo.Abp.RabbitMQ
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
context.ServiceProvider.GetRequiredService<IConnectionPool>().Dispose();
context.ServiceProvider
.GetRequiredService<IConnectionPool>()
.Dispose();
//TODO: Dispose channel pool when it's implemented!
}
}

@ -1,4 +1,7 @@
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Threading;
using RabbitMQ.Client;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.RabbitMQ
@ -7,16 +10,43 @@ namespace Volo.Abp.RabbitMQ
{
protected IConnectionPool ConnectionPool { get; }
protected ConcurrentDictionary<string, ChannelPoolItem> Channels { get; }
public ChannelPool(IConnectionPool connectionPool)
{
ConnectionPool = connectionPool;
Channels = new ConcurrentDictionary<string, ChannelPoolItem>();
}
public virtual IChannelAccessor Acquire(string channelName = null)
{
//TODO: Pool channels!
channelName = channelName ?? "";
var poolItem = Channels.GetOrAdd(channelName, _ => new ChannelPoolItem
{
Channel = CreateChannel(channelName)
});
lock (poolItem)
{
while (poolItem.IsInUse)
{
Monitor.Wait(poolItem);
}
poolItem.IsInUse = true;
}
return new ChannelAccessor(
CreateChannel(channelName)
poolItem.Channel,
() =>
{
lock (poolItem)
{
poolItem.IsInUse = false;
Monitor.PulseAll(poolItem);
}
}
);
}
@ -29,15 +59,17 @@ namespace Volo.Abp.RabbitMQ
protected class ChannelAccessor : IChannelAccessor
{
public IModel Channel { get; }
private readonly Action _disposeAction;
public ChannelAccessor(IModel channel)
public ChannelAccessor(IModel channel, Action disposeAction)
{
_disposeAction = disposeAction;
Channel = channel;
}
public void Dispose()
{
Channel.Dispose();
_disposeAction.Invoke();
}
}
}

@ -0,0 +1,16 @@
using RabbitMQ.Client;
namespace Volo.Abp.RabbitMQ
{
public class ChannelPoolItem
{
public IModel Channel { get; set; }
public bool IsInUse
{
get => _isInUse;
set => _isInUse = value;
}
private volatile bool _isInUse;
}
}

@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
@ -7,7 +6,7 @@ using Volo.Abp.DependencyInjection;
namespace Volo.Abp.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable, ISingletonDependency
public class ConnectionPool : IConnectionPool, ISingletonDependency
{
protected AbpRabbitMqOptions Options { get; }

@ -5,6 +5,11 @@ namespace Volo.Abp.RabbitMQ
{
public interface IChannelAccessor : IDisposable
{
/// <summary>
/// Reference to the channel.
/// Never dispose the <see cref="Channel"/> object.
/// Instead, dispose the <see cref="IChannelAccessor"/> after usage.
/// </summary>
IModel Channel { get; }
}
}

@ -9,7 +9,7 @@ namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
if (RandomHelper.GetRandom(0, 100) < 70)
{
throw new ApplicationException("A sample exception from the WriteToConsoleGreenJob!");
//throw new ApplicationException("A sample exception from the WriteToConsoleGreenJob!");
}
var oldColor = Console.ForegroundColor;

Loading…
Cancel
Save