diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs index b2995d1079..73f3ad72a7 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using Confluent.Kafka; @@ -15,7 +14,7 @@ namespace Volo.Abp.Kafka { protected AbpKafkaOptions Options { get; } - protected ConcurrentDictionary> Consumers { get; } + protected ConcurrentDictionary>> Consumers { get; } protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); @@ -27,7 +26,7 @@ namespace Volo.Abp.Kafka { Options = options.Value; - Consumers = new ConcurrentDictionary>(); + Consumers = new ConcurrentDictionary>>(); Logger = new NullLogger(); } @@ -36,7 +35,7 @@ namespace Volo.Abp.Kafka connectionName ??= KafkaConnections.DefaultConnectionName; return Consumers.GetOrAdd( - connectionName, connection => + connectionName, connection => new Lazy>(() => { var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection)) { @@ -45,10 +44,9 @@ namespace Volo.Abp.Kafka }; Options.ConfigureConsumer?.Invoke(config); - return new ConsumerBuilder(config).Build(); - } - ); + }) + ).Value; } public void Dispose() @@ -78,8 +76,8 @@ namespace Volo.Abp.Kafka try { - consumer.Close(); - consumer.Dispose(); + consumer.Value.Close(); + consumer.Value.Dispose(); } catch { diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs index c4b4b9ead0..3381e373da 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -14,7 +14,7 @@ namespace Volo.Abp.Kafka { protected AbpKafkaOptions Options { get; } - protected ConcurrentDictionary> Producers { get; } + protected ConcurrentDictionary>> Producers { get; } protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); @@ -26,7 +26,7 @@ namespace Volo.Abp.Kafka { Options = options.Value; - Producers = new ConcurrentDictionary>(); + Producers = new ConcurrentDictionary>>(); Logger = new NullLogger(); } @@ -35,14 +35,14 @@ namespace Volo.Abp.Kafka connectionName ??= KafkaConnections.DefaultConnectionName; return Producers.GetOrAdd( - connectionName, connection => + connectionName, connection => new Lazy>(() => { var config = Options.Connections.GetOrDefault(connection); Options.ConfigureProducer?.Invoke(new ProducerConfig(config)); return new ProducerBuilder(config).Build(); - }); + })).Value; } public void Dispose() @@ -72,7 +72,7 @@ namespace Volo.Abp.Kafka try { - producer.Dispose(); + producer.Value.Dispose(); } catch { diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 7ca8346d11..c227d978a1 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.Collections.Generic; using Microsoft.Extensions.Options; using RabbitMQ.Client; @@ -10,14 +11,14 @@ namespace Volo.Abp.RabbitMQ { protected AbpRabbitMqOptions Options { get; } - protected ConcurrentDictionary Connections { get; } + protected ConcurrentDictionary> Connections { get; } private bool _isDisposed; public ConnectionPool(IOptions options) { Options = options.Value; - Connections = new ConcurrentDictionary(); + Connections = new ConcurrentDictionary>(); } public virtual IConnection Get(string connectionName = null) @@ -25,15 +26,15 @@ namespace Volo.Abp.RabbitMQ connectionName ??= RabbitMqConnections.DefaultConnectionName; return Connections.GetOrAdd( - connectionName, - () => + connectionName, () => new Lazy(() => { var connection = Options.Connections.GetOrDefault(connectionName); var hostnames = connection.HostName.TrimEnd(';').Split(';'); // Handle Rabbit MQ Cluster. return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames); - } - ); + + }) + ).Value; } public void Dispose() @@ -49,7 +50,7 @@ namespace Volo.Abp.RabbitMQ { try { - connection.Dispose(); + connection.Value.Dispose(); } catch {