Merge pull request #15282 from ahmetfarukulu/bug/bg-job-cancellation

CancellationToken parameter added for background jobs
pull/15841/head
Halil İbrahim Kalkan 3 years ago committed by GitHub
commit a995aff5a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -75,6 +75,42 @@ This job simply uses `IEmailSender` to send emails (see [email sending document]
A background job should not hide exceptions. If it throws an exception, the background job is automatically re-tried after a calculated waiting time. Hide exceptions only if you don't want to re-run the background job for the current argument.
#### Cancelling Background Jobs
If your background task is cancellable, then you can use the standard [Cancellation Token](Cancellation-Token-Provider.md) system to obtain a `CancellationToken` to cancel your job when requested. See the following example that uses the `ICancellationTokenProvider` to obtain the cancellation token:
```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace MyProject
{
public class LongRunningJob : AsyncBackgroundJob<LongRunningJobArgs>, ITransientDependency
{
private readonly ICancellationTokenProvider _cancellationTokenProvider;
public LongRunningJob(ICancellationTokenProvider cancellationTokenProvider)
{
_cancellationTokenProvider = cancellationTokenProvider;
}
public override async Task ExecuteAsync(LongRunningJobArgs args)
{
foreach (var id in args.Ids)
{
_cancellationTokenProvider.Token.ThrowIfCancellationRequested();
await ProcessAsync(id); // code omitted for brevity
}
}
}
}
```
> A cancellation operation might be needed if the application is shutting down and we don't want to block the application in the background job. This example throws an exception if the cancellation is requested. So, the job will be retried the next time the application starts. If you don't want that, just return from the `ExecuteAsync` method without throwing any exception (you can simply check the `_cancellationTokenProvider.Token.IsCancellationRequested` property).
#### Job Name
Each background job has a name. Job names are used in several places. For example, RabbitMQ provider uses job names to determine the RabbitMQ Queue names.

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -6,7 +7,7 @@ namespace Volo.Abp.BackgroundJobs;
public abstract class AsyncBackgroundJob<TArgs> : IAsyncBackgroundJob<TArgs>
{
//TODO: Add UOW, Localization and other useful properties..?
//TODO: Add UOW, Localization, CancellationTokenProvider and other useful properties..?
public ILogger<AsyncBackgroundJob<TArgs>> Logger { get; set; }

@ -1,3 +1,4 @@
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -5,7 +6,7 @@ namespace Volo.Abp.BackgroundJobs;
public abstract class BackgroundJob<TArgs> : IBackgroundJob<TArgs>
{
//TODO: Add UOW, Localization and other useful properties..?
//TODO: Add UOW, Localization, CancellationTokenProvider and other useful properties..?
public ILogger<BackgroundJob<TArgs>> Logger { get; set; }

@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs;
@ -46,13 +47,19 @@ public class BackgroundJobExecuter : IBackgroundJobExecuter, ITransientDependenc
{
using(CurrentTenant.Change(GetJobArgsTenantId(context.JobArgs)))
{
if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
{
await ((Task)jobExecuteMethod.Invoke(job, new[] { context.JobArgs }));
}
else
var cancellationTokenProvider =
context.ServiceProvider.GetRequiredService<ICancellationTokenProvider>();
using (cancellationTokenProvider.Use(context.CancellationToken))
{
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
{
await ((Task)jobExecuteMethod.Invoke(job, new[] { context.JobArgs }));
}
else
{
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
}
}

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundJobs;

@ -1,4 +1,6 @@
namespace Volo.Abp.BackgroundJobs;
using System.Threading;
namespace Volo.Abp.BackgroundJobs;
/// <summary>
/// Defines interface of a background job.

@ -1,4 +1,5 @@
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundJobs;
@ -11,10 +12,17 @@ public class JobExecutionContext : IServiceProviderAccessor
public object JobArgs { get; }
public JobExecutionContext(IServiceProvider serviceProvider, Type jobType, object jobArgs)
public CancellationToken CancellationToken { get; }
public JobExecutionContext(
IServiceProvider serviceProvider,
Type jobType,
object jobArgs,
CancellationToken cancellationToken = default)
{
ServiceProvider = serviceProvider;
JobType = jobType;
JobArgs = jobArgs;
CancellationToken = cancellationToken;
}
}

@ -12,22 +12,22 @@ namespace Volo.Abp.BackgroundJobs.Hangfire;
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
protected AbpBackgroundJobOptions Options { get; }
public HangfireBackgroundJobManager(IOptions<AbpBackgroundJobOptions> options)
{
Options = options.Value;
}
public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
return Task.FromResult(delay.HasValue
? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(GetQueueName(typeof(TArgs)),args),
adapter => adapter.ExecuteAsync(GetQueueName(typeof(TArgs)), args, default),
delay.Value
)
: BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(GetQueueName(typeof(TArgs)) ,args)
adapter => adapter.ExecuteAsync(GetQueueName(typeof(TArgs)), args, default)
));
}

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
@ -21,8 +22,8 @@ public class HangfireJobExecutionAdapter<TArgs>
Options = options.Value;
}
[Queue("{0}")]
public async Task ExecuteAsync(string queue, TArgs args)
[Queue("{0}")]
public async Task ExecuteAsync(string queue, TArgs args, CancellationToken cancellationToken = default)
{
if (!Options.IsJobExecutionEnabled)
{
@ -38,7 +39,7 @@ public class HangfireJobExecutionAdapter<TArgs>
using (var scope = ServiceScopeFactory.CreateScope())
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args, cancellationToken: cancellationToken);
await JobExecuter.ExecuteAsync(context);
}
}

