Fixed #1464: Add sync methods to IBackgroundJobStore

pull/1467/head
Halil İbrahim Kalkan 6 years ago
parent e026dfd985
commit 4ce0381da0

@ -39,9 +39,9 @@ namespace Volo.Abp.Authorization
await invocation.ProceedAsync();
}
protected virtual Task AuthorizeAsync(IAbpMethodInvocation invocation)
protected virtual async Task AuthorizeAsync(IAbpMethodInvocation invocation)
{
return _methodInvocationAuthorizationService.CheckAsync(
await _methodInvocationAuthorizationService.CheckAsync(
new MethodInvocationAuthorizationContext(
invocation.Method
)

@ -36,9 +36,7 @@ namespace Volo.Abp.BackgroundJobs
{
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = AsyncHelper.RunSync(
() => store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount)
);
var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);
if (!waitingJobs.Any())
{
@ -64,7 +62,7 @@ namespace Volo.Abp.BackgroundJobs
{
jobExecuter.Execute(context);
AsyncHelper.RunSync(() => store.DeleteAsync(jobInfo.Id));
store.Delete(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
@ -96,7 +94,7 @@ namespace Volo.Abp.BackgroundJobs
{
try
{
store.UpdateAsync(jobInfo);
store.Update(jobInfo);
}
catch (Exception updateEx)
{
@ -107,9 +105,8 @@ namespace Volo.Abp.BackgroundJobs
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime.HasValue
? jobInfo.LastTryTime.Value.AddSeconds(nextWaitDuration)
: clock.Now.AddSeconds(nextWaitDuration);
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
{

@ -9,6 +9,13 @@ namespace Volo.Abp.BackgroundJobs
/// </summary>
public interface IBackgroundJobStore
{
/// <summary>
/// Gets a BackgroundJobInfo based on the given jobId.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
/// <returns>The BackgroundJobInfo object.</returns>
BackgroundJobInfo Find(Guid jobId);
/// <summary>
/// Gets a BackgroundJobInfo based on the given jobId.
/// </summary>
@ -16,12 +23,27 @@ namespace Volo.Abp.BackgroundJobs
/// <returns>The BackgroundJobInfo object.</returns>
Task<BackgroundJobInfo> FindAsync(Guid jobId);
/// <summary>
/// Inserts a background job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
void Insert(BackgroundJobInfo jobInfo);
/// <summary>
/// Inserts a background job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
Task InsertAsync(BackgroundJobInfo jobInfo);
/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
/// Order by: Priority DESC, TryCount ASC, NextTryTime ASC.
/// Maximum result: <paramref name="maxResultCount"/>.
/// </summary>
/// <param name="maxResultCount">Maximum result count.</param>
List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount);
/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
@ -31,12 +53,24 @@ namespace Volo.Abp.BackgroundJobs
/// <param name="maxResultCount">Maximum result count.</param>
Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);
/// <summary>
/// Deletes a job.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
void Delete(Guid jobId);
/// <summary>
/// Deletes a job.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
Task DeleteAsync(Guid jobId);
/// <summary>
/// Updates a job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
void Update(BackgroundJobInfo jobInfo);
/// <summary>
/// Updates a job.
/// </summary>

@ -23,11 +23,21 @@ namespace Volo.Abp.BackgroundJobs
_jobs = new ConcurrentDictionary<Guid, BackgroundJobInfo>();
}
public BackgroundJobInfo Find(Guid jobId)
{
return _jobs.GetOrDefault(jobId);
}
public virtual Task<BackgroundJobInfo> FindAsync(Guid jobId)
{
return Task.FromResult(_jobs.GetOrDefault(jobId));
}
public void Insert(BackgroundJobInfo jobInfo)
{
_jobs[jobInfo.Id] = jobInfo;
}
public virtual Task InsertAsync(BackgroundJobInfo jobInfo)
{
_jobs[jobInfo.Id] = jobInfo;
@ -35,6 +45,17 @@ namespace Volo.Abp.BackgroundJobs
return Task.FromResult(0);
}
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return _jobs.Values
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToList();
}
public virtual Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
var waitingJobs = _jobs.Values
@ -48,6 +69,11 @@ namespace Volo.Abp.BackgroundJobs
return Task.FromResult(waitingJobs);
}
public void Delete(Guid jobId)
{
_jobs.TryRemove(jobId, out _);
}
public virtual Task DeleteAsync(Guid jobId)
{
_jobs.TryRemove(jobId, out _);
@ -55,6 +81,14 @@ namespace Volo.Abp.BackgroundJobs
return Task.FromResult(0);
}
public void Update(BackgroundJobInfo jobInfo)
{
if (jobInfo.IsAbandoned)
{
DeleteAsync(jobInfo.Id);
}
}
public virtual Task UpdateAsync(BackgroundJobInfo jobInfo)
{
if (jobInfo.IsAbandoned)

@ -1,7 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Volo.Abp.Aspects;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;

@ -19,6 +19,13 @@ namespace Volo.Abp.BackgroundJobs
BackgroundJobRepository = backgroundJobRepository;
}
public BackgroundJobInfo Find(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
BackgroundJobRepository.Find(jobId)
);
}
public virtual async Task<BackgroundJobInfo> FindAsync(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
@ -26,6 +33,13 @@ namespace Volo.Abp.BackgroundJobs
);
}
public void Insert(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Insert(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}
public virtual async Task InsertAsync(BackgroundJobInfo jobInfo)
{
await BackgroundJobRepository.InsertAsync(
@ -33,6 +47,13 @@ namespace Volo.Abp.BackgroundJobs
);
}
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return ObjectMapper.Map<List<BackgroundJobRecord>, List<BackgroundJobInfo>>(
BackgroundJobRepository.GetWaitingList(maxResultCount)
);
}
public virtual async Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
return ObjectMapper.Map<List<BackgroundJobRecord>, List<BackgroundJobInfo>>(
@ -40,11 +61,23 @@ namespace Volo.Abp.BackgroundJobs
);
}
public void Delete(Guid jobId)
{
BackgroundJobRepository.Delete(jobId);
}
public virtual async Task DeleteAsync(Guid jobId)
{
await BackgroundJobRepository.DeleteAsync(jobId);
}
public void Update(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Update(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}
public virtual async Task UpdateAsync(BackgroundJobInfo jobInfo)
{
await BackgroundJobRepository.UpdateAsync(

@ -7,6 +7,8 @@ namespace Volo.Abp.BackgroundJobs
{
public interface IBackgroundJobRepository : IBasicRepository<BackgroundJobRecord, Guid>
{
List<BackgroundJobRecord> GetWaitingList(int maxResultCount);
Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount);
}
}

@ -21,16 +21,27 @@ namespace Volo.Abp.BackgroundJobs.EntityFrameworkCore
Clock = clock;
}
public List<BackgroundJobRecord> GetWaitingList(int maxResultCount)
{
return GetWaitingListQuery(maxResultCount)
.ToList();
}
public async Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount)
{
return await GetWaitingListQuery(maxResultCount)
.ToListAsync();
}
private IQueryable<BackgroundJobRecord> GetWaitingListQuery(int maxResultCount)
{
var now = Clock.Now;
return await DbSet
return DbSet
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToListAsync();
.Take(maxResultCount);
}
}
}

@ -21,16 +21,27 @@ namespace Volo.Abp.BackgroundJobs.MongoDB
Clock = clock;
}
public List<BackgroundJobRecord> GetWaitingList(int maxResultCount)
{
return GetWaitingListQuery(maxResultCount)
.ToList();
}
public async Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount)
{
return await GetWaitingListQuery(maxResultCount)
.ToListAsync();
}
private IMongoQueryable<BackgroundJobRecord> GetWaitingListQuery(int maxResultCount)
{
var now = Clock.Now;
return await GetMongoQueryable()
return GetMongoQueryable()
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToListAsync();
.Take(maxResultCount);
}
}
}

Loading…
Cancel
Save