Events should be published and handled in the right order.

pull/9909/head
Halil İbrahim Kalkan 4 years ago
parent b966ec7717
commit 258cf783f7

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using Volo.Abp.Uow;
namespace Volo.Abp.Domain.Entities
{
@ -9,15 +10,15 @@ namespace Volo.Abp.Domain.Entities
IAggregateRoot,
IGeneratesDomainEvents
{
private readonly ICollection<object> _distributedEvents = new Collection<object>();
private readonly ICollection<object> _localEvents = new Collection<object>();
private readonly ICollection<DomainEventRecord> _distributedEvents = new Collection<DomainEventRecord>();
private readonly ICollection<DomainEventRecord> _localEvents = new Collection<DomainEventRecord>();
public virtual IEnumerable<object> GetLocalEvents()
public virtual IEnumerable<DomainEventRecord> GetLocalEvents()
{
return _localEvents;
}
public virtual IEnumerable<object> GetDistributedEvents()
public virtual IEnumerable<DomainEventRecord> GetDistributedEvents()
{
return _distributedEvents;
}
@ -34,12 +35,12 @@ namespace Volo.Abp.Domain.Entities
protected virtual void AddLocalEvent(object eventData)
{
_localEvents.Add(eventData);
_localEvents.Add(new DomainEventRecord(eventData, EventOrderGenerator.GetNext()));
}
protected virtual void AddDistributedEvent(object eventData)
{
_distributedEvents.Add(eventData);
_distributedEvents.Add(new DomainEventRecord(eventData, EventOrderGenerator.GetNext()));
}
}
@ -48,8 +49,8 @@ namespace Volo.Abp.Domain.Entities
IAggregateRoot<TKey>,
IGeneratesDomainEvents
{
private readonly ICollection<object> _distributedEvents = new Collection<object>();
private readonly ICollection<object> _localEvents = new Collection<object>();
private readonly ICollection<DomainEventRecord> _distributedEvents = new Collection<DomainEventRecord>();
private readonly ICollection<DomainEventRecord> _localEvents = new Collection<DomainEventRecord>();
protected BasicAggregateRoot()
{
@ -62,12 +63,12 @@ namespace Volo.Abp.Domain.Entities
}
public virtual IEnumerable<object> GetLocalEvents()
public virtual IEnumerable<DomainEventRecord> GetLocalEvents()
{
return _localEvents;
}
public virtual IEnumerable<object> GetDistributedEvents()
public virtual IEnumerable<DomainEventRecord> GetDistributedEvents()
{
return _distributedEvents;
}
@ -84,12 +85,12 @@ namespace Volo.Abp.Domain.Entities
protected virtual void AddLocalEvent(object eventData)
{
_localEvents.Add(eventData);
_localEvents.Add(new DomainEventRecord(eventData, EventOrderGenerator.GetNext()));
}
protected virtual void AddDistributedEvent(object eventData)
{
_distributedEvents.Add(eventData);
_distributedEvents.Add(new DomainEventRecord(eventData, EventOrderGenerator.GetNext()));
}
}
}

@ -0,0 +1,15 @@
namespace Volo.Abp.Domain.Entities
{
public class DomainEventRecord
{
public object EventData { get; }
public long EventOrder { get; }
public DomainEventRecord(object eventData, long eventOrder)
{
EventData = eventData;
EventOrder = eventOrder;
}
}
}

@ -8,11 +8,14 @@ namespace Volo.Abp.Domain.Entities.Events
public object SourceEntity { get; }
public object EventData { get; }
public long EventOrder { get; }
public DomainEventEntry(object sourceEntity, object eventData)
public DomainEventEntry(object sourceEntity, object eventData, long eventOrder)
{
SourceEntity = sourceEntity;
EventData = eventData;
EventOrder = eventOrder;
}
}
}

