From a30f50ed9a11301fff9cf618670b37e8e884439b Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Fri, 11 Nov 2022 16:50:46 +0800 Subject: [PATCH] Enhance Background Workers integration --- .../HangfireBackgroundWorkerManager.cs | 94 +++++++++---------- .../Quartz/QuartzBackgroundWorkerManager.cs | 80 +++++++++------- 2 files changed, 90 insertions(+), 84 deletions(-) diff --git a/framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs b/framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs index 0dc11e7e0a..594f01343e 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs @@ -12,78 +12,69 @@ using Volo.Abp.Threading; namespace Volo.Abp.BackgroundWorkers.Hangfire; [Dependency(ReplaceServices = true)] -public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency +public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency { - private AbpHangfireBackgroundJobServer _backgroundJobServer; - private readonly IServiceProvider _serviceProvider; + protected AbpHangfireBackgroundJobServer BackgroundJobServer { get; set; } + protected IServiceProvider ServiceProvider { get; } public HangfireBackgroundWorkerManager(IServiceProvider serviceProvider) { - _serviceProvider = serviceProvider; + ServiceProvider = serviceProvider; } - public Task StartAsync(CancellationToken cancellationToken = default) + public async override Task StartAsync(CancellationToken cancellationToken = default) { - _backgroundJobServer = _serviceProvider.GetRequiredService(); - return Task.CompletedTask; + BackgroundJobServer = ServiceProvider.GetRequiredService(); + await base.StartAsync(cancellationToken); } - public Task StopAsync(CancellationToken cancellationToken = default) + public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) { - return Task.CompletedTask; - } - - public Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) - { - if (worker is IHangfireBackgroundWorker hangfireBackgroundWorker) + switch (worker) { - var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker); - if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace()) + case IHangfireBackgroundWorker hangfireBackgroundWorker: { - RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken), - hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue); - } - else - { - RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken), - hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue); - } - } - else - { - int? period; - - if (worker is AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase) - { - var timer = worker.GetType() - .GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker); - - if (worker is AsyncPeriodicBackgroundWorkerBase) + var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker); + if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace()) { - period = ((AbpAsyncTimer)timer)?.Period; + RecurringJob.AddOrUpdate( + () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken), + hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, + hangfireBackgroundWorker.Queue); } else { - period = ((AbpTimer)timer)?.Period; + RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, + () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken), + hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, + hangfireBackgroundWorker.Queue); } + + break; } - else + case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase: { - return Task.CompletedTask; - } + var timer = worker.GetType() + .GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker); - if (period == null) - { - return Task.CompletedTask; - } + var period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer)timer)?.Period : ((AbpTimer)timer)?.Period; - var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker)); - var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker; + if (period == null) + { + return; + } - RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue); - } + var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker)); + var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker; - return Task.CompletedTask; + RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue); + + break; + } + default: + await base.AddAsync(worker, cancellationToken); + break; + } } protected virtual string GetCron(int period) @@ -105,9 +96,10 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle } else { - throw new AbpException($"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker"); + throw new AbpException( + $"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker"); } return cron; } -} +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs b/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs index e1e3a86140..9854379c79 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs @@ -8,78 +8,92 @@ using Volo.Abp.DynamicProxy; namespace Volo.Abp.BackgroundWorkers.Quartz; [Dependency(ReplaceServices = true)] -public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency +public class QuartzBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency { - private readonly IScheduler _scheduler; + protected IScheduler Scheduler { get; } public QuartzBackgroundWorkerManager(IScheduler scheduler) { - _scheduler = scheduler; + Scheduler = scheduler; } - public virtual async Task StartAsync(CancellationToken cancellationToken = default) + public async override Task StartAsync(CancellationToken cancellationToken = default) { - if (_scheduler.IsStarted && _scheduler.InStandbyMode) + if (Scheduler.IsStarted && Scheduler.InStandbyMode) { - await _scheduler.Start(cancellationToken); + await Scheduler.Start(cancellationToken); } + + await base.StartAsync(cancellationToken); } - public virtual async Task StopAsync(CancellationToken cancellationToken = default) + public async override Task StopAsync(CancellationToken cancellationToken = default) { - if (_scheduler.IsStarted && !_scheduler.InStandbyMode) + if (Scheduler.IsStarted && !Scheduler.InStandbyMode) { - await _scheduler.Standby(cancellationToken); + await Scheduler.Standby(cancellationToken); } + + await base.StopAsync(cancellationToken); } - public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) + public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) { await ReScheduleJobAsync(worker, cancellationToken); } protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) { - if (worker is IQuartzBackgroundWorker quartzWork) + switch (worker) { - Check.NotNull(quartzWork.Trigger, nameof(quartzWork.Trigger)); - Check.NotNull(quartzWork.JobDetail, nameof(quartzWork.JobDetail)); - - if (quartzWork.ScheduleJob != null) + case IQuartzBackgroundWorker quartzWork: { - await quartzWork.ScheduleJob.Invoke(_scheduler); + Check.NotNull(quartzWork.Trigger, nameof(quartzWork.Trigger)); + Check.NotNull(quartzWork.JobDetail, nameof(quartzWork.JobDetail)); + + if (quartzWork.ScheduleJob != null) + { + await quartzWork.ScheduleJob.Invoke(Scheduler); + } + else + { + await DefaultScheduleJobAsync(quartzWork, cancellationToken); + } + + break; } - else + case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase: { - await DefaultScheduleJobAsync(quartzWork, cancellationToken); - } - } - else - { - var adapterType = typeof(QuartzPeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker)); + var adapterType = typeof(QuartzPeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker)); - var workerAdapter = Activator.CreateInstance(adapterType) as IQuartzBackgroundWorkerAdapter; + var workerAdapter = Activator.CreateInstance(adapterType) as IQuartzBackgroundWorkerAdapter; - workerAdapter?.BuildWorker(worker); + workerAdapter?.BuildWorker(worker); - if (workerAdapter?.Trigger != null) - { - await DefaultScheduleJobAsync(workerAdapter, cancellationToken); + if (workerAdapter?.Trigger != null) + { + await DefaultScheduleJobAsync(workerAdapter, cancellationToken); + } + + break; } + default: + await base.AddAsync(worker, cancellationToken); + break; } } protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork, CancellationToken cancellationToken = default) { - if (await _scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken)) + if (await Scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken)) { - await _scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken); - await _scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken); - await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken); + await Scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken); + await Scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken); + await Scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken); } else { - await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken); + await Scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken); } } }