Added cancellation tokens

pull/10008/head
Halil İbrahim Kalkan 4 years ago
parent 8271fc98a6
commit 79015b0e4e

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
@ -5,7 +6,7 @@ namespace Volo.Abp.EventBus.Boxes
{ {
public interface IOutboxSender public interface IOutboxSender
{ {
Task StartAsync(OutboxConfig outboxConfig); Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default);
Task StopAsync(); Task StopAsync(CancellationToken cancellationToken = default);
} }
} }

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Medallion.Threading; using Medallion.Threading;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -37,17 +38,17 @@ namespace Volo.Abp.EventBus.Boxes
Logger = NullLogger<OutboxSender>.Instance; Logger = NullLogger<OutboxSender>.Instance;
} }
public virtual Task StartAsync(OutboxConfig outboxConfig) public virtual Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default)
{ {
OutboxConfig = outboxConfig; OutboxConfig = outboxConfig;
Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
Timer.Start(); Timer.Start(cancellationToken);
return Task.CompletedTask; return Task.CompletedTask;
} }
public virtual Task StopAsync() public virtual Task StopAsync(CancellationToken cancellationToken = default)
{ {
Timer.Stop(); Timer.Stop(cancellationToken);
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -85,12 +86,11 @@ namespace Volo.Abp.EventBus.Boxes
Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}"); Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}");
} }
} }
await Task.Delay(30000);
} }
else else
{ {
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?)
} }
} }
} }

Loading…
Cancel
Save