@ -1,10 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.Auditing;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.DynamicProxy;
@ -44,16 +42,9 @@ namespace Volo.Abp.Domain.Entities.Events
Logger = NullLogger<EntityChangeEventHelper>.Instance;
}
public async Task TriggerEventsAsync(EntityChangeReport changeReport)
public virtual void PublishEntityCreatingEvent(object entity)
{
await TriggerEntityChangeEvents(changeReport.ChangedEntities);
await TriggerLocalEvents(changeReport.DomainEvents);
await TriggerDistributedEvents(changeReport.DistributedEvents);
}
public virtual async Task TriggerEntityCreatingEventAsync(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityCreatingEventData<>),
entity,
@ -61,9 +52,9 @@ namespace Volo.Abp.Domain.Entities.Events
);
}
public virtual async Task TriggerEntityCreatedEventAsync(object entity)
public virtual void PublishEntityCreatedEvent(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityCreatedEventData<>),
entity,
@ -75,7 +66,7 @@ namespace Volo.Abp.Domain.Entities.Events
var eto = EntityToEtoMapper.Map(entity);
if (eto != null)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
DistributedEventBus,
typeof(EntityCreatedEto<>),
eto,
@ -96,9 +87,9 @@ namespace Volo.Abp.Domain.Entities.Events
);
}
public virtual async Task TriggerEntityUpdatingEventAsync(object entity)
public virtual void PublishEntityUpdatingEvent(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityUpdatingEventData<>),
entity,
@ -106,9 +97,9 @@ namespace Volo.Abp.Domain.Entities.Events
);
}
public virtual async Task TriggerEntityUpdatedEventAsync(object entity)
public virtual void PublishEntityUpdatedEvent(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityUpdatedEventData<>),
entity,
@ -120,7 +111,7 @@ namespace Volo.Abp.Domain.Entities.Events
var eto = EntityToEtoMapper.Map(entity);
if (eto != null)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
DistributedEventBus,
typeof(EntityUpdatedEto<>),
eto,
@ -130,9 +121,9 @@ namespace Volo.Abp.Domain.Entities.Events
}
}
public virtual async Task TriggerEntityDeletingEventAsync(object entity)
public virtual void PublishEntityDeletingEvent(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityDeletingEventData<>),
entity,
@ -140,9 +131,9 @@ namespace Volo.Abp.Domain.Entities.Events
);
}
public virtual async Task TriggerEntityDeletedEventAsync(object entity)
public virtual void PublishEntityDeletedEvent(object entity)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
LocalEventBus,
typeof(EntityDeletedEventData<>),
entity,
@ -154,7 +145,7 @@ namespace Volo.Abp.Domain.Entities.Events
var eto = EntityToEtoMapper.Map(entity);
if (eto != null)
{
await TriggerEventWithEntity(
TriggerEventWithEntity(
DistributedEventBus,
typeof(EntityDeletedEto<>),
eto,
@ -164,48 +155,7 @@ namespace Volo.Abp.Domain.Entities.Events
}
}
protected virtual async Task TriggerEntityChangeEvents(List<EntityChangeEntry> changedEntities)
{
foreach (var changedEntity in changedEntities)
{
switch (changedEntity.ChangeType)
{
case EntityChangeType.Created:
await TriggerEntityCreatingEventAsync(changedEntity.Entity);
await TriggerEntityCreatedEventAsync(changedEntity.Entity);
break;
case EntityChangeType.Updated:
await TriggerEntityUpdatingEventAsync(changedEntity.Entity);
await TriggerEntityUpdatedEventAsync(changedEntity.Entity);
break;
case EntityChangeType.Deleted:
await TriggerEntityDeletingEventAsync(changedEntity.Entity);
await TriggerEntityDeletedEventAsync(changedEntity.Entity);
break;
default:
throw new AbpException("Unknown EntityChangeType: " + changedEntity.ChangeType);
}
}
}
protected virtual async Task TriggerLocalEvents(List<DomainEventEntry> localEvents)
{
foreach (var localEvent in localEvents)
{
await LocalEventBus.PublishAsync(localEvent.EventData.GetType(), localEvent.EventData);
}
}
protected virtual async Task TriggerDistributedEvents(List<DomainEventEntry> distributedEvents)
{
foreach (var distributedEvent in distributedEvents)
{
await DistributedEventBus.PublishAsync(distributedEvent.EventData.GetType(),
distributedEvent.EventData);
}
}
protected virtual async Task TriggerEventWithEntity(
protected virtual void TriggerEventWithEntity(
IEventBus eventPublisher,
Type genericEventType,
object entityOrEto,
@ -218,16 +168,11 @@ namespace Volo.Abp.Domain.Entities.Events
if (currentUow == null)
{
await eventPublisher.PublishAsync(
eventType,
eventData,
onUnitOfWorkComplete: false
);
Logger.LogWarning("UnitOfWorkManager.Current is null! Can not publish the event.");
return;
}
var eventRecord = new UnitOfWorkEventRecord(eventType, eventData)
var eventRecord = new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext())
{
Properties =
{

@ -1,32 +0,0 @@
using System.Collections.Generic;
namespace Volo.Abp.Domain.Entities.Events
{
public class EntityChangeReport
{
public List<EntityChangeEntry> ChangedEntities { get; }
public List<DomainEventEntry> DomainEvents { get; }
public List<DomainEventEntry> DistributedEvents { get; }
public EntityChangeReport()
{
ChangedEntities = new List<EntityChangeEntry>();
DomainEvents = new List<DomainEventEntry>();
DistributedEvents = new List<DomainEventEntry>();
}
public bool IsEmpty()
{
return ChangedEntities.Count <= 0 &&
DomainEvents.Count <= 0 &&
DistributedEvents.Count <= 0;
}
public override string ToString()
{
return $"[EntityChangeReport] ChangedEntities: {ChangedEntities.Count}, DomainEvents: {DomainEvents.Count}, DistributedEvents: {DistributedEvents.Count}";
}
}
}

@ -0,0 +1,22 @@
using System.Collections.Generic;
namespace Volo.Abp.Domain.Entities.Events
{
public class EntityEventReport
{
public List<DomainEventEntry> DomainEvents { get; }
public List<DomainEventEntry> DistributedEvents { get; }
public EntityEventReport()
{
DomainEvents = new List<DomainEventEntry>();
DistributedEvents = new List<DomainEventEntry>();
}
public override string ToString()
{
return $"[{nameof(EntityEventReport)}] DomainEvents: {DomainEvents.Count}, DistributedEvents: {DistributedEvents.Count}";
}
}
}

@ -1,5 +1,3 @@
using System.Threading.Tasks;
namespace Volo.Abp.Domain.Entities.Events
{
/// <summary>
@ -7,15 +5,13 @@ namespace Volo.Abp.Domain.Entities.Events
/// </summary>
public interface IEntityChangeEventHelper
{
Task TriggerEventsAsync(EntityChangeReport changeReport);
Task TriggerEntityCreatingEventAsync(object entity);
Task TriggerEntityCreatedEventAsync(object entity);
void PublishEntityCreatingEvent(object entity);
void PublishEntityCreatedEvent(object entity);
Task TriggerEntityUpdatingEventAsync(object entity);
Task TriggerEntityUpdatedEventAsync(object entity);
void PublishEntityUpdatingEvent(object entity);
void PublishEntityUpdatedEvent(object entity);
Task TriggerEntityDeletingEventAsync(object entity);
Task TriggerEntityDeletedEventAsync(object entity);
void PublishEntityDeletingEvent(object entity);
void PublishEntityDeletedEvent(object entity);
}
}

@ -1,5 +1,3 @@
using System.Threading.Tasks;
namespace Volo.Abp.Domain.Entities.Events
{
/// <summary>
@ -14,42 +12,30 @@ namespace Volo.Abp.Domain.Entities.Events
private NullEntityChangeEventHelper()
{
}
public Task TriggerEntityCreatingEventAsync(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEntityCreatedEventAsync(object entity)
public void PublishEntityCreatingEvent(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEntityUpdatingEventAsync(object entity)
public void PublishEntityCreatedEvent(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEntityUpdatedEventAsync(object entity)
public void PublishEntityUpdatingEvent(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEntityDeletingEventAsync(object entity)
public void PublishEntityUpdatedEvent(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEntityDeletedEventAsync(object entity)
public void PublishEntityDeletingEvent(object entity)
{
return Task.CompletedTask;
}
public Task TriggerEventsAsync(EntityChangeReport changeReport)
public void PublishEntityDeletedEvent(object entity)
{
return Task.CompletedTask;
}
}
}

@ -6,9 +6,9 @@ namespace Volo.Abp.Domain.Entities
public interface IGeneratesDomainEvents
{
IEnumerable<object> GetLocalEvents();
IEnumerable<DomainEventRecord> GetLocalEvents();
IEnumerable<object> GetDistributedEvents();
IEnumerable<DomainEventRecord> GetDistributedEvents();
void ClearLocalEvents();

@ -22,6 +22,8 @@ using Volo.Abp.Domain.Repositories;
using Volo.Abp.EntityFrameworkCore.EntityHistory;
using Volo.Abp.EntityFrameworkCore.Modeling;
using Volo.Abp.EntityFrameworkCore.ValueConverters;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.ObjectExtending;
@ -59,6 +61,10 @@ namespace Volo.Abp.EntityFrameworkCore
public IUnitOfWorkManager UnitOfWorkManager => LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>();
public IClock Clock => LazyServiceProvider.LazyGetRequiredService<IClock>();
public IDistributedEventBus DistributedEventBus => LazyServiceProvider.LazyGetRequiredService<IDistributedEventBus>();
public ILocalEventBus LocalEventBus => LazyServiceProvider.LazyGetRequiredService<ILocalEventBus>();
public ILogger<AbpDbContext<TDbContext>> Logger => LazyServiceProvider.LazyGetService<ILogger<AbpDbContext<TDbContext>>>(NullLogger<AbpDbContext<TDbContext>>.Instance);
@ -150,20 +156,21 @@ namespace Volo.Abp.EntityFrameworkCore
try
{
var auditLog = AuditingManager?.Current?.Log;
List<EntityChangeInfo> entityChangeList = null;
if (auditLog != null)
{
entityChangeList = EntityHistoryHelper.CreateChangeList(ChangeTracker.Entries().ToList());
}
var changeReport = ApplyAbpConcepts();
ApplyAbpConcepts();
var eventReport = CreateEventReport();
var result = await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
await EntityChangeEventHelper.TriggerEventsAsync(changeReport);
if (auditLog != null)
PublishEntityEvents(eventReport);
if (entityChangeList != null)
{
EntityHistoryHelper.UpdateChangeList(entityChangeList);
auditLog.EntityChanges.AddRange(entityChangeList);
@ -182,6 +189,23 @@ namespace Volo.Abp.EntityFrameworkCore
}
}
private void PublishEntityEvents(EntityEventReport changeReport)
{
foreach (var localEvent in changeReport.DomainEvents)
{
UnitOfWorkManager.Current?.AddOrReplaceLocalEvent(
new UnitOfWorkEventRecord(localEvent.EventData.GetType(), localEvent.EventData, localEvent.EventOrder)
);
}
foreach (var distributedEvent in changeReport.DistributedEvents)
{
UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent(
new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder)
);
}
}
/// <summary>
/// This method will call the DbContext <see cref="SaveChangesAsync(bool, CancellationToken)"/> method directly of EF Core, which doesn't apply concepts of abp.
/// </summary>
@ -202,11 +226,18 @@ namespace Volo.Abp.EntityFrameworkCore
ChangeTracker.CascadeDeleteTiming = CascadeTiming.OnSaveChanges;
ChangeTracker.Tracked += ChangeTracker_Tracked;
ChangeTracker.StateChanged += ChangeTracker_StateChanged;
}
protected virtual void ChangeTracker_Tracked(object sender, EntityTrackedEventArgs e)
{
FillExtraPropertiesForTrackedEntities(e);
PublishEventsForTrackedEntity(e.Entry);
}
protected virtual void ChangeTracker_StateChanged(object sender, EntityStateChangedEventArgs e)
{
PublishEventsForTrackedEntity(e.Entry);
}
protected virtual void FillExtraPropertiesForTrackedEntities(EntityTrackedEventArgs e)
@ -254,37 +285,107 @@ namespace Volo.Abp.EntityFrameworkCore
}
}
}
protected virtual EntityChangeReport ApplyAbpConcepts()
private void PublishEventsForTrackedEntity(EntityEntry entry)
{
var changeReport = new EntityChangeReport();
switch (entry.State)
{
case EntityState.Added:
EntityChangeEventHelper.PublishEntityCreatingEvent(entry.Entity);
EntityChangeEventHelper.PublishEntityCreatedEvent(entry.Entity);
break;
case EntityState.Modified:
if (entry.Properties.Any(x => x.IsModified && x.Metadata.ValueGenerated == ValueGenerated.Never))
{
if (entry.Entity is ISoftDelete && entry.Entity.As<ISoftDelete>().IsDeleted)
{
EntityChangeEventHelper.PublishEntityDeletingEvent(entry.Entity);
EntityChangeEventHelper.PublishEntityDeletedEvent(entry.Entity);
}
else
{
EntityChangeEventHelper.PublishEntityUpdatingEvent(entry.Entity);
EntityChangeEventHelper.PublishEntityUpdatedEvent(entry.Entity);
}
}
break;
case EntityState.Deleted:
EntityChangeEventHelper.PublishEntityDeletingEvent(entry.Entity);
EntityChangeEventHelper.PublishEntityDeletedEvent(entry.Entity);
break;
}
}
protected virtual void ApplyAbpConcepts()
{
foreach (var entry in ChangeTracker.Entries().ToList())
{
ApplyAbpConcepts(entry, changeReport);
ApplyAbpConcepts(entry);
}
return changeReport;
}
protected virtual EntityEventReport CreateEventReport()
{
var eventReport = new EntityEventReport();
foreach (var entry in ChangeTracker.Entries().ToList())
{
var generatesDomainEventsEntity = entry.Entity as IGeneratesDomainEvents;
if (generatesDomainEventsEntity == null)
{
continue;
}
protected virtual void ApplyAbpConcepts(EntityEntry entry, EntityChangeReport changeReport)
var localEvents = generatesDomainEventsEntity.GetLocalEvents()?.ToArray();
if (localEvents != null && localEvents.Any())
{
eventReport.DomainEvents.AddRange(
localEvents.Select(
eventRecord => new DomainEventEntry(
entry.Entity,
eventRecord.EventData,
eventRecord.EventOrder
)
)
);
generatesDomainEventsEntity.ClearLocalEvents();
}
var distributedEvents = generatesDomainEventsEntity.GetDistributedEvents()?.ToArray();
if (distributedEvents != null && distributedEvents.Any())
{
eventReport.DistributedEvents.AddRange(
distributedEvents.Select(
eventRecord => new DomainEventEntry(
entry.Entity,
eventRecord.EventData,
eventRecord.EventOrder)
)
);
generatesDomainEventsEntity.ClearDistributedEvents();
}
}
return eventReport;
}
protected virtual void ApplyAbpConcepts(EntityEntry entry)
{
switch (entry.State)
{
case EntityState.Added:
ApplyAbpConceptsForAddedEntity(entry, changeReport);
ApplyAbpConceptsForAddedEntity(entry);
break;
case EntityState.Modified:
ApplyAbpConceptsForModifiedEntity(entry, changeReport);
ApplyAbpConceptsForModifiedEntity(entry);
break;
case EntityState.Deleted:
ApplyAbpConceptsForDeletedEntity(entry, changeReport);
ApplyAbpConceptsForDeletedEntity(entry);
break;
}
HandleExtraPropertiesOnSave(entry);
AddDomainEvents(changeReport, entry.Entity);
}
protected virtual void HandleExtraPropertiesOnSave(EntityEntry entry)
@ -361,15 +462,14 @@ namespace Volo.Abp.EntityFrameworkCore
}
}
protected virtual void ApplyAbpConceptsForAddedEntity(EntityEntry entry, EntityChangeReport changeReport)
protected virtual void ApplyAbpConceptsForAddedEntity(EntityEntry entry)
{
CheckAndSetId(entry);
SetConcurrencyStampIfNull(entry);
SetCreationAuditProperties(entry);
changeReport.ChangedEntities.Add(new EntityChangeEntry(entry.Entity, EntityChangeType.Created));
}
protected virtual void ApplyAbpConceptsForModifiedEntity(EntityEntry entry, EntityChangeReport changeReport)
protected virtual void ApplyAbpConceptsForModifiedEntity(EntityEntry entry)
{
if (entry.State == EntityState.Modified && entry.Properties.Any(x => x.IsModified && x.Metadata.ValueGenerated == ValueGenerated.Never))
{
@ -379,24 +479,17 @@ namespace Volo.Abp.EntityFrameworkCore
if (entry.Entity is ISoftDelete && entry.Entity.As<ISoftDelete>().IsDeleted)
{
SetDeletionAuditProperties(entry);
changeReport.ChangedEntities.Add(new EntityChangeEntry(entry.Entity, EntityChangeType.Deleted));
}
else
{
changeReport.ChangedEntities.Add(new EntityChangeEntry(entry.Entity, EntityChangeType.Updated));
}
}
}
protected virtual void ApplyAbpConceptsForDeletedEntity(EntityEntry entry, EntityChangeReport changeReport)
protected virtual void ApplyAbpConceptsForDeletedEntity(EntityEntry entry)
{
if (TryCancelDeletionForSoftDelete(entry))
{
UpdateConcurrencyStamp(entry);
SetDeletionAuditProperties(entry);
}
changeReport.ChangedEntities.Add(new EntityChangeEntry(entry.Entity, EntityChangeType.Deleted));
}
protected virtual bool IsHardDeleted(EntityEntry entry)
@ -410,29 +503,6 @@ namespace Volo.Abp.EntityFrameworkCore
return hardDeletedEntities.Contains(entry.Entity);
}
protected virtual void AddDomainEvents(EntityChangeReport changeReport, object entityAsObj)
{
var generatesDomainEventsEntity = entityAsObj as IGeneratesDomainEvents;
if (generatesDomainEventsEntity == null)
{
return;
}
var localEvents = generatesDomainEventsEntity.GetLocalEvents()?.ToArray();
if (localEvents != null && localEvents.Any())
{
changeReport.DomainEvents.AddRange(localEvents.Select(eventData => new DomainEventEntry(entityAsObj, eventData)));
generatesDomainEventsEntity.ClearLocalEvents();
}
var distributedEvents = generatesDomainEventsEntity.GetDistributedEvents()?.ToArray();
if (distributedEvents != null && distributedEvents.Any())
{
changeReport.DistributedEvents.AddRange(distributedEvents.Select(eventData => new DomainEventEntry(entityAsObj, eventData)));
generatesDomainEventsEntity.ClearDistributedEvents();
}
}
protected virtual void UpdateConcurrencyStamp(EntityEntry entry)
{
var entity = entry.Entity as IHasConcurrencyStamp;

@ -104,7 +104,7 @@ namespace Volo.Abp.EventBus
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData)
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext())
);
return;
}

