Add AbpAsyncTimer.

pull/6417/head
maliming 5 years ago
parent 18afc512fd
commit db59f2773e

@ -17,7 +17,7 @@ namespace Volo.Abp.BackgroundJobs
protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
public BackgroundJobWorker(
AbpTimer timer,
AbpAsyncTimer timer,
IOptions<AbpBackgroundJobOptions> jobOptions,
IOptions<AbpBackgroundJobWorkerOptions> workerOptions,
IServiceScopeFactory serviceScopeFactory)

@ -34,7 +34,7 @@ namespace Volo.Abp.BackgroundWorkers.Quartz
throw new ArgumentException($"{nameof(worker)} type is different from the generic type");
}
var timer = (AbpTimer) worker.GetType().GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
var timer = (AbpAsyncTimer) worker.GetType().GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
period = timer?.Period;
}
else

@ -11,15 +11,15 @@ namespace Volo.Abp.BackgroundWorkers
public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }
protected AbpAsyncTimer Timer { get; }
protected AsyncPeriodicBackgroundWorkerBase(
AbpTimer timer,
AbpAsyncTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
Timer.Elapsed = Timer_Elapsed;
}
public async override Task StartAsync(CancellationToken cancellationToken = default)
@ -34,10 +34,9 @@ namespace Volo.Abp.BackgroundWorkers
await base.StopAsync(cancellationToken);
}
private void Timer_Elapsed(object sender, System.EventArgs e)
private async Task Timer_Elapsed(AbpAsyncTimer timer)
{
// Discard the result
_ = DoWorkAsync();
await DoWorkAsync();
}
private async Task DoWorkAsync()

@ -47,9 +47,8 @@ namespace Volo.Abp.BackgroundWorkers
}
catch (Exception ex)
{
_ = scope.ServiceProvider
.GetRequiredService<IExceptionNotifier>()
.NotifyAsync(new ExceptionNotificationContext(ex));
var exceptionNotifier = scope.ServiceProvider.GetRequiredService<IExceptionNotifier>();
AsyncHelper.RunSync(() => exceptionNotifier.NotifyAsync(new ExceptionNotificationContext(ex)));
Logger.LogException(ex);
}

