From 647dd63e7e24f57dedb5bb47c3ce788fc12b66b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20=C4=B0brahim=20Kalkan?= Date: Tue, 17 Dec 2019 19:48:55 +0300 Subject: [PATCH] Make IJobQueueManager.Get method async to reduce AsyncHelper usage --- .../RabbitMQ/IJobQueueManager.cs | 5 +-- .../RabbitMQ/JobQueueManager.cs | 31 ++++++++++++++----- .../RabbitMQ/RabbitMqBackgroundJobManager.cs | 7 ++--- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs index da560d1ed2..8cafb8c247 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs @@ -1,9 +1,10 @@ -using Volo.Abp.Threading; +using System.Threading.Tasks; +using Volo.Abp.Threading; namespace Volo.Abp.BackgroundJobs.RabbitMQ { public interface IJobQueueManager : IRunnable { - IJobQueue Get(); + Task> GetAsync(); } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs index 73b7c21d2a..f07564ad5a 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using Nito.AsyncEx; using Volo.Abp.DependencyInjection; using Volo.Abp.Threading; @@ -17,6 +18,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ protected AbpBackgroundJobOptions Options { get; } + protected SemaphoreSlim SyncSemaphore { get; } + public JobQueueManager( IOptions options, IServiceProvider serviceProvider) @@ -24,6 +27,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ ServiceProvider = serviceProvider; Options = options.Value; JobQueues = new ConcurrentDictionary(); + SyncSemaphore = new SemaphoreSlim(1, 1); } public async Task StartAsync(CancellationToken cancellationToken = default) @@ -51,20 +55,31 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ JobQueues.Clear(); } - public IJobQueue Get() + public async Task> GetAsync() { var jobConfiguration = Options.GetJob(typeof(TArgs)); - return (IJobQueue)JobQueues.GetOrAdd(jobConfiguration.JobName, _ => + if (JobQueues.TryGetValue(jobConfiguration.JobName, out var jobQueue)) + { + return (IJobQueue)jobQueue; + } + + using (await SyncSemaphore.LockAsync()) { - var jobQueue = (IRunnable) ServiceProvider - .GetRequiredService(typeof(IJobQueue<>) - .MakeGenericType(typeof(TArgs))); + if (JobQueues.TryGetValue(jobConfiguration.JobName, out jobQueue)) + { + return (IJobQueue)jobQueue; + } - AsyncHelper.RunSync(() => jobQueue.StartAsync()); + jobQueue = (IJobQueue)ServiceProvider + .GetRequiredService(typeof(IJobQueue<>).MakeGenericType(typeof(TArgs))); - return jobQueue; - }); + await jobQueue.StartAsync(); + + JobQueues.TryAdd(jobConfiguration.JobName, jobQueue); + + return (IJobQueue)jobQueue; + } } } } diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs index c2976e43fb..f70d553128 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs @@ -14,14 +14,13 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ _jobQueueManager = jobQueueManager; } - public Task EnqueueAsync( + public async Task EnqueueAsync( TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { - return _jobQueueManager - .Get() - .EnqueueAsync(args, priority, delay); + var jobQueue = await _jobQueueManager.GetAsync(); + return await jobQueue.EnqueueAsync(args, priority, delay); } } }