@ -12,6 +12,7 @@ using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.MemoryDb;
using Volo.Abp.Uow;
namespace Volo.Abp.Domain.Repositories.MemoryDb
{
@ -65,7 +66,7 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
return ApplyDataFilters((await GetCollectionAsync()).AsQueryable());
}
protected virtual async Task TriggerDomainEventsAsync(object entity)
protected virtual void TriggerDomainEvents(object entity)
{
var generatesDomainEventsEntity = entity as IGeneratesDomainEvents;
if (generatesDomainEventsEntity == null)
@ -78,7 +79,7 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
{
foreach (var localEvent in localEvents)
{
await LocalEventBus.PublishAsync(localEvent.GetType(), localEvent);
UnitOfWorkManager.Current?.AddOrReplaceLocalEvent(new UnitOfWorkEventRecord(localEvent.GetType(), localEvent, EventOrderGenerator.GetNext()));
}
generatesDomainEventsEntity.ClearLocalEvents();
@ -89,7 +90,7 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
{
foreach (var distributedEvent in distributedEvents)
{
await DistributedEventBus.PublishAsync(distributedEvent.GetType(), distributedEvent);
UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent(new UnitOfWorkEventRecord(distributedEvent.GetType(), distributedEvent, EventOrderGenerator.GetNext()));
}
generatesDomainEventsEntity.ClearDistributedEvents();
@ -143,37 +144,37 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
AuditPropertySetter.SetDeletionProperties(entity);
}
protected virtual async Task TriggerEntityCreateEvents(TEntity entity)
protected virtual void TriggerEntityCreateEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityCreatingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityCreatedEventAsync(entity);
EntityChangeEventHelper.PublishEntityCreatingEvent(entity);
EntityChangeEventHelper.PublishEntityCreatedEvent(entity);
}
protected virtual async Task TriggerEntityUpdateEventsAsync(TEntity entity)
protected virtual void TriggerEntityUpdateEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityUpdatingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityUpdatedEventAsync(entity);
EntityChangeEventHelper.PublishEntityUpdatingEvent(entity);
EntityChangeEventHelper.PublishEntityUpdatedEvent(entity);
}
protected virtual async Task TriggerEntityDeleteEventsAsync(TEntity entity)
protected virtual void TriggerEntityDeleteEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityDeletingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityDeletedEventAsync(entity);
EntityChangeEventHelper.PublishEntityDeletingEvent(entity);
EntityChangeEventHelper.PublishEntityDeletedEvent(entity);
}
protected virtual async Task ApplyAbpConceptsForAddedEntityAsync(TEntity entity)
protected virtual void ApplyAbpConceptsForAddedEntity(TEntity entity)
{
CheckAndSetId(entity);
SetCreationAuditProperties(entity);
await TriggerEntityCreateEvents(entity);
await TriggerDomainEventsAsync(entity);
TriggerEntityCreateEvents(entity);
TriggerDomainEvents(entity);
}
protected virtual async Task ApplyAbpConceptsForDeletedEntityAsync(TEntity entity)
protected virtual void ApplyAbpConceptsForDeletedEntity(TEntity entity)
{
SetDeletionAuditProperties(entity);
await TriggerEntityDeleteEventsAsync(entity);
await TriggerDomainEventsAsync(entity);
TriggerEntityDeleteEvents(entity);
TriggerDomainEvents(entity);
}
public override async Task<TEntity> FindAsync(
@ -199,7 +200,7 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
bool autoSave = false,
CancellationToken cancellationToken = default)
{
await ApplyAbpConceptsForAddedEntityAsync(entity);
ApplyAbpConceptsForAddedEntity(entity);
(await GetCollectionAsync()).Add(entity);
@ -216,14 +217,14 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
if (entity is ISoftDelete softDeleteEntity && softDeleteEntity.IsDeleted)
{
SetDeletionAuditProperties(entity);
await TriggerEntityDeleteEventsAsync(entity);
TriggerEntityDeleteEvents(entity);
}
else
{
await TriggerEntityUpdateEventsAsync(entity);
TriggerEntityUpdateEvents(entity);
}
await TriggerDomainEventsAsync(entity);
TriggerDomainEvents(entity);
(await GetCollectionAsync()).Update(entity);
@ -235,7 +236,7 @@ namespace Volo.Abp.Domain.Repositories.MemoryDb
bool autoSave = false,
CancellationToken cancellationToken = default)
{
await ApplyAbpConceptsForDeletedEntityAsync(entity);
ApplyAbpConceptsForDeletedEntity(entity);
if (entity is ISoftDelete softDeleteEntity && !IsHardDeleted(entity))
{

@ -17,6 +17,7 @@ using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.MongoDB;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;
namespace Volo.Abp.Domain.Repositories.MongoDB
{
@ -182,14 +183,14 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
if (entity is ISoftDelete softDeleteEntity && softDeleteEntity.IsDeleted)
{
SetDeletionAuditProperties(entity);
await TriggerEntityDeleteEventsAsync(entity);
TriggerEntityDeleteEvents(entity);
}
else
{
await TriggerEntityUpdateEventsAsync(entity);
TriggerEntityUpdateEvents(entity);
}
await TriggerDomainEventsAsync(entity);
TriggerDomainEvents(entity);
var oldConcurrencyStamp = SetNewConcurrencyStamp(entity);
ReplaceOneResult result;
@ -235,14 +236,14 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
if (isSoftDeleteEntity)
{
SetDeletionAuditProperties(entity);
await TriggerEntityDeleteEventsAsync(entity);
TriggerEntityDeleteEvents(entity);
}
else
{
await TriggerEntityUpdateEventsAsync(entity);
TriggerEntityUpdateEvents(entity);
}
await TriggerDomainEventsAsync(entity);
TriggerDomainEvents(entity);
SetNewConcurrencyStamp(entity);
}
@ -295,7 +296,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
if (typeof(ISoftDelete).IsAssignableFrom(typeof(TEntity)) && !IsHardDeleted(entity))
{
((ISoftDelete)entity).IsDeleted = true;
await ApplyAbpConceptsForDeletedEntityAsync(entity);
ApplyAbpConceptsForDeletedEntity(entity);
ReplaceOneResult result;
@ -324,7 +325,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
}
else
{
await ApplyAbpConceptsForDeletedEntityAsync(entity);
ApplyAbpConceptsForDeletedEntity(entity);
DeleteResult result;
@ -374,7 +375,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
hardDeletedEntities.Add(entity);
}
await ApplyAbpConceptsForDeletedEntityAsync(entity);
ApplyAbpConceptsForDeletedEntity(entity);
}
var dbContext = await GetDbContextAsync(cancellationToken);
@ -573,33 +574,33 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
{
CheckAndSetId(entity);
SetCreationAuditProperties(entity);
await TriggerEntityCreateEvents(entity);
await TriggerDomainEventsAsync(entity);
TriggerEntityCreateEvents(entity);
TriggerDomainEvents(entity);
}
private async Task TriggerEntityCreateEvents(TEntity entity)
private void TriggerEntityCreateEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityCreatingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityCreatedEventAsync(entity);
EntityChangeEventHelper.PublishEntityCreatingEvent(entity);
EntityChangeEventHelper.PublishEntityCreatedEvent(entity);
}
protected virtual async Task TriggerEntityUpdateEventsAsync(TEntity entity)
protected virtual void TriggerEntityUpdateEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityUpdatingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityUpdatedEventAsync(entity);
EntityChangeEventHelper.PublishEntityUpdatingEvent(entity);
EntityChangeEventHelper.PublishEntityUpdatedEvent(entity);
}
protected virtual async Task ApplyAbpConceptsForDeletedEntityAsync(TEntity entity)
protected virtual void ApplyAbpConceptsForDeletedEntity(TEntity entity)
{
SetDeletionAuditProperties(entity);
await TriggerEntityDeleteEventsAsync(entity);
await TriggerDomainEventsAsync(entity);
TriggerEntityDeleteEvents(entity);
TriggerDomainEvents(entity);
}
protected virtual async Task TriggerEntityDeleteEventsAsync(TEntity entity)
protected virtual void TriggerEntityDeleteEvents(TEntity entity)
{
await EntityChangeEventHelper.TriggerEntityDeletingEventAsync(entity);
await EntityChangeEventHelper.TriggerEntityDeletedEventAsync(entity);
EntityChangeEventHelper.PublishEntityDeletingEvent(entity);
EntityChangeEventHelper.PublishEntityDeletedEvent(entity);
}
protected virtual void CheckAndSetId(TEntity entity)
@ -639,7 +640,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
AuditPropertySetter.SetDeletionProperties(entity);
}
protected virtual async Task TriggerDomainEventsAsync(object entity)
protected virtual void TriggerDomainEvents(object entity)
{
var generatesDomainEventsEntity = entity as IGeneratesDomainEvents;
if (generatesDomainEventsEntity == null)
@ -652,7 +653,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
{
foreach (var localEvent in localEvents)
{
await LocalEventBus.PublishAsync(localEvent.GetType(), localEvent);
UnitOfWorkManager.Current?.AddOrReplaceLocalEvent(new UnitOfWorkEventRecord(localEvent.GetType(), localEvent, EventOrderGenerator.GetNext()));
}
generatesDomainEventsEntity.ClearLocalEvents();
@ -663,7 +664,7 @@ namespace Volo.Abp.Domain.Repositories.MongoDB
{
foreach (var distributedEvent in distributedEvents)
{
await DistributedEventBus.PublishAsync(distributedEvent.GetType(), distributedEvent);
UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent(new UnitOfWorkEventRecord(distributedEvent.GetType(), distributedEvent, EventOrderGenerator.GetNext()));
}
generatesDomainEventsEntity.ClearDistributedEvents();

@ -19,5 +19,18 @@ namespace Volo.Abp.Uow
{
_currentUow.Value = unitOfWork;
}
public IUnitOfWork GetCurrentByChecking()
{
var uow = UnitOfWork;
//Skip reserved unit of work
while (uow != null && (uow.IsReserved || uow.IsDisposed || uow.IsCompleted))
{
uow = uow.Outer;
}
return uow;
}
}
}

@ -0,0 +1,14 @@
using System.Threading;
namespace Volo.Abp.Uow
{
public static class EventOrderGenerator
{
private static long _lastOrder;
public static long GetNext()
{
return Interlocked.Increment(ref _lastOrder);
}
}
}

@ -2,6 +2,6 @@
{
public interface IAmbientUnitOfWork : IUnitOfWorkAccessor
{
IUnitOfWork GetCurrentByChecking();
}
}

@ -139,7 +139,7 @@ namespace Volo.Abp.Uow
{
if (LocalEvents.Any())
{
var localEventsToBePublished = LocalEvents.ToArray();
var localEventsToBePublished = LocalEvents.OrderBy(e => e.EventOrder).ToArray();
LocalEvents.Clear();
await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
localEventsToBePublished
@ -148,7 +148,7 @@ namespace Volo.Abp.Uow
if (DistributedEvents.Any())
{
var distributedEventsToBePublished = DistributedEvents.ToArray();
var distributedEventsToBePublished = DistributedEvents.OrderBy(e => e.EventOrder).ToArray();
DistributedEvents.Clear();
await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
distributedEventsToBePublished

@ -9,6 +9,8 @@ namespace Volo.Abp.Uow
public Type EventType { get; }
public long EventOrder { get; }
/// <summary>
/// Extra properties can be used if needed.
/// </summary>
@ -16,10 +18,12 @@ namespace Volo.Abp.Uow
public UnitOfWorkEventRecord(
Type eventType,
object eventData)
object eventData,
long eventOrder)
{
EventType = eventType;
EventData = eventData;
EventOrder = eventOrder;
}
}
}

@ -10,7 +10,7 @@ namespace Volo.Abp.Uow
[Obsolete("This will be removed in next versions.")]
public static AsyncLocal<bool> DisableObsoleteDbContextCreationWarning { get; } = new AsyncLocal<bool>();
public IUnitOfWork Current => GetCurrentUnitOfWork();
public IUnitOfWork Current => _ambientUnitOfWork.GetCurrentByChecking();
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IAmbientUnitOfWork _ambientUnitOfWork;
@ -86,19 +86,6 @@ namespace Volo.Abp.Uow
return true;
}
private IUnitOfWork GetCurrentUnitOfWork()
{
var uow = _ambientUnitOfWork.UnitOfWork;
//Skip reserved unit of work
while (uow != null && (uow.IsReserved || uow.IsDisposed || uow.IsCompleted))
{
uow = uow.Outer;
}
return uow;
}
private IUnitOfWork CreateNewUnitOfWork()
{
var scope = _serviceScopeFactory.CreateScope();

@ -30,19 +30,65 @@ namespace Volo.Abp.TestApp.Testing
[Fact]
public virtual async Task Should_Publish_Events_In_Order()
{
bool entityCreatedEventHandled = false;
bool testPersonCreateHandled = false;
bool douglesUpdateHandled = false;
bool douglesNameChangeHandled = false;
bool customEventHandled = false;
bool customEvent2Handled = false;
LocalEventBus.Subscribe<EntityCreatedEventData<Person>>(data =>
{
data.Entity.Name.ShouldBe("TestPerson1");
entityCreatedEventHandled = true;
testPersonCreateHandled = true;
douglesUpdateHandled.ShouldBeFalse();
douglesNameChangeHandled.ShouldBeFalse();
customEventHandled.ShouldBeFalse();
customEvent2Handled.ShouldBeFalse();
return Task.CompletedTask;
});
LocalEventBus.Subscribe<MyCustomEventData>(data =>
{
data.Value.ShouldBe("42");
entityCreatedEventHandled.ShouldBe(true);
customEventHandled = true;
testPersonCreateHandled.ShouldBeTrue();
douglesUpdateHandled.ShouldBeFalse();
douglesNameChangeHandled.ShouldBeFalse();
customEvent2Handled.ShouldBeFalse();
return Task.CompletedTask;
});
LocalEventBus.Subscribe<PersonNameChangedEvent>(data =>
{
data.OldName.ShouldBe("Douglas");
data.Person.Name.ShouldBe("Douglas-Updated");
douglesNameChangeHandled = true;
testPersonCreateHandled.ShouldBeTrue();
customEventHandled.ShouldBeTrue();
douglesUpdateHandled.ShouldBeFalse();
customEvent2Handled.ShouldBeFalse();
return Task.CompletedTask;
});
LocalEventBus.Subscribe<EntityUpdatedEventData<Person>>(data =>
{
data.Entity.Name.ShouldBe("Douglas-Updated");
douglesUpdateHandled = true;
testPersonCreateHandled.ShouldBeTrue();
customEventHandled.ShouldBeTrue();
douglesNameChangeHandled.ShouldBeTrue();
customEvent2Handled.ShouldBeFalse();
return Task.CompletedTask;
});
LocalEventBus.Subscribe<MyCustomEventData2>(data =>
{
data.Value.ShouldBe("44");
customEvent2Handled = true;
testPersonCreateHandled.ShouldBeTrue();
customEventHandled.ShouldBeTrue();
douglesUpdateHandled.ShouldBeTrue();
douglesNameChangeHandled.ShouldBeTrue();
return Task.CompletedTask;
});
@ -53,6 +99,12 @@ namespace Volo.Abp.TestApp.Testing
);
await LocalEventBus.PublishAsync(new MyCustomEventData { Value = "42" });
var douglas = await PersonRepository.GetAsync(TestDataBuilder.UserDouglasId);
douglas.ChangeName("Douglas-Updated");
await PersonRepository.UpdateAsync(douglas);
await LocalEventBus.PublishAsync(new MyCustomEventData2 { Value = "44" });
});
}
@ -125,5 +177,10 @@ namespace Volo.Abp.TestApp.Testing
{
public string Value { get; set; }
}
private class MyCustomEventData2
{
public string Value { get; set; }
}
}
}