@ -40,7 +40,7 @@ public class QuartzJobExecutionAdapter<TArgs> : IJob
{
var args = JsonSerializer.Deserialize<TArgs>(context.JobDetail.JobDataMap.GetString(nameof(TArgs)));
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var jobContext = new JobExecutionContext(scope.ServiceProvider, jobType, args);
var jobContext = new JobExecutionContext(scope.ServiceProvider, jobType, args, cancellationToken: context.CancellationToken);
try
{
await JobExecuter.ExecuteAsync(jobContext);

@ -68,7 +68,8 @@ public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroun
var context = new JobExecutionContext(
workerContext.ServiceProvider,
jobConfiguration.JobType,
jobArgs);
jobArgs,
workerContext.CancellationToken);
try
{

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.DependencyInjection;

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
@ -59,7 +60,7 @@ public class BackgroundJobExecuter_Tests : BackgroundJobsTestBase
jobObject.ExecutedValues.ShouldContain("42");
}
[Fact]
public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant()
{
@ -77,7 +78,7 @@ public class BackgroundJobExecuter_Tests : BackgroundJobsTestBase
new MyJobArgs("42", tenantId)
)
);
await _backgroundJobExecuter.ExecuteAsync(
new JobExecutionContext(
ServiceProvider,
@ -91,4 +92,48 @@ public class BackgroundJobExecuter_Tests : BackgroundJobsTestBase
jobObject.TenantId.ShouldBe(tenantId);
asyncJobObject.TenantId.ShouldBe(tenantId);
}
[Fact]
public async Task Should_Cancel_Job()
{
//Arrange
var cts = new CancellationTokenSource();
cts.Cancel();
var jobObject = GetRequiredService<MyJob>();
jobObject.ExecutedValues.ShouldBeEmpty();
//Act
await _backgroundJobExecuter.ExecuteAsync(
new JobExecutionContext(
ServiceProvider,
typeof(MyJob),
new MyJobArgs("42"),
cts.Token
)
);
//Assert
jobObject.Canceled.ShouldBeTrue();
//Arrange
var asyncCts = new CancellationTokenSource();
asyncCts.Cancel();
var asyncJobObject = GetRequiredService<MyAsyncJob>();
asyncJobObject.ExecutedValues.ShouldBeEmpty();
//Act
await _backgroundJobExecuter.ExecuteAsync(
new JobExecutionContext(
ServiceProvider,
typeof(MyAsyncJob),
new MyAsyncJobArgs("42"),
asyncCts.Token
)
);
//Assert
asyncJobObject.Canceled.ShouldBeTrue();
}
}

