Fast stop on application stop.

pull/10008/head
Halil İbrahim Kalkan 4 years ago
parent 8114a628ac
commit 2db1a1cf91

@ -28,6 +28,8 @@ namespace Volo.Abp.EventBus.Boxes
protected string DistributedLockName => "Inbox_" + InboxConfig.Name;
public ILogger<InboxProcessor> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
public InboxProcessor(
IServiceProvider serviceProvider,
@ -46,6 +48,8 @@ namespace Volo.Abp.EventBus.Boxes
Timer.Period = 2000; //TODO: Config?
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}
private async Task TimerOnElapsed(AbpAsyncTimer arg)
@ -63,13 +67,20 @@ namespace Volo.Abp.EventBus.Boxes
public Task StopAsync(CancellationToken cancellationToken = default)
{
StoppingTokenSource.Cancel();
Timer.Stop(cancellationToken);
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}
protected virtual async Task RunAsync()
{
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName))
if (StoppingToken.IsCancellationRequested)
{
return;
}
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
@ -79,7 +90,7 @@ namespace Volo.Abp.EventBus.Boxes
while (true)
{
var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config?
var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? Pass StoppingToken!
if (waitingEvents.Count <= 0)
{
break;
@ -107,7 +118,7 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?)
await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config?
}
}
}

@ -21,6 +21,9 @@ namespace Volo.Abp.EventBus.Boxes
protected OutboxConfig OutboxConfig { get; private set; }
protected string DistributedLockName => "Outbox_" + OutboxConfig.Name;
public ILogger<OutboxSender> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
public OutboxSender(
IServiceProvider serviceProvider,
@ -35,6 +38,8 @@ namespace Volo.Abp.EventBus.Boxes
Timer.Period = 2000; //TODO: Config?
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}
public virtual Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default)
@ -47,7 +52,9 @@ namespace Volo.Abp.EventBus.Boxes
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
StoppingTokenSource.Cancel();
Timer.Stop(cancellationToken);
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}
@ -62,8 +69,6 @@ namespace Volo.Abp.EventBus.Boxes
{
if (handle != null)
{
Logger.LogDebug("Obtained the distributed lock: " + DistributedLockName);
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(1000); //TODO: Config?
@ -89,7 +94,7 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?)
await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config?
}
}
}

@ -0,0 +1,20 @@
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Boxes
{
internal static class TaskDelayHelper
{
public static async Task DelayAsync(int milliseconds, CancellationToken cancellationToken = default)
{
try
{
await Task.Delay(milliseconds, cancellationToken);
}
catch (TaskCanceledException)
{
return;
}
}
}
}

@ -14,7 +14,7 @@ namespace DistDemoApp
typeof(AbpAutofacModule),
typeof(AbpDddDomainModule),
typeof(AbpEventBusBoxesModule)
)]
)]
public class DistDemoAppSharedModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)

Loading…
Cancel
Save