@ -88,12 +88,16 @@ namespace Volo.Abp.TestApp.Testing
[Fact]
public async Task Multiple_Update_Should_Result_With_Single_Updated_Event_In_The_Same_Uow()
{
var personId = Guid.NewGuid();
await PersonRepository.InsertAsync(new Person(personId, Guid.NewGuid().ToString("D"), 42));
var createEventCount = 0;
var updateEventCount = 0;
var updatedAge = 0;
DistributedEventBus.Subscribe<EntityCreatedEto<PersonEto>>(eto =>
{
createEventCount++;
return Task.CompletedTask;
});
DistributedEventBus.Subscribe<EntityUpdatedEto<PersonEto>>(eto =>
{
updateEventCount++;
@ -101,6 +105,9 @@ namespace Volo.Abp.TestApp.Testing
return Task.CompletedTask;
});
var personId = Guid.NewGuid();
await PersonRepository.InsertAsync(new Person(personId, Guid.NewGuid().ToString("D"), 42));
using (var uow = GetRequiredService<IUnitOfWorkManager>().Begin())
{
var person = await PersonRepository.GetAsync(personId);
@ -120,6 +127,7 @@ namespace Volo.Abp.TestApp.Testing
await uow.CompleteAsync();
}
createEventCount.ShouldBe(1);
updateEventCount.ShouldBe(1);
updatedAge.ShouldBe(45);
}

Loading…
Cancel
Save