@ -1,26 +1,39 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs;
public class MyAsyncJob : AsyncBackgroundJob<MyAsyncJobArgs>, ISingletonDependency
{
public List<string> ExecutedValues { get; } = new List<string>();
public Guid? TenantId { get; set; }
private readonly ICurrentTenant _currentTenant;
private readonly ICancellationTokenProvider _cancellationTokenProvider;
public MyAsyncJob(ICurrentTenant currentTenant)
public bool Canceled { get; set; }
public MyAsyncJob(
ICurrentTenant currentTenant,
ICancellationTokenProvider cancellationTokenProvider)
{
_currentTenant = currentTenant;
_cancellationTokenProvider = cancellationTokenProvider;
}
public override Task ExecuteAsync(MyAsyncJobArgs args)
{
if (_cancellationTokenProvider.Token.IsCancellationRequested)
{
Canceled = true;
}
ExecutedValues.Add(args.Value);
TenantId = _currentTenant.Id;
return Task.CompletedTask;

@ -1,25 +1,38 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs;
public class MyJob : BackgroundJob<MyJobArgs>, ISingletonDependency
{
public List<string> ExecutedValues { get; } = new List<string>();
public Guid? TenantId { get; set; }
private readonly ICurrentTenant _currentTenant;
public MyJob(ICurrentTenant currentTenant)
private readonly ICancellationTokenProvider _cancellationTokenProvider;
public bool Canceled { get; set; }
public MyJob(
ICurrentTenant currentTenant,
ICancellationTokenProvider cancellationTokenProvider)
{
_currentTenant = currentTenant;
_cancellationTokenProvider = cancellationTokenProvider;
}
public override void Execute(MyJobArgs args)
{
if (_cancellationTokenProvider.Token.IsCancellationRequested)
{
Canceled = true;
}
ExecutedValues.Add(args.Value);
TenantId = _currentTenant.Id;
}

@ -1,23 +1,40 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.DemoApp.HangFire;
class Program
{
static void Main(string[] args)
async static Task Main(string[] args)
{
using (var application = AbpApplicationFactory.Create<DemoAppHangfireModule>(options =>
using (var application = await AbpApplicationFactory.CreateAsync<DemoAppHangfireModule>(options =>
{
options.UseAutofac();
}))
{
options.UseAutofac();
}))
{
application.Initialize();
await application.InitializeAsync();
await CancelableBackgroundJobAsync(application.ServiceProvider);
Console.WriteLine("Started: " + typeof(Program).Namespace);
Console.WriteLine("Press ENTER to stop the application..!");
Console.ReadLine();
application.Shutdown();
await application.ShutdownAsync();
}
}
private async static Task CancelableBackgroundJobAsync(IServiceProvider serviceProvider)
{
var backgroundJobManager = serviceProvider.GetRequiredService<IBackgroundJobManager>();
var jobId = await backgroundJobManager.EnqueueAsync(new LongRunningJobArgs { Value = "test-1" });
await backgroundJobManager.EnqueueAsync(new LongRunningJobArgs { Value = "test-2" });
Thread.Sleep(1000);
BackgroundJob.Delete(jobId);
}
}

@ -1,23 +1,41 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Quartz;
using Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.DemoApp.Quartz;
class Program
{
static void Main(string[] args)
async static Task Main(string[] args)
{
using (var application = AbpApplicationFactory.Create<DemoAppQuartzModule>(options =>
using (var application = await AbpApplicationFactory.CreateAsync<DemoAppQuartzModule>(options =>
{
options.UseAutofac();
}))
{
application.Initialize();
await application.InitializeAsync();
await CancelableBackgroundJobAsync(application.ServiceProvider);
Console.WriteLine("Started: " + typeof(Program).Namespace);
Console.WriteLine("Press ENTER to stop the application..!");
Console.ReadLine();
application.Shutdown();
await application.ShutdownAsync();
}
}
private async static Task CancelableBackgroundJobAsync(IServiceProvider serviceProvider)
{
var backgroundJobManager = serviceProvider.GetRequiredService<IBackgroundJobManager>();
var jobId = await backgroundJobManager.EnqueueAsync(new LongRunningJobArgs {Value = "test-1"});
await backgroundJobManager.EnqueueAsync(new LongRunningJobArgs { Value = "test-2" });
Thread.Sleep(1000);
var scheduler = serviceProvider.GetRequiredService<IScheduler>();
await scheduler.Interrupt(new JobKey(jobId.Split('.')[1],jobId.Split('.')[0]));
}
}

@ -1,23 +1,24 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundJobs.DemoApp.RabbitMq;
class Program
{
static void Main(string[] args)
async static Task Main(string[] args)
{
using (var application = AbpApplicationFactory.Create<DemoAppRabbitMqModule>(options =>
using (var application = await AbpApplicationFactory.CreateAsync<DemoAppRabbitMqModule>(options =>
{
options.UseAutofac();
}))
{
application.Initialize();
await application.InitializeAsync();
Console.WriteLine("Started: " + typeof(Program).Namespace);
Console.WriteLine("Press ENTER to stop the application..!");
Console.ReadLine();
application.Shutdown();
await application.ShutdownAsync();
}
}
}

@ -0,0 +1,49 @@
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
public class LongRunningJob : BackgroundJob<LongRunningJobArgs>, ITransientDependency
{
private readonly ICancellationTokenProvider _cancellationTokenProvider;
public LongRunningJob(ICancellationTokenProvider cancellationTokenProvider)
{
_cancellationTokenProvider = cancellationTokenProvider;
}
public override void Execute(LongRunningJobArgs args)
{
lock (Console.Out)
{
var oldColor = Console.ForegroundColor;
try
{
Console.WriteLine($"Long running {args.Value} start: {DateTime.Now}");
for (var i = 1; i <= 10; i++)
{
_cancellationTokenProvider.Token.ThrowIfCancellationRequested();
Thread.Sleep(1000);
Console.WriteLine($"{args.Value} step-{i} done: {DateTime.Now}");
}
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"Long running {args.Value} completed: {DateTime.Now}");
}
catch (OperationCanceledException)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"Long running {args.Value} cancelled!!!");
}
finally
{
Console.ForegroundColor = oldColor;
}
}
}
}
}

@ -0,0 +1,8 @@
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
[BackgroundJobName("LongJob")]
public class LongRunningJobArgs
{
public string Value { get; set; }
}
}

@ -1,4 +1,5 @@
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs

@ -1,4 +1,5 @@
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs

@ -1,4 +1,5 @@
using Volo.Abp.Autofac;
using System.Threading.Tasks;
using Volo.Abp.Autofac;
using Volo.Abp.BackgroundJobs.DemoApp.Shared;
using Volo.Abp.BackgroundJobs.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;
@ -27,19 +28,21 @@ public class DemoAppModule : AbpModule
Configure<AbpBackgroundJobWorkerOptions>(options =>
{
//Configure for fast running
options.JobPollPeriod = 1000;
//Configure for fast running
options.JobPollPeriod = 1000;
options.DefaultFirstWaitDuration = 1;
options.DefaultWaitFactor = 1;
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
public override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
//TODO: Configure console logging
//context
// .ServiceProvider
// .GetRequiredService<ILoggerFactory>()
// .AddConsole(LogLevel.Debug);
return Task.CompletedTask;
}
}

@ -1,23 +1,23 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundJobs.DemoApp;
class Program
{
static void Main(string[] args)
async static Task Main(string[] args)
{
using (var application = AbpApplicationFactory.Create<DemoAppModule>(options =>
using (var application = await AbpApplicationFactory.CreateAsync<DemoAppModule>(options =>
{
options.UseAutofac();
}))
{
application.Initialize();
await application.InitializeAsync();
Console.WriteLine("Started: " + typeof(Program).Namespace);
Console.WriteLine("Press ENTER to stop the application..!");
Console.ReadLine();
application.Shutdown();
await application.ShutdownAsync();
}
}
}

Loading…
Cancel
Save