@ -20,7 +20,7 @@ namespace Volo.Abp.RabbitMQ
protected IExceptionNotifier ExceptionNotifier { get; }
protected AbpTimer Timer { get; }
protected AbpAsyncTimer Timer { get; }
protected ExchangeDeclareConfiguration Exchange { get; private set; }
@ -38,7 +38,7 @@ namespace Volo.Abp.RabbitMQ
public RabbitMqMessageConsumer(
IConnectionPool connectionPool,
AbpTimer timer,
AbpAsyncTimer timer,
IExceptionNotifier exceptionNotifier)
{
ConnectionPool = connectionPool;
@ -50,7 +50,7 @@ namespace Volo.Abp.RabbitMQ
Callbacks = new ConcurrentBag<Func<IModel, BasicDeliverEventArgs, Task>>();
Timer.Period = 5000; //5 sec.
Timer.Elapsed += Timer_Elapsed;
Timer.Elapsed = Timer_Elapsed;
Timer.RunOnStart = true;
}
@ -77,7 +77,7 @@ namespace Volo.Abp.RabbitMQ
await TrySendQueueBindCommandsAsync();
}
protected virtual void TrySendQueueBindCommands()
protected virtual async Task TrySendQueueBindCommandsAsync()
{
try
{
@ -119,40 +119,33 @@ namespace Volo.Abp.RabbitMQ
catch (Exception ex)
{
Logger.LogException(ex, LogLevel.Warning);
_ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
}
}
protected virtual Task TrySendQueueBindCommandsAsync()
{
TrySendQueueBindCommands();
return Task.CompletedTask;
}
public virtual void OnMessageReceived(Func<IModel, BasicDeliverEventArgs, Task> callback)
{
Callbacks.Add(callback);
}
protected virtual void Timer_Elapsed(object sender, EventArgs e)
protected virtual async Task Timer_Elapsed(AbpAsyncTimer timer)
{
if (Channel == null || Channel.IsOpen == false)
{
TryCreateChannel();
TrySendQueueBindCommands();
await TryCreateChannelAsync();
await TrySendQueueBindCommandsAsync();
}
}
protected virtual void TryCreateChannel()
protected virtual async Task TryCreateChannelAsync()
{
DisposeChannel();
await DisposeChannelAsync();
try
{
var channel = ConnectionPool
.Get(ConnectionName)
.CreateModel();
channel.ExchangeDeclare(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
@ -172,7 +165,7 @@ namespace Volo.Abp.RabbitMQ
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, basicDeliverEventArgs) =>
{
await HandleIncomingMessage(channel, basicDeliverEventArgs);
await HandleIncomingMessageAsync(channel, basicDeliverEventArgs);
};
channel.BasicConsume(
@ -186,11 +179,11 @@ namespace Volo.Abp.RabbitMQ
catch (Exception ex)
{
Logger.LogException(ex, LogLevel.Warning);
_ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
}
}
protected virtual async Task HandleIncomingMessage(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs)
protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs)
{
try
{
@ -204,7 +197,25 @@ namespace Volo.Abp.RabbitMQ
catch (Exception ex)
{
Logger.LogException(ex);
_ = ExceptionNotifier.NotifyAsync(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
}
protected virtual async Task DisposeChannelAsync()
{
if (Channel == null)
{
return;
}
try
{
Channel.Dispose();
}
catch (Exception ex)
{
Logger.LogException(ex, LogLevel.Warning);
await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
}
}
@ -222,7 +233,7 @@ namespace Volo.Abp.RabbitMQ
catch (Exception ex)
{
Logger.LogException(ex, LogLevel.Warning);
_ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning));
}
}

@ -0,0 +1,127 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
namespace Volo.Abp.Threading
{
/// <summary>
/// A robust timer implementation that ensures no overlapping occurs. It waits exactly specified <see cref="Period"/> between ticks.
/// </summary>
public class AbpAsyncTimer : ITransientDependency
{
/// <summary>
/// This func is raised periodically according to Period of Timer.
/// </summary>
public Func<AbpAsyncTimer, Task> Elapsed = _ => Task.CompletedTask;
/// <summary>
/// Task period of timer (as milliseconds).
/// </summary>
public int Period { get; set; }
/// <summary>
/// Indicates whether timer raises Elapsed event on Start method of Timer for once.
/// Default: False.
/// </summary>
public bool RunOnStart { get; set; }
public ILogger<AbpAsyncTimer> Logger { get; set; }
public IExceptionNotifier ExceptionNotifier { get; set; }
private readonly Timer _taskTimer;
private volatile bool _performingTasks;
private volatile bool _isRunning;
public AbpAsyncTimer()
{
ExceptionNotifier = NullExceptionNotifier.Instance;
Logger = NullLogger<AbpAsyncTimer>.Instance;
_taskTimer = new Timer(
TimerCallBack,
null,
Timeout.Infinite,
Timeout.Infinite
);
}
public void Start(CancellationToken cancellationToken = default)
{
if (Period <= 0)
{
throw new AbpException("Period should be set before starting the timer!");
}
lock (_taskTimer)
{
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
_isRunning = true;
}
}
public void Stop(CancellationToken cancellationToken = default)
{
lock (_taskTimer)
{
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
while (_performingTasks)
{
Monitor.Wait(_taskTimer);
}
_isRunning = false;
}
}
/// <summary>
/// This method is called by _taskTimer.
/// </summary>
/// <param name="state">Not used argument</param>
private void TimerCallBack(object state)
{
lock (_taskTimer)
{
if (!_isRunning || _performingTasks)
{
return;
}
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
_performingTasks = true;
}
_ = Timer_Elapsed();
}
private async Task Timer_Elapsed()
{
try
{
await Elapsed(this);
}
catch(Exception ex)
{
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
finally
{
lock (_taskTimer)
{
_performingTasks = false;
if (_isRunning)
{
_taskTimer.Change(Period, Timeout.Infinite);
}
Monitor.Pulse(_taskTimer);
}
}
}
}
}

@ -101,7 +101,7 @@ namespace Volo.Abp.Threading
catch(Exception ex)
{
Logger.LogException(ex);
_ = ExceptionNotifier.NotifyAsync(ex);
AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex));
}
finally
{

Loading…
Cancel
Save