From 453ac6ab9da4ef3b96f84a53def98a2fad3d0321 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 1 Dec 2020 14:53:13 +0800 Subject: [PATCH 1/2] Use AsyncEventingBasicConsumer for RabbitMQ's JobQueue. --- .../Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs index 9d372ec52b..b3653b5e81 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs @@ -135,7 +135,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ if (AbpBackgroundJobOptions.IsJobExecutionEnabled) { - Consumer = new EventingBasicConsumer(ChannelAccessor.Channel); + var Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel); Consumer.Received += MessageReceived; //TODO: What BasicConsume returns? @@ -173,7 +173,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ return properties; } - protected virtual void MessageReceived(object sender, BasicDeliverEventArgs ea) + protected virtual async Task MessageReceived(object sender, BasicDeliverEventArgs ea) { using (var scope = ServiceScopeFactory.CreateScope()) { @@ -185,7 +185,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ try { - AsyncHelper.RunSync(() => JobExecuter.ExecuteAsync(context)); + await JobExecuter.ExecuteAsync(context); ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } catch (BackgroundJobExecutionException) From 2f7db9c92d2d700dfbcd168dd64e0e735c1a170a Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 1 Dec 2020 14:58:17 +0800 Subject: [PATCH 2/2] Remove `var Consumer` --- .../Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs index b3653b5e81..3d7e8c05ff 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs @@ -21,7 +21,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ protected BackgroundJobConfiguration JobConfiguration { get; } protected JobQueueConfiguration QueueConfiguration { get; } protected IChannelAccessor ChannelAccessor { get; private set; } - protected EventingBasicConsumer Consumer { get; private set; } + protected AsyncEventingBasicConsumer Consumer { get; private set; } public ILogger> Logger { get; set; } @@ -135,7 +135,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ if (AbpBackgroundJobOptions.IsJobExecutionEnabled) { - var Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel); + Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel); Consumer.Received += MessageReceived; //TODO: What BasicConsume returns?