From db59f2773e22e3e4448e64e1b956754d553c6300 Mon Sep 17 00:00:00 2001 From: maliming Date: Thu, 3 Dec 2020 14:45:24 +0800 Subject: [PATCH] Add AbpAsyncTimer. --- .../Abp/BackgroundJobs/BackgroundJobWorker.cs | 4 +- .../Quartz/QuartzBackgroundWorkerAdapter.cs | 2 +- .../AsyncPeriodicBackgroundWorkerBase.cs | 11 +- .../PeriodicBackgroundWorkerBase.cs | 5 +- .../Abp/RabbitMQ/RabbitMqMessageConsumer.cs | 55 +++++--- .../Volo/Abp/Threading/AbpAsyncTimer.cs | 127 ++++++++++++++++++ .../Volo/Abp/Threading/AbpTimer.cs | 2 +- 7 files changed, 171 insertions(+), 35 deletions(-) create mode 100644 framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpAsyncTimer.cs diff --git a/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs b/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs index 62e79010cc..2635294323 100644 --- a/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs +++ b/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs @@ -17,7 +17,7 @@ namespace Volo.Abp.BackgroundJobs protected AbpBackgroundJobWorkerOptions WorkerOptions { get; } public BackgroundJobWorker( - AbpTimer timer, + AbpAsyncTimer timer, IOptions jobOptions, IOptions workerOptions, IServiceScopeFactory serviceScopeFactory) @@ -113,4 +113,4 @@ namespace Volo.Abp.BackgroundJobs return nextTryDate; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerAdapter.cs b/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerAdapter.cs index e6a65a2865..970beea3f0 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerAdapter.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerAdapter.cs @@ -34,7 +34,7 @@ namespace Volo.Abp.BackgroundWorkers.Quartz throw new ArgumentException($"{nameof(worker)} type is different from the generic type"); } - var timer = (AbpTimer) worker.GetType().GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker); + var timer = (AbpAsyncTimer) worker.GetType().GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker); period = timer?.Period; } else diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs index b766ca531a..094c6eb7eb 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs @@ -11,15 +11,15 @@ namespace Volo.Abp.BackgroundWorkers public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected IServiceScopeFactory ServiceScopeFactory { get; } - protected AbpTimer Timer { get; } + protected AbpAsyncTimer Timer { get; } protected AsyncPeriodicBackgroundWorkerBase( - AbpTimer timer, + AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) { ServiceScopeFactory = serviceScopeFactory; Timer = timer; - Timer.Elapsed += Timer_Elapsed; + Timer.Elapsed = Timer_Elapsed; } public async override Task StartAsync(CancellationToken cancellationToken = default) @@ -34,10 +34,9 @@ namespace Volo.Abp.BackgroundWorkers await base.StopAsync(cancellationToken); } - private void Timer_Elapsed(object sender, System.EventArgs e) + private async Task Timer_Elapsed(AbpAsyncTimer timer) { - // Discard the result - _ = DoWorkAsync(); + await DoWorkAsync(); } private async Task DoWorkAsync() diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs index 1b2237099d..6ac757fef5 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs @@ -47,9 +47,8 @@ namespace Volo.Abp.BackgroundWorkers } catch (Exception ex) { - _ = scope.ServiceProvider - .GetRequiredService() - .NotifyAsync(new ExceptionNotificationContext(ex)); + var exceptionNotifier = scope.ServiceProvider.GetRequiredService(); + AsyncHelper.RunSync(() => exceptionNotifier.NotifyAsync(new ExceptionNotificationContext(ex))); Logger.LogException(ex); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index 728fbd9077..fb1b0c7fe2 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -20,7 +20,7 @@ namespace Volo.Abp.RabbitMQ protected IExceptionNotifier ExceptionNotifier { get; } - protected AbpTimer Timer { get; } + protected AbpAsyncTimer Timer { get; } protected ExchangeDeclareConfiguration Exchange { get; private set; } @@ -38,7 +38,7 @@ namespace Volo.Abp.RabbitMQ public RabbitMqMessageConsumer( IConnectionPool connectionPool, - AbpTimer timer, + AbpAsyncTimer timer, IExceptionNotifier exceptionNotifier) { ConnectionPool = connectionPool; @@ -50,7 +50,7 @@ namespace Volo.Abp.RabbitMQ Callbacks = new ConcurrentBag>(); Timer.Period = 5000; //5 sec. - Timer.Elapsed += Timer_Elapsed; + Timer.Elapsed = Timer_Elapsed; Timer.RunOnStart = true; } @@ -77,7 +77,7 @@ namespace Volo.Abp.RabbitMQ await TrySendQueueBindCommandsAsync(); } - protected virtual void TrySendQueueBindCommands() + protected virtual async Task TrySendQueueBindCommandsAsync() { try { @@ -119,40 +119,33 @@ namespace Volo.Abp.RabbitMQ catch (Exception ex) { Logger.LogException(ex, LogLevel.Warning); - _ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); + await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); } } - protected virtual Task TrySendQueueBindCommandsAsync() - { - TrySendQueueBindCommands(); - return Task.CompletedTask; - } - public virtual void OnMessageReceived(Func callback) { Callbacks.Add(callback); } - protected virtual void Timer_Elapsed(object sender, EventArgs e) + protected virtual async Task Timer_Elapsed(AbpAsyncTimer timer) { if (Channel == null || Channel.IsOpen == false) { - TryCreateChannel(); - TrySendQueueBindCommands(); + await TryCreateChannelAsync(); + await TrySendQueueBindCommandsAsync(); } } - protected virtual void TryCreateChannel() + protected virtual async Task TryCreateChannelAsync() { - DisposeChannel(); + await DisposeChannelAsync(); try { var channel = ConnectionPool .Get(ConnectionName) .CreateModel(); - channel.ExchangeDeclare( exchange: Exchange.ExchangeName, type: Exchange.Type, @@ -172,7 +165,7 @@ namespace Volo.Abp.RabbitMQ var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, basicDeliverEventArgs) => { - await HandleIncomingMessage(channel, basicDeliverEventArgs); + await HandleIncomingMessageAsync(channel, basicDeliverEventArgs); }; channel.BasicConsume( @@ -186,11 +179,11 @@ namespace Volo.Abp.RabbitMQ catch (Exception ex) { Logger.LogException(ex, LogLevel.Warning); - _ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); + await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); } } - protected virtual async Task HandleIncomingMessage(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs) + protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs) { try { @@ -204,7 +197,25 @@ namespace Volo.Abp.RabbitMQ catch (Exception ex) { Logger.LogException(ex); - _ = ExceptionNotifier.NotifyAsync(ex); + await ExceptionNotifier.NotifyAsync(ex); + } + } + + protected virtual async Task DisposeChannelAsync() + { + if (Channel == null) + { + return; + } + + try + { + Channel.Dispose(); + } + catch (Exception ex) + { + Logger.LogException(ex, LogLevel.Warning); + await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); } } @@ -222,7 +233,7 @@ namespace Volo.Abp.RabbitMQ catch (Exception ex) { Logger.LogException(ex, LogLevel.Warning); - _ = ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning); + AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning)); } } diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpAsyncTimer.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpAsyncTimer.cs new file mode 100644 index 0000000000..8ce369ef66 --- /dev/null +++ b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpAsyncTimer.cs @@ -0,0 +1,127 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; +using Volo.Abp.ExceptionHandling; + +namespace Volo.Abp.Threading +{ + /// + /// A robust timer implementation that ensures no overlapping occurs. It waits exactly specified between ticks. + /// + public class AbpAsyncTimer : ITransientDependency + { + /// + /// This func is raised periodically according to Period of Timer. + /// + public Func Elapsed = _ => Task.CompletedTask; + + /// + /// Task period of timer (as milliseconds). + /// + public int Period { get; set; } + + /// + /// Indicates whether timer raises Elapsed event on Start method of Timer for once. + /// Default: False. + /// + public bool RunOnStart { get; set; } + + public ILogger Logger { get; set; } + + public IExceptionNotifier ExceptionNotifier { get; set; } + + private readonly Timer _taskTimer; + private volatile bool _performingTasks; + private volatile bool _isRunning; + + public AbpAsyncTimer() + { + ExceptionNotifier = NullExceptionNotifier.Instance; + Logger = NullLogger.Instance; + + _taskTimer = new Timer( + TimerCallBack, + null, + Timeout.Infinite, + Timeout.Infinite + ); + } + + public void Start(CancellationToken cancellationToken = default) + { + if (Period <= 0) + { + throw new AbpException("Period should be set before starting the timer!"); + } + + lock (_taskTimer) + { + _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); + _isRunning = true; + } + } + + public void Stop(CancellationToken cancellationToken = default) + { + lock (_taskTimer) + { + _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); + while (_performingTasks) + { + Monitor.Wait(_taskTimer); + } + + _isRunning = false; + } + } + + /// + /// This method is called by _taskTimer. + /// + /// Not used argument + private void TimerCallBack(object state) + { + lock (_taskTimer) + { + if (!_isRunning || _performingTasks) + { + return; + } + + _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); + _performingTasks = true; + } + + _ = Timer_Elapsed(); + } + + private async Task Timer_Elapsed() + { + try + { + await Elapsed(this); + } + catch(Exception ex) + { + Logger.LogException(ex); + await ExceptionNotifier.NotifyAsync(ex); + } + finally + { + lock (_taskTimer) + { + _performingTasks = false; + if (_isRunning) + { + _taskTimer.Change(Period, Timeout.Infinite); + } + + Monitor.Pulse(_taskTimer); + } + } + } + } +} diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs index f3439d383d..e7e6efd5de 100644 --- a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs +++ b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs @@ -101,7 +101,7 @@ namespace Volo.Abp.Threading catch(Exception ex) { Logger.LogException(ex); - _ = ExceptionNotifier.NotifyAsync(ex); + AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex)); } finally {