Resolved #2540. Resolved #2539.

* Send IServiceProvider to DoWork method of the periodic background worker #2539
* Introduce AsyncPeriodicBackgroundWorkerBase #2540
pull/2545/head
Halil İbrahim Kalkan 5 years ago
parent d815b9cfee
commit 401376e109

@ -4,7 +4,6 @@ using Microsoft.Extensions.Options;
using System;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs
{
@ -21,7 +20,7 @@ namespace Volo.Abp.BackgroundJobs
Logger = NullLogger<BackgroundJobExecuter>.Instance;
}
public virtual void Execute(JobExecutionContext context)
public virtual async Task ExecuteAsync(JobExecutionContext context)
{
var job = context.ServiceProvider.GetService(context.JobType);
if (job == null)
@ -41,7 +40,7 @@ namespace Volo.Abp.BackgroundJobs
{
if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
{
AsyncHelper.RunSync(() => (Task) jobExecuteMethod.Invoke(job, new[] {context.JobArgs}));
await ((Task) jobExecuteMethod.Invoke(job, new[] {context.JobArgs})).ConfigureAwait(false);
}
else
{

@ -1,7 +1,9 @@
namespace Volo.Abp.BackgroundJobs
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundJobs
{
public interface IBackgroundJobExecuter
{
void Execute(JobExecutionContext context);
Task ExecuteAsync(JobExecutionContext context);
}
}

@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.Hangfire
{
@ -25,7 +26,7 @@ namespace Volo.Abp.BackgroundJobs.Hangfire
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => JobExecuter.ExecuteAsync(context));
}
}
}

@ -10,6 +10,7 @@ using Nito.AsyncEx;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
@ -181,7 +182,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
try
{
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => JobExecuter.ExecuteAsync(context));
ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)

@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -9,92 +10,88 @@ using Volo.Abp.Timing;
namespace Volo.Abp.BackgroundJobs
{
public class BackgroundJobWorker : PeriodicBackgroundWorkerBase, IBackgroundJobWorker
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
protected AbpBackgroundJobOptions JobOptions { get; }
protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
public BackgroundJobWorker(
AbpTimer timer,
IOptions<AbpBackgroundJobOptions> jobOptions,
IOptions<AbpBackgroundJobWorkerOptions> workerOptions,
IServiceScopeFactory serviceScopeFactory)
: base(timer)
: base(
timer,
serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
WorkerOptions = workerOptions.Value;
JobOptions = jobOptions.Value;
Timer.Period = WorkerOptions.JobPollPeriod;
}
protected override void DoWork()
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = AsyncHelper.RunSync(() => store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount));
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount).ConfigureAwait(false);
if (!waitingJobs.Any())
{
return;
}
if (!waitingJobs.Any())
{
return;
}
var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
var jobExecuter = scope.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = scope.ServiceProvider.GetRequiredService<IClock>();
var serializer = scope.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
foreach (var jobInfo in waitingJobs)
try
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(workerContext.ServiceProvider, jobConfiguration.JobType, jobArgs);
try
{
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);
await jobExecuter.ExecuteAsync(context).ConfigureAwait(false);
try
{
jobExecuter.Execute(context);
await store.DeleteAsync(jobInfo.Id).ConfigureAwait(false);
}
catch (BackgroundJobExecutionException)
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
AsyncHelper.RunSync(() => store.DeleteAsync(jobInfo.Id));
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
catch (BackgroundJobExecutionException)
else
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
TryUpdate(store, jobInfo);
jobInfo.IsAbandoned = true;
}
await TryUpdateAsync(store, jobInfo).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
await TryUpdateAsync(store, jobInfo).ConfigureAwait(false);
}
}
}
protected virtual void TryUpdate(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
protected virtual async Task TryUpdateAsync(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
{
try
{
AsyncHelper.RunSync(() => store.UpdateAsync(jobInfo));
await store.UpdateAsync(jobInfo).ConfigureAwait(false);
}
catch (Exception updateEx)
{

@ -0,0 +1,55 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers
{
public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }
protected AsyncPeriodicBackgroundWorkerBase(
AbpTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
public override async Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken).ConfigureAwait(false);
Timer.Start(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
using (var scope = ServiceScopeFactory.CreateScope())
{
AsyncHelper.RunSync(
() => DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider))
);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
}
}
protected abstract Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext);
}
}

@ -0,0 +1,14 @@
using System;
namespace Volo.Abp.BackgroundWorkers
{
public class PeriodicBackgroundWorkerContext
{
public IServiceProvider ServiceProvider { get; }
public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
}
}

@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;
@ -11,14 +12,14 @@ namespace Volo.Abp.BackgroundWorkers
/// </summary>
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected readonly AbpTimer Timer;
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }
/// <summary>
/// Initializes a new instance of the <see cref="PeriodicBackgroundWorkerBase"/> class.
/// </summary>
/// <param name="timer">A timer.</param>
protected PeriodicBackgroundWorkerBase(AbpTimer timer)
protected PeriodicBackgroundWorkerBase(
AbpTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
@ -34,12 +35,15 @@ namespace Volo.Abp.BackgroundWorkers
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
DoWork();
using (var scope = ServiceScopeFactory.CreateScope())
{
DoWork(new PeriodicBackgroundWorkerContext(scope.ServiceProvider));
}
}
catch (Exception ex)
{
@ -50,6 +54,6 @@ namespace Volo.Abp.BackgroundWorkers
/// <summary>
/// Periodic works should be done by implementing this method.
/// </summary>
protected abstract void DoWork();
protected abstract void DoWork(PeriodicBackgroundWorkerContext workerContext);
}
}
Loading…
Cancel
Save