From 13d128945275665af93edc5f2f464b3e82784aaf Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 21 Apr 2023 15:15:21 +0800 Subject: [PATCH] Refactor `AddToInboxAsync` method. --- .../AbpAspNetCoreMvcDaprEventsController.cs | 3 ++- .../Abp/EventBus/Azure/AzureDistributedEventBus.cs | 5 ++--- .../Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs | 11 +++++------ .../Abp/EventBus/Kafka/KafkaDistributedEventBus.cs | 3 +-- .../EventBus/RabbitMq/RabbitMqDistributedEventBus.cs | 5 ++--- .../Abp/EventBus/Rebus/RebusDistributedEventBus.cs | 2 +- .../EventBus/Distributed/DistributedEventBusBase.cs | 3 +-- 7 files changed, 14 insertions(+), 18 deletions(-) diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index d4d4585013..7ee2f23ff0 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -21,6 +21,7 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var daprSerializer = HttpContext.RequestServices.GetRequiredService(); var body = (await JsonDocument.ParseAsync(HttpContext.Request.Body)); + var id = body.RootElement.GetProperty("id").GetString(); var pubSubName = body.RootElement.GetProperty("pubsubname").GetString(); var topic = body.RootElement.GetProperty("topic").GetString(); var data = body.RootElement.GetProperty("data").GetRawText(); @@ -32,7 +33,7 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); - await distributedEventBus.DaprTriggerHandlersDirectAsync(distributedEventBus.GetEventType(topic), eventData); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(topic), eventData); return Ok(); } } diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index d9093e6336..100d2d9942 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -84,10 +84,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return; } - var eventBytes = message.Body.ToArray(); - var eventData = _serializer.Deserialize(eventBytes, eventType); + var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType); - if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventBytes, eventData)) + if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData)) { return; } diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 06e11ac44e..28077ad43e 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -169,13 +169,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend } } - public virtual async Task DaprTriggerHandlersDirectAsync(Type eventType, object eventData) + public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData) { - // TODO: Implement inbox - // if (await AddToInboxAsync(message.MessageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, message.Body.ToArray())) - // { - // return; - // } + if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData)) + { + return; + } await TriggerHandlersDirectAsync(eventType, eventData); } diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 1ab1422aed..057b081ad8 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -83,10 +83,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen } var messageId = message.GetMessageId(); - var eventBytes = message.Value; var eventData = Serializer.Deserialize(message.Value, eventType); - if (await AddToInboxAsync(messageId, eventName, eventType, eventBytes, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) { return; } diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index b586f44c26..050f730dba 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -101,10 +101,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe return; } - var eventBytes = ea.Body.ToArray(); - var eventData = Serializer.Deserialize(eventBytes, eventType); + var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventBytes, eventData)) + if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData)) { return; } diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index bbb3866f04..befc734aa3 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -145,7 +145,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen var messageId = MessageContext.Current.TransportMessage.GetMessageId(); var eventName = EventNameAttribute.GetNameOrDefault(eventType); - if (await AddToInboxAsync(messageId, eventName, eventType, MessageContext.Current.TransportMessage.Body, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) { return; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 8cca01485d..871fb9459f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -147,7 +147,6 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB string messageId, string eventName, Type eventType, - byte[] eventBytes, object eventData) { if (AbpDistributedEventBusOptions.Inboxes.Count <= 0) @@ -184,7 +183,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB GuidGenerator.Create(), messageId, eventName, - eventBytes, + Serialize(eventData), Clock.Now ) );