Resolved #590: Memory leak while checking background jobs periodically.

pull/625/head
Halil ibrahim Kalkan 6 years ago
parent 6a8832a8ab
commit 6501b4d811

@ -1,8 +1,7 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundJobs
@ -11,14 +10,10 @@ namespace Volo.Abp.BackgroundJobs
{
public ILogger<BackgroundJobExecuter> Logger { protected get; set; }
protected IServiceProvider ServiceProvider { get; }
protected BackgroundJobOptions Options { get; }
public BackgroundJobExecuter(
IServiceProvider serviceProvider,
IOptions<BackgroundJobOptions> options)
public BackgroundJobExecuter(IOptions<BackgroundJobOptions> options)
{
ServiceProvider = serviceProvider;
Options = options.Value;
Logger = NullLogger<BackgroundJobExecuter>.Instance;
@ -26,34 +21,31 @@ namespace Volo.Abp.BackgroundJobs
public virtual void Execute(JobExecutionContext context)
{
using (var scope = ServiceProvider.CreateScope())
var job = context.ServiceProvider.GetService(context.JobType);
if (job == null)
{
var job = scope.ServiceProvider.GetService(context.JobType);
if (job == null)
{
throw new AbpException("The job type is not registered to DI: " + context.JobType);
}
throw new AbpException("The job type is not registered to DI: " + context.JobType);
}
var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
if (jobExecuteMethod == null)
{
throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
}
try
{
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
catch (Exception ex)
var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
if (jobExecuteMethod == null)
{
throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
}
try
{
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
catch (Exception ex)
{
Logger.LogException(ex);
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
{
Logger.LogException(ex);
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
{
JobType = context.JobType.AssemblyQualifiedName,
JobArgs = context.JobArgs
};
}
JobType = context.JobType.AssemblyQualifiedName,
JobArgs = context.JobArgs
};
}
}
}

@ -1,15 +1,19 @@
using System;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundJobs
{
public class JobExecutionContext
public class JobExecutionContext : IServiceProviderAccessor
{
public IServiceProvider ServiceProvider { get; }
public Type JobType { get; }
public object JobArgs { get; }
public JobExecutionContext(Type jobType, object jobArgs)
public JobExecutionContext(IServiceProvider serviceProvider, Type jobType, object jobArgs)
{
ServiceProvider = serviceProvider;
JobType = jobType;
JobArgs = jobArgs;
}

@ -13,11 +13,20 @@ namespace Volo.Abp.BackgroundJobs.Hangfire
{
if (!delay.HasValue)
{
return Task.FromResult(BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(adapter => adapter.Execute(args)));
return Task.FromResult(
BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args)
)
);
}
else
{
return Task.FromResult(BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(adapter => adapter.Execute(args), delay.Value));
return Task.FromResult(
BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args),
delay.Value
)
);
}
}
}

@ -1,23 +1,32 @@
using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Volo.Abp.BackgroundJobs.Hangfire
{
public class HangfireJobExecutionAdapter<TArgs>
{
protected BackgroundJobOptions Options { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected IBackgroundJobExecuter JobExecuter { get; }
public HangfireJobExecutionAdapter(IOptions<BackgroundJobOptions> options, IBackgroundJobExecuter jobExecuter)
public HangfireJobExecutionAdapter(
IOptions<BackgroundJobOptions> options,
IBackgroundJobExecuter jobExecuter,
IServiceScopeFactory serviceScopeFactory)
{
JobExecuter = jobExecuter;
ServiceScopeFactory = serviceScopeFactory;
Options = options.Value;
}
public void Execute(TArgs args)
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(jobType, args);
JobExecuter.Execute(context);
using (var scope = ServiceScopeFactory.CreateScope())
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
JobExecuter.Execute(context);
}
}
}
}

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
@ -28,6 +29,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
protected IChannelPool ChannelPool { get; }
protected IRabbitMqSerializer Serializer { get; }
protected IBackgroundJobExecuter JobExecuter { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AsyncLock SyncObj = new AsyncLock();
protected bool IsDiposed { get; private set; }
@ -37,12 +39,14 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
IOptions<RabbitMqBackgroundJobOptions> rabbitMqBackgroundJobOptions,
IChannelPool channelPool,
IRabbitMqSerializer serializer,
IBackgroundJobExecuter jobExecuter)
IBackgroundJobExecuter jobExecuter,
IServiceScopeFactory serviceScopeFactory)
{
BackgroundJobOptions = backgroundJobOptions.Value;
RabbitMqBackgroundJobOptions = rabbitMqBackgroundJobOptions.Value;
Serializer = serializer;
JobExecuter = jobExecuter;
ServiceScopeFactory = serviceScopeFactory;
ChannelPool = channelPool;
JobConfiguration = BackgroundJobOptions.GetJob(typeof(TArgs));
@ -167,25 +171,29 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
protected virtual void MessageReceived(object sender, BasicDeliverEventArgs ea)
{
var context = new JobExecutionContext(
JobConfiguration.JobType,
Serializer.Deserialize(ea.Body, typeof(TArgs))
);
try
{
JobExecuter.Execute(context);
ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)
using (var scope = ServiceScopeFactory.CreateScope())
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
catch (Exception)
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
var context = new JobExecutionContext(
scope.ServiceProvider,
JobConfiguration.JobType,
Serializer.Deserialize(ea.Body, typeof(TArgs))
);
try
{
JobExecuter.Execute(context);
ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
catch (Exception)
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
}
}
}

