Merge pull request #14608 from abpframework/liangshiwei/backgroundworkers

Enhance Background Workers integration
pull/14612/head
liangshiwei 3 years ago committed by GitHub
commit 8390790930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,78 +12,69 @@ using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers.Hangfire;
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
private AbpHangfireBackgroundJobServer _backgroundJobServer;
private readonly IServiceProvider _serviceProvider;
protected AbpHangfireBackgroundJobServer BackgroundJobServer { get; set; }
protected IServiceProvider ServiceProvider { get; }
public HangfireBackgroundWorkerManager(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
ServiceProvider = serviceProvider;
}
public Task StartAsync(CancellationToken cancellationToken = default)
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
_backgroundJobServer = _serviceProvider.GetRequiredService<AbpHangfireBackgroundJobServer>();
return Task.CompletedTask;
BackgroundJobServer = ServiceProvider.GetRequiredService<AbpHangfireBackgroundJobServer>();
await base.StartAsync(cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken = default)
public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
if (worker is IHangfireBackgroundWorker hangfireBackgroundWorker)
switch (worker)
{
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
case IHangfireBackgroundWorker hangfireBackgroundWorker:
{
RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
}
else
{
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
}
}
else
{
int? period;
if (worker is AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase)
{
var timer = worker.GetType()
.GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
if (worker is AsyncPeriodicBackgroundWorkerBase)
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
{
period = ((AbpAsyncTimer)timer)?.Period;
RecurringJob.AddOrUpdate(
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
hangfireBackgroundWorker.Queue);
}
else
{
period = ((AbpTimer)timer)?.Period;
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId,
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
hangfireBackgroundWorker.Queue);
}
break;
}
else
case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase:
{
return Task.CompletedTask;
}
var timer = worker.GetType()
.GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
if (period == null)
{
return Task.CompletedTask;
}
var period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer)timer)?.Period : ((AbpTimer)timer)?.Period;
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
if (period == null)
{
return;
}
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
}
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
return Task.CompletedTask;
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
break;
}
default:
await base.AddAsync(worker, cancellationToken);
break;
}
}
protected virtual string GetCron(int period)
@ -105,9 +96,10 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle
}
else
{
throw new AbpException($"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker");
throw new AbpException(
$"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker");
}
return cron;
}
}
}

@ -8,78 +8,92 @@ using Volo.Abp.DynamicProxy;
namespace Volo.Abp.BackgroundWorkers.Quartz;
[Dependency(ReplaceServices = true)]
public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency
public class QuartzBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
private readonly IScheduler _scheduler;
protected IScheduler Scheduler { get; }
public QuartzBackgroundWorkerManager(IScheduler scheduler)
{
_scheduler = scheduler;
Scheduler = scheduler;
}
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
if (_scheduler.IsStarted && _scheduler.InStandbyMode)
if (Scheduler.IsStarted && Scheduler.InStandbyMode)
{
await _scheduler.Start(cancellationToken);
await Scheduler.Start(cancellationToken);
}
await base.StartAsync(cancellationToken);
}
public virtual async Task StopAsync(CancellationToken cancellationToken = default)
public async override Task StopAsync(CancellationToken cancellationToken = default)
{
if (_scheduler.IsStarted && !_scheduler.InStandbyMode)
if (Scheduler.IsStarted && !Scheduler.InStandbyMode)
{
await _scheduler.Standby(cancellationToken);
await Scheduler.Standby(cancellationToken);
}
await base.StopAsync(cancellationToken);
}
public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
await ReScheduleJobAsync(worker, cancellationToken);
}
protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
if (worker is IQuartzBackgroundWorker quartzWork)
switch (worker)
{
Check.NotNull(quartzWork.Trigger, nameof(quartzWork.Trigger));
Check.NotNull(quartzWork.JobDetail, nameof(quartzWork.JobDetail));
if (quartzWork.ScheduleJob != null)
case IQuartzBackgroundWorker quartzWork:
{
await quartzWork.ScheduleJob.Invoke(_scheduler);
Check.NotNull(quartzWork.Trigger, nameof(quartzWork.Trigger));
Check.NotNull(quartzWork.JobDetail, nameof(quartzWork.JobDetail));
if (quartzWork.ScheduleJob != null)
{
await quartzWork.ScheduleJob.Invoke(Scheduler);
}
else
{
await DefaultScheduleJobAsync(quartzWork, cancellationToken);
}
break;
}
else
case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase:
{
await DefaultScheduleJobAsync(quartzWork, cancellationToken);
}
}
else
{
var adapterType = typeof(QuartzPeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var adapterType = typeof(QuartzPeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IQuartzBackgroundWorkerAdapter;
var workerAdapter = Activator.CreateInstance(adapterType) as IQuartzBackgroundWorkerAdapter;
workerAdapter?.BuildWorker(worker);
workerAdapter?.BuildWorker(worker);
if (workerAdapter?.Trigger != null)
{
await DefaultScheduleJobAsync(workerAdapter, cancellationToken);
if (workerAdapter?.Trigger != null)
{
await DefaultScheduleJobAsync(workerAdapter, cancellationToken);
}
break;
}
default:
await base.AddAsync(worker, cancellationToken);
break;
}
}
protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork, CancellationToken cancellationToken = default)
{
if (await _scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken))
if (await Scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken))
{
await _scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken);
await _scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken);
await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken);
await Scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken);
await Scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken);
await Scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken);
}
else
{
await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken);
await Scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken);
}
}
}

Loading…
Cancel
Save