|  |  |  | @ -4,6 +4,7 @@ using System.Threading; | 
			
		
	
		
			
				
					|  |  |  |  | using System.Threading.Tasks; | 
			
		
	
		
			
				
					|  |  |  |  | using Microsoft.Extensions.DependencyInjection; | 
			
		
	
		
			
				
					|  |  |  |  | using Microsoft.Extensions.Options; | 
			
		
	
		
			
				
					|  |  |  |  | using Nito.AsyncEx; | 
			
		
	
		
			
				
					|  |  |  |  | using Volo.Abp.DependencyInjection; | 
			
		
	
		
			
				
					|  |  |  |  | using Volo.Abp.Threading; | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -17,6 +18,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         protected AbpBackgroundJobOptions Options { get; } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         protected SemaphoreSlim SyncSemaphore { get; } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         public JobQueueManager( | 
			
		
	
		
			
				
					|  |  |  |  |             IOptions<AbpBackgroundJobOptions> options, | 
			
		
	
		
			
				
					|  |  |  |  |             IServiceProvider serviceProvider) | 
			
		
	
	
		
			
				
					|  |  |  | @ -24,6 +27,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ | 
			
		
	
		
			
				
					|  |  |  |  |             ServiceProvider = serviceProvider; | 
			
		
	
		
			
				
					|  |  |  |  |             Options = options.Value; | 
			
		
	
		
			
				
					|  |  |  |  |             JobQueues = new ConcurrentDictionary<string, IRunnable>(); | 
			
		
	
		
			
				
					|  |  |  |  |             SyncSemaphore = new SemaphoreSlim(1, 1); | 
			
		
	
		
			
				
					|  |  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         public async Task StartAsync(CancellationToken cancellationToken = default) | 
			
		
	
	
		
			
				
					|  |  |  | @ -51,20 +55,31 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ | 
			
		
	
		
			
				
					|  |  |  |  |             JobQueues.Clear(); | 
			
		
	
		
			
				
					|  |  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         public IJobQueue<TArgs> Get<TArgs>() | 
			
		
	
		
			
				
					|  |  |  |  |         public async Task<IJobQueue<TArgs>> GetAsync<TArgs>() | 
			
		
	
		
			
				
					|  |  |  |  |         { | 
			
		
	
		
			
				
					|  |  |  |  |             var jobConfiguration = Options.GetJob(typeof(TArgs)); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |             return (IJobQueue<TArgs>)JobQueues.GetOrAdd(jobConfiguration.JobName, _ => | 
			
		
	
		
			
				
					|  |  |  |  |             if (JobQueues.TryGetValue(jobConfiguration.JobName, out var jobQueue)) | 
			
		
	
		
			
				
					|  |  |  |  |             { | 
			
		
	
		
			
				
					|  |  |  |  |                 return (IJobQueue<TArgs>)jobQueue; | 
			
		
	
		
			
				
					|  |  |  |  |             } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |             using (await SyncSemaphore.LockAsync()) | 
			
		
	
		
			
				
					|  |  |  |  |             { | 
			
		
	
		
			
				
					|  |  |  |  |                 var jobQueue = (IRunnable) ServiceProvider | 
			
		
	
		
			
				
					|  |  |  |  |                     .GetRequiredService(typeof(IJobQueue<>) | 
			
		
	
		
			
				
					|  |  |  |  |                         .MakeGenericType(typeof(TArgs))); | 
			
		
	
		
			
				
					|  |  |  |  |                 if (JobQueues.TryGetValue(jobConfiguration.JobName, out jobQueue)) | 
			
		
	
		
			
				
					|  |  |  |  |                 { | 
			
		
	
		
			
				
					|  |  |  |  |                     return (IJobQueue<TArgs>)jobQueue; | 
			
		
	
		
			
				
					|  |  |  |  |                 } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |                 AsyncHelper.RunSync(() => jobQueue.StartAsync()); | 
			
		
	
		
			
				
					|  |  |  |  |                 jobQueue = (IJobQueue<TArgs>)ServiceProvider | 
			
		
	
		
			
				
					|  |  |  |  |                     .GetRequiredService(typeof(IJobQueue<>).MakeGenericType(typeof(TArgs))); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |                 return jobQueue; | 
			
		
	
		
			
				
					|  |  |  |  |             }); | 
			
		
	
		
			
				
					|  |  |  |  |                 await jobQueue.StartAsync(); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |                 JobQueues.TryAdd(jobConfiguration.JobName, jobQueue); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |                 return (IJobQueue<TArgs>)jobQueue; | 
			
		
	
		
			
				
					|  |  |  |  |             } | 
			
		
	
		
			
				
					|  |  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
	
		
			
				
					|  |  |  | 
 |