Refactored background job executer.

pull/395/head
Halil ibrahim Kalkan 7 years ago
parent 14e55acd15
commit 73fb92f436

@ -198,7 +198,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.BackgroundJobs", "
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.BackgroundWorkers", "src\Volo.Abp.BackgroundWorkers\Volo.Abp.BackgroundWorkers.csproj", "{6C3E76B8-C4DA-4E74-9F8B-A8BC4C831722}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.Tests", "test\Volo.Abp.BackgroundJobs.Tests\Volo.Abp.BackgroundJobs.Tests.csproj", "{D86548EA-7047-4623-8824-F6285CD254AA}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.BackgroundJobs.Tests", "test\Volo.Abp.BackgroundJobs.Tests\Volo.Abp.BackgroundJobs.Tests.csproj", "{D86548EA-7047-4623-8824-F6285CD254AA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution

@ -1,21 +1,15 @@
using System;
using System.Runtime.Serialization;
using JetBrains.Annotations;
namespace Volo.Abp.BackgroundJobs
{
[Serializable]
public class BackgroundJobException : AbpException
{
[CanBeNull]
public BackgroundJobInfo BackgroundJob { get; set; }
public string JobName { get; set; }
[CanBeNull]
public object JobObject { get; set; }
public string JobArgs { get; set; }
/// <summary>
/// Creates a new <see cref="BackgroundJobException"/> object.
/// </summary>
public BackgroundJobException()
{

@ -5,8 +5,6 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
namespace Volo.Abp.BackgroundJobs
{
@ -15,120 +13,58 @@ namespace Volo.Abp.BackgroundJobs
public ILogger<BackgroundJobExecuter> Logger { protected get; set; }
protected IServiceProvider ServiceProvider { get; }
protected IClock Clock { get; }
protected IBackgroundJobSerializer Serializer { get; }
protected IBackgroundJobStore Store { get; }
protected BackgroundJobOptions Options { get; }
public BackgroundJobExecuter(
IServiceProvider serviceProvider,
IClock clock,
IBackgroundJobSerializer serializer,
IBackgroundJobStore store,
IOptions<BackgroundJobOptions> options)
{
ServiceProvider = serviceProvider;
Clock = clock;
Serializer = serializer;
Options = options.Value;
Store = store;
Logger = NullLogger<BackgroundJobExecuter>.Instance;
}
public virtual void Execute(BackgroundJobInfo jobInfo)
public virtual void Execute(JobExecutionContext context)
{
//TODO: Refactor (split to multiple methods).
try
var jobType = Options.GetJobType(context.JobName);
using (var scope = ServiceProvider.CreateScope())
{
jobInfo.TryCount++;
jobInfo.LastTryTime = Clock.Now;
var job = scope.ServiceProvider.GetService(jobType);
if (job == null)
{
throw new AbpException("The job type is not registered to DI: " + jobType);
}
var jobType = Options.GetJobType(jobInfo.JobName);
var jobExecuteMethod = job.GetType().GetMethod("Execute");
Debug.Assert(jobExecuteMethod != null, nameof(jobExecuteMethod) + " != null");
var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
var argsObj = Serializer.Deserialize(context.JobArgs, argsType);
using (var scope = ServiceProvider.CreateScope())
try
{
var job = scope.ServiceProvider.GetService(jobType);
if (job == null)
{
throw new AbpException("The job type is not registered to DI: " + jobType);
}
jobExecuteMethod.Invoke(job, new[] { argsObj });
}
catch (Exception ex)
{
context.Result = JobExecutionResult.Failed;
var jobExecuteMethod = job.GetType().GetMethod("Execute");
Debug.Assert(jobExecuteMethod != null, nameof(jobExecuteMethod) + " != null");
var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
var argsObj = Serializer.Deserialize(jobInfo.JobArgs, argsType);
Logger.LogException(ex);
try
//TODO: Somehow trigger an event for the exception (may create an Volo.Abp.ExceptionHandling package)!
var backgroundJobException = new BackgroundJobException("A background job execution is failed. See inner exception for details.", ex)
{
jobExecuteMethod.Invoke(job, new[] { argsObj });
AsyncHelper.RunSync(() => Store.DeleteAsync(jobInfo.Id));
}
catch (Exception ex)
{
Logger.LogException(ex);
var nextTryTime = CalculateNextTryTime(jobInfo);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
TryUpdate(jobInfo);
var backgroundJobException = new BackgroundJobException(
"A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.",
ex
)
{
BackgroundJob = jobInfo,
JobObject = job
};
//TODO: Somehow trigger an event for the exception (may create an Volo.Abp.ExceptionHandling package)!
}
JobName = context.JobName,
JobArgs = context.JobArgs
};
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(jobInfo);
}
}
protected virtual void TryUpdate(BackgroundJobInfo jobInfo)
{
try
{
Store.UpdateAsync(jobInfo);
}
catch (Exception updateEx)
{
Logger.LogException(updateEx);
}
}
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo) //TODO: Move to another place to override easier
{
var nextWaitDuration = Options.DefaultFirstWaitDuration * (Math.Pow(Options.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime.HasValue
? jobInfo.LastTryTime.Value.AddSeconds(nextWaitDuration)
: Clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > Options.DefaultTimeout)
{
return null;
}
return nextTryDate;
}
}
}

@ -49,6 +49,7 @@ namespace Volo.Abp.BackgroundJobs
{
JobTypes = new Dictionary<string, Type>();
JobPollPeriod = 5000;
DefaultFirstWaitDuration = 60;
DefaultTimeout = 172800;
DefaultWaitFactor = 2.0;

@ -1,7 +1,10 @@
using Microsoft.Extensions.Options;
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
namespace Volo.Abp.BackgroundJobs
{
@ -10,7 +13,8 @@ namespace Volo.Abp.BackgroundJobs
protected IBackgroundJobExecuter JobExecuter { get; }
protected IBackgroundJobStore Store { get; }
protected BackgroundJobOptions Options { get; }
protected IClock Clock { get; }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundJobManager"/> class.
/// </summary>
@ -18,10 +22,12 @@ namespace Volo.Abp.BackgroundJobs
IBackgroundJobStore store,
AbpTimer timer,
IBackgroundJobExecuter jobExecuter,
IOptions<BackgroundJobOptions> options)
IOptions<BackgroundJobOptions> options,
IClock clock)
: base(timer)
{
JobExecuter = jobExecuter;
Clock = clock;
Store = store;
Options = options.Value;
Timer.Period = Options.JobPollPeriod;
@ -31,10 +37,71 @@ namespace Volo.Abp.BackgroundJobs
{
var waitingJobs = AsyncHelper.RunSync(() => Store.GetWaitingJobsAsync(Options.MaxJobFetchCount));
foreach (var job in waitingJobs)
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = Clock.Now;
var context = new JobExecutionContext(jobInfo.JobName, jobInfo.JobArgs);
try
{
JobExecuter.Execute(context);
if (context.Result == JobExecutionResult.Success)
{
AsyncHelper.RunSync(() => Store.DeleteAsync(jobInfo.Id));
}
else if (context.Result == JobExecutionResult.Failed)
{
var nextTryTime = CalculateNextTryTime(jobInfo);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
TryUpdate(jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(jobInfo);
}
}
}
protected virtual void TryUpdate(BackgroundJobInfo jobInfo)
{
try
{
Store.UpdateAsync(jobInfo);
}
catch (Exception updateEx)
{
Logger.LogException(updateEx);
}
}
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo) //TODO: Move to another place to override easier
{
var nextWaitDuration = Options.DefaultFirstWaitDuration * (Math.Pow(Options.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime.HasValue
? jobInfo.LastTryTime.Value.AddSeconds(nextWaitDuration)
: Clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > Options.DefaultTimeout)
{
JobExecuter.Execute(job);
return null;
}
return nextTryDate;
}
}
}

@ -2,6 +2,6 @@
{
public interface IBackgroundJobExecuter
{
void Execute(BackgroundJobInfo jobInfo);
void Execute(JobExecutionContext context);
}
}

@ -0,0 +1,18 @@
namespace Volo.Abp.BackgroundJobs
{
public class JobExecutionContext
{
public string JobName { get; }
public string JobArgs { get; }
public JobExecutionResult Result { get; set; }
public JobExecutionContext(string jobName, string jobArgs)
{
JobName = jobName;
JobArgs = jobArgs;
Result = JobExecutionResult.Success;
}
}
}

@ -0,0 +1,8 @@
namespace Volo.Abp.BackgroundJobs
{
public enum JobExecutionResult
{
Success,
Failed
}
}

@ -1,5 +1,6 @@
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Json;
using Xunit;
namespace Volo.Abp.BackgroundJobs
@ -7,14 +8,12 @@ namespace Volo.Abp.BackgroundJobs
public class BackgroundJobExecuter_Tests : BackgroundJobsTestBase
{
private readonly IBackgroundJobExecuter _backgroundJobExecuter;
private readonly IBackgroundJobManager _backgroundJobManager;
private readonly IBackgroundJobStore _backgroundJobStore;
private readonly IJsonSerializer _jsonSerializer;
public BackgroundJobExecuter_Tests()
{
_backgroundJobExecuter = GetRequiredService<IBackgroundJobExecuter>();
_backgroundJobManager = GetRequiredService<IBackgroundJobManager>();
_backgroundJobStore = GetRequiredService<IBackgroundJobStore>();
_jsonSerializer = GetRequiredService<IJsonSerializer>();
}
[Fact]
@ -25,21 +24,18 @@ namespace Volo.Abp.BackgroundJobs
var jobObject = GetRequiredService<MyJob>();
jobObject.ExecutedValues.ShouldBeEmpty();
var jobId = await _backgroundJobManager.EnqueueAsync(new MyJobArgs("42"));
var job = await _backgroundJobStore.FindAsync(jobId);
job.ShouldNotBeNull();
//Act
_backgroundJobExecuter.Execute(job);
_backgroundJobExecuter.Execute(
new JobExecutionContext(
BackgroundJobNameAttribute.GetName<MyJobArgs>(),
_jsonSerializer.Serialize(new MyJobArgs("42"))
)
);
//Assert
jobObject.ExecutedValues.ShouldContain("42");
job = await _backgroundJobStore.FindAsync(jobId);
job.ShouldBeNull(); //Because it's deleted after the execution
}
}
}
Loading…
Cancel
Save