@ -1,4 +1,5 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
@ -11,26 +12,26 @@ namespace Volo.Abp.BackgroundJobs
public class BackgroundJobWorker : PeriodicBackgroundWorkerBase, IBackgroundJobWorker, ISingletonDependency
{
protected IBackgroundJobExecuter JobExecuter { get; }
protected IBackgroundJobStore Store { get; }
protected BackgroundJobOptions JobOptions { get; }
protected BackgroundJobWorkerOptions WorkerOptions { get; }
protected IClock Clock { get; }
protected IBackgroundJobSerializer Serializer { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
public BackgroundJobWorker(
IBackgroundJobStore store,
AbpTimer timer,
IBackgroundJobExecuter jobExecuter,
IBackgroundJobSerializer serializer,
IOptions<BackgroundJobOptions> jobOptions,
IOptions<BackgroundJobWorkerOptions> workerOptions,
IClock clock)
IClock clock,
IServiceScopeFactory serviceScopeFactory)
: base(timer)
{
JobExecuter = jobExecuter;
Serializer = serializer;
Clock = clock;
Store = store;
ServiceScopeFactory = serviceScopeFactory;
WorkerOptions = workerOptions.Value;
JobOptions = jobOptions.Value;
Timer.Period = WorkerOptions.JobPollPeriod;
@ -38,53 +39,60 @@ namespace Volo.Abp.BackgroundJobs
protected override void DoWork()
{
var waitingJobs = AsyncHelper.RunSync(() => Store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount));
foreach (var jobInfo in waitingJobs)
using (var scope = ServiceScopeFactory.CreateScope())
{
jobInfo.TryCount++;
jobInfo.LastTryTime = Clock.Now;
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = AsyncHelper.RunSync(
() => store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount)
);
try
foreach (var jobInfo in waitingJobs)
{
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = Serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(jobConfiguration.JobType, jobArgs);
jobInfo.TryCount++;
jobInfo.LastTryTime = Clock.Now;
try
{
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => Store.DeleteAsync(jobInfo.Id));
}
catch (BackgroundJobExecutionException)
{
var nextTryTime = CalculateNextTryTime(jobInfo);
if (nextTryTime.HasValue)
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = Serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);
try
{
jobInfo.NextTryTime = nextTryTime.Value;
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => store.DeleteAsync(jobInfo.Id));
}
else
catch (BackgroundJobExecutionException)
{
jobInfo.IsAbandoned = true;
}
var nextTryTime = CalculateNextTryTime(jobInfo);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
TryUpdate(jobInfo);
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(jobInfo);
}
}
}
protected virtual void TryUpdate(BackgroundJobInfo jobInfo)
protected virtual void TryUpdate(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
{
try
{
Store.UpdateAsync(jobInfo);
store.UpdateAsync(jobInfo);
}
catch (Exception updateEx)
{

@ -25,6 +25,7 @@ namespace Volo.Abp.BackgroundJobs
_backgroundJobExecuter.Execute(
new JobExecutionContext(
ServiceProvider,
typeof(MyJob),
new MyJobArgs("42")
)

Loading…
Cancel
Save