Merge pull request #8198 from abpframework/liangshiwei/lazy

Use lazy in connection pooling
pull/8199/head
maliming 5 years ago committed by GitHub
commit 743633b09a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Confluent.Kafka;
@ -15,7 +14,7 @@ namespace Volo.Abp.Kafka
{
protected AbpKafkaOptions Options { get; }
protected ConcurrentDictionary<string, IConsumer<string, byte[]>> Consumers { get; }
protected ConcurrentDictionary<string, Lazy<IConsumer<string, byte[]>>> Consumers { get; }
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
@ -27,7 +26,7 @@ namespace Volo.Abp.Kafka
{
Options = options.Value;
Consumers = new ConcurrentDictionary<string, IConsumer<string, byte[]>>();
Consumers = new ConcurrentDictionary<string, Lazy<IConsumer<string, byte[]>>>();
Logger = new NullLogger<ConsumerPool>();
}
@ -36,7 +35,7 @@ namespace Volo.Abp.Kafka
connectionName ??= KafkaConnections.DefaultConnectionName;
return Consumers.GetOrAdd(
connectionName, connection =>
connectionName, connection => new Lazy<IConsumer<string, byte[]>>(() =>
{
var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection))
{
@ -45,10 +44,9 @@ namespace Volo.Abp.Kafka
};
Options.ConfigureConsumer?.Invoke(config);
return new ConsumerBuilder<string, byte[]>(config).Build();
}
);
})
).Value;
}
public void Dispose()
@ -78,8 +76,8 @@ namespace Volo.Abp.Kafka
try
{
consumer.Close();
consumer.Dispose();
consumer.Value.Close();
consumer.Value.Dispose();
}
catch
{

@ -14,7 +14,7 @@ namespace Volo.Abp.Kafka
{
protected AbpKafkaOptions Options { get; }
protected ConcurrentDictionary<string, IProducer<string, byte[]>> Producers { get; }
protected ConcurrentDictionary<string, Lazy<IProducer<string, byte[]>>> Producers { get; }
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
@ -26,7 +26,7 @@ namespace Volo.Abp.Kafka
{
Options = options.Value;
Producers = new ConcurrentDictionary<string, IProducer<string, byte[]>>();
Producers = new ConcurrentDictionary<string, Lazy<IProducer<string, byte[]>>>();
Logger = new NullLogger<ProducerPool>();
}
@ -35,14 +35,14 @@ namespace Volo.Abp.Kafka
connectionName ??= KafkaConnections.DefaultConnectionName;
return Producers.GetOrAdd(
connectionName, connection =>
connectionName, connection => new Lazy<IProducer<string, byte[]>>(() =>
{
var config = Options.Connections.GetOrDefault(connection);
Options.ConfigureProducer?.Invoke(new ProducerConfig(config));
return new ProducerBuilder<string, byte[]>(config).Build();
});
})).Value;
}
public void Dispose()
@ -72,7 +72,7 @@ namespace Volo.Abp.Kafka
try
{
producer.Dispose();
producer.Value.Dispose();
}
catch
{

@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
@ -10,14 +11,14 @@ namespace Volo.Abp.RabbitMQ
{
protected AbpRabbitMqOptions Options { get; }
protected ConcurrentDictionary<string, IConnection> Connections { get; }
protected ConcurrentDictionary<string, Lazy<IConnection>> Connections { get; }
private bool _isDisposed;
public ConnectionPool(IOptions<AbpRabbitMqOptions> options)
{
Options = options.Value;
Connections = new ConcurrentDictionary<string, IConnection>();
Connections = new ConcurrentDictionary<string, Lazy<IConnection>>();
}
public virtual IConnection Get(string connectionName = null)
@ -25,15 +26,15 @@ namespace Volo.Abp.RabbitMQ
connectionName ??= RabbitMqConnections.DefaultConnectionName;
return Connections.GetOrAdd(
connectionName,
() =>
connectionName, () => new Lazy<IConnection>(() =>
{
var connection = Options.Connections.GetOrDefault(connectionName);
var hostnames = connection.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames);
}
);
})
).Value;
}
public void Dispose()
@ -49,7 +50,7 @@ namespace Volo.Abp.RabbitMQ
{
try
{
connection.Dispose();
connection.Value.Dispose();
}
catch
{

Loading…
Cancel
Save