pull/10159/head
liangshiwei 3 years ago
parent 28da0b837c
commit 3ee9f57c9d

@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Auditing\Volo.Abp.Auditing.csproj" />
<ProjectReference Include="..\Volo.Abp.Data\Volo.Abp.Data.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
<ProjectReference Include="..\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />

@ -3,6 +3,7 @@ using Volo.Abp.Auditing;
using Volo.Abp.Data;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
@ -18,7 +19,7 @@ namespace Volo.Abp.Domain
[DependsOn(
typeof(AbpAuditingModule),
typeof(AbpDataModule),
typeof(AbpEventBusModule),
typeof(AbpEventBusBoxesModule),
typeof(AbpGuidsModule),
typeof(AbpMultiTenancyModule),
typeof(AbpThreadingModule),

@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -14,7 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
@ -33,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now.Add(- EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents);
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);

@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -14,7 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
@ -33,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);

@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -14,8 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public PostgreSqlDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
: base(dbContextProvider, clock, distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
@ -34,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);

@ -2,10 +2,8 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Volo.Abp.Domain;
using Volo.Abp.EntityFrameworkCore.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Modularity;
using Volo.Abp.Uow;
using Volo.Abp.Uow.EntityFrameworkCore;
namespace Volo.Abp.EntityFrameworkCore

@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -15,17 +16,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
where TDbContext : IHasEventInbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpDistributedEventBusOptions DistributedEventsOptions { get; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected IClock Clock { get; }
public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
DistributedEventsOptions = distributedEventsOptions.Value;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
}
[UnitOfWork]
@ -78,7 +79,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public virtual async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var oldEvents = await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.ToListAsync();

@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -14,8 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public SqlRawDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
: base(dbContextProvider, clock, distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
@ -34,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);

@ -26,10 +26,14 @@ namespace Volo.Abp.EventBus.Boxes
public TimeSpan PeriodTimeSpan { get; set; }
/// <summary>
/// Delay time of <see cref="InboxProcessor"/> and <see cref="OutboxSender"/>
/// Default: 15 seconds
/// </summary>
public TimeSpan DelayTimeSpan { get; set; }
public TimeSpan DistributedLockWaitDuration { get; set; }
/// <summary>
/// Default: 2 hours
/// </summary>
public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; }
public AbpEventBusBoxesOptions()
{
@ -37,7 +41,8 @@ namespace Volo.Abp.EventBus.Boxes
InboxWaitingEventMaxCount = 1000;
OutboxWaitingEventMaxCount = 1000;
PeriodTimeSpan = TimeSpan.FromSeconds(2);
DelayTimeSpan = TimeSpan.FromSeconds(15);
DistributedLockWaitDuration = TimeSpan.FromSeconds(15);
WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2);
}
}
}

@ -49,7 +49,7 @@ namespace Volo.Abp.EventBus.Boxes
UnitOfWorkManager = unitOfWorkManager;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -120,21 +120,21 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
}
}
}
protected virtual async Task DeleteOldEventsAsync()
{
if (LastCleanTime != null && LastCleanTime > Clock.Now.Add(EventBusBoxesOptions.CleanOldEventTimeIntervalSpan))
if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now)
{
return;
}
await Inbox.DeleteOldEventsAsync();
LastCleanTime = DateTime.Now;
LastCleanTime = Clock.Now;
}
}
}

@ -39,7 +39,7 @@ namespace Volo.Abp.EventBus.Boxes
DistributedEventBus = distributedEventBus;
DistributedLockProvider = distributedLockProvider;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -100,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
}
}
}

@ -1,4 +1,3 @@
using System;
using Volo.Abp.Collections;
namespace Volo.Abp.EventBus.Distributed
@ -10,18 +9,11 @@ namespace Volo.Abp.EventBus.Distributed
public OutboxConfigDictionary Outboxes { get; }
public InboxConfigDictionary Inboxes { get; }
/// <summary>
/// Default: -2 hours
/// </summary>
public TimeSpan InboxKeepEventTimeSpan { get; set; }
public AbpDistributedEventBusOptions()
{
Handlers = new TypeList<IEventHandler>();
Outboxes = new OutboxConfigDictionary();
Inboxes = new InboxConfigDictionary();
InboxKeepEventTimeSpan = TimeSpan.FromHours(-2);
}
}
}

@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents
{
public interface IHasEventInbox : IAbpMongoDbContext
{
IMongoCollection<IncomingEventRecord> IncomingEvents { get; set; }
IMongoCollection<IncomingEventRecord> IncomingEvents { get; }
}
}
}

@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents
{
public interface IHasEventOutbox : IAbpMongoDbContext
{
IMongoCollection<OutgoingEventRecord> OutgoingEvents { get; set; }
IMongoCollection<OutgoingEventRecord> OutgoingEvents { get; }
}
}
}

@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
@ -16,17 +17,17 @@ namespace Volo.Abp.MongoDB.DistributedEvents
where TMongoDbContext : IHasEventInbox
{
protected IMongoDbContextProvider<TMongoDbContext> DbContextProvider { get; }
protected AbpDistributedEventBusOptions DistributedEventsOptions { get; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected IClock Clock { get; }
public MongoDbContextEventInbox(
IMongoDbContextProvider<TMongoDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
DistributedEventsOptions = distributedEventsOptions.Value;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
}
@ -96,7 +97,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents
public virtual async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
if (dbContext.SessionHandle != null)
{

@ -11,16 +11,9 @@ namespace DistDemoApp
public IMongoCollection<TodoItem> TodoItems => Collection<TodoItem>();
public IMongoCollection<TodoSummary> TodoSummaries => Collection<TodoSummary>();
public IMongoCollection<OutgoingEventRecord> OutgoingEvents
{
get => Collection<OutgoingEventRecord>();
set {}
}
public IMongoCollection<IncomingEventRecord> IncomingEvents
{
get => Collection<IncomingEventRecord>();
set {}
}
public IMongoCollection<OutgoingEventRecord> OutgoingEvents => Collection<OutgoingEventRecord>();
public IMongoCollection<IncomingEventRecord> IncomingEvents => Collection<IncomingEventRecord>();
}
}

Loading…
Cancel
Save