Add `CancellationToken` paramter to `AddAsync` and ``DoWorkAsync` methods.

Resolve #12419
pull/12420/head
maliming 3 years ago
parent eafdce12a9
commit 0550c41ba0
No known key found for this signature in database
GPG Key ID: 096224957E51C89E

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundWorkers.Hangfire;
@ -13,7 +14,7 @@ public abstract class HangfireBackgroundWorkerBase : BackgroundWorkerBase, IHang
public string Queue { get; set; }
public abstract Task DoWorkAsync();
public abstract Task DoWorkAsync(CancellationToken cancellationToken = default);
protected HangfireBackgroundWorkerBase()
{

@ -33,19 +33,19 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle
return Task.CompletedTask;
}
public Task AddAsync(IBackgroundWorker worker)
public Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
if (worker is IHangfireBackgroundWorker hangfireBackgroundWorker)
{
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
{
RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(),
RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
}
else
{
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(),
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
}
}
@ -80,7 +80,7 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
}
return Task.CompletedTask;

@ -1,4 +1,5 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -17,9 +18,9 @@ public class HangfirePeriodicBackgroundWorkerAdapter<TWorker> : HangfireBackgrou
_doWorkMethod = typeof(TWorker).GetMethod("DoWork", BindingFlags.Instance | BindingFlags.NonPublic);
}
public async override Task DoWorkAsync()
public async override Task DoWorkAsync(CancellationToken cancellationToken = default)
{
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider);
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider, cancellationToken);
var worker = ServiceProvider.GetRequiredService<TWorker>();
switch (worker)

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundWorkers.Hangfire;
@ -13,7 +14,5 @@ public interface IHangfireBackgroundWorker : IBackgroundWorker
string Queue { get; set; }
Task DoWorkAsync();
Task DoWorkAsync(CancellationToken cancellationToken = default);
}

@ -4,7 +4,6 @@ using System.Threading.Tasks;
using Quartz;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers.Quartz;
@ -34,12 +33,12 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
}
}
public virtual async Task AddAsync(IBackgroundWorker worker)
public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
await ReScheduleJobAsync(worker);
await ReScheduleJobAsync(worker, cancellationToken);
}
protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker)
protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
if (worker is IQuartzBackgroundWorker quartzWork)
{
@ -52,7 +51,7 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
}
else
{
await DefaultScheduleJobAsync(quartzWork);
await DefaultScheduleJobAsync(quartzWork, cancellationToken);
}
}
else
@ -65,22 +64,22 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
if (workerAdapter?.Trigger != null)
{
await DefaultScheduleJobAsync(workerAdapter);
await DefaultScheduleJobAsync(workerAdapter, cancellationToken);
}
}
}
protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork)
protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork, CancellationToken cancellationToken = default)
{
if (await _scheduler.CheckExists(quartzWork.JobDetail.Key))
if (await _scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken))
{
await _scheduler.AddJob(quartzWork.JobDetail, true, true);
await _scheduler.ResumeJob(quartzWork.JobDetail.Key);
await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger);
await _scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken);
await _scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken);
await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken);
}
else
{
await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger);
await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken);
}
}
}

@ -67,10 +67,10 @@ public class QuartzPeriodicBackgroundWorkerAdapter<TWorker> : QuartzBackgroundWo
.Build();
}
public override async Task Execute(IJobExecutionContext context)
public async override Task Execute(IJobExecutionContext context)
{
var worker = (IBackgroundWorker) ServiceProvider.GetService(typeof(TWorker));
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider);
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider, context.CancellationToken);
switch (worker)
{

@ -12,6 +12,7 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpAsyncTimer Timer { get; }
protected CancellationToken StartCancellationToken { get; set; }
protected AsyncPeriodicBackgroundWorkerBase(
AbpAsyncTimer timer,
@ -22,13 +23,15 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
Timer.Elapsed = Timer_Elapsed;
}
public override async Task StartAsync(CancellationToken cancellationToken = default)
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
StartCancellationToken = cancellationToken;
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken = default)
public async override Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);
@ -36,16 +39,16 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
private async Task Timer_Elapsed(AbpAsyncTimer timer)
{
await DoWorkAsync();
await DoWorkAsync(StartCancellationToken);
}
private async Task DoWorkAsync()
private async Task DoWorkAsync(CancellationToken cancellationToken = default)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
try
{
await DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider));
await DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider, cancellationToken));
}
catch (Exception ex)
{

@ -26,13 +26,13 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
_backgroundWorkers = new List<IBackgroundWorker>();
}
public virtual async Task AddAsync(IBackgroundWorker worker)
public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
_backgroundWorkers.Add(worker);
if (IsRunning)
{
await worker.StartAsync();
await worker.StartAsync(cancellationToken);
}
}

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
@ -7,17 +8,17 @@ namespace Volo.Abp.BackgroundWorkers;
public static class BackgroundWorkersApplicationInitializationContextExtensions
{
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context)
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context, CancellationToken cancellationToken = default)
where TWorker : IBackgroundWorker
{
Check.NotNull(context, nameof(context));
await context.AddBackgroundWorkerAsync(typeof(TWorker));
await context.AddBackgroundWorkerAsync(typeof(TWorker), cancellationToken: cancellationToken);
return context;
}
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType)
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType, CancellationToken cancellationToken = default)
{
Check.NotNull(context, nameof(context));
Check.NotNull(workerType, nameof(workerType));
@ -29,9 +30,7 @@ public static class BackgroundWorkersApplicationInitializationContextExtensions
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.AddAsync(
(IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType)
);
.AddAsync((IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType), cancellationToken);
return context;
}

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers;
@ -14,5 +15,6 @@ public interface IBackgroundWorkerManager : IRunnable
/// <param name="worker">
/// The worker. It should be resolved from IOC.
/// </param>
Task AddAsync(IBackgroundWorker worker);
/// <param name="cancellationToken"></param>
Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default);
}

@ -1,4 +1,5 @@
using System;
using System.Threading;
namespace Volo.Abp.BackgroundWorkers;
@ -6,8 +7,17 @@ public class PeriodicBackgroundWorkerContext
{
public IServiceProvider ServiceProvider { get; }
public CancellationToken CancellationToken { get; }
public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
CancellationToken = default;
}
public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
ServiceProvider = serviceProvider;
CancellationToken = cancellationToken;
}
}

Loading…
Cancel
Save