Enhance RabbitMQ to set PrefetchCount

pull/13402/head
liangshiwei 3 years ago
parent 5aa89c3923
commit f4d544c40b

@ -126,25 +126,31 @@ By default, all the job types use the `Default` RabbitMQ connection.
Configure<AbpRabbitMqBackgroundJobOptions>(options =>
{
options.DefaultQueueNamePrefix = "my_app_jobs.";
options.DefaultDelayedQueueNamePrefix = "my_app_jobs.delayed"
options.PrefetchCount = 1;
options.JobQueues[typeof(EmailSendingArgs)] =
new JobQueueConfiguration(
typeof(EmailSendingArgs),
queueName: "my_app_jobs.emails",
connectionName: "SecondConnection"
connectionName: "SecondConnection",
delayedQueueName:"my_app_jobs.emails.delayed"
);
});
````
* This example sets the default queue name prefix to `my_app_jobs.`. If different applications use the same RabbitMQ server, it would be important to use different prefixes for each application to not consume jobs of each other.
* This example sets the default queue name prefix to `my_app_jobs.` and default delayed queue name prefix to `my_app_jobs.delayed`. If different applications use the same RabbitMQ server, it would be important to use different prefixes for each application to not consume jobs of each other.
* Sets `PrefetchCount` for all queues.
* Also specifies a different connection string for the `EmailSendingArgs`.
`JobQueueConfiguration` class has some additional options in its constructor;
* `queueName`: The queue name that is used for this job. The prefix is not added, so you need to specify the full name of the queue.
* `DelayedQueueName`: The delayed queue name that is used for delayed execution of job. The prefix is not added, so you need to specify the full name of the queue.
* `connectionName`: The RabbitMQ connection name (see the connection configuration above). This is optional and the default value is `Default`.
* `durable` (optional, default: `true`).
* `exclusive` (optional, default: `false`).
* `autoDelete` (optional, default: `false`)
* `autoDelete` (optional, default: `false`).
* `PrefetchCount` (optional, default: null)
See the RabbitMQ documentation if you want to understand the `durable`, `exclusive` and `autoDelete` options better, while most of the times the default configuration is what you want.

@ -141,13 +141,14 @@ Configure<AbpRabbitMqOptions>(options =>
});
````
**Example: Configure the client and exchange names**
**Example: Configure the client, exchange names and prefetchCount**
````csharp
Configure<AbpRabbitMqEventBusOptions>(options =>
{
options.ClientName = "TestApp1";
options.ExchangeName = "TestMessages";
options.PrefetchCount = 1;
});
````

@ -126,25 +126,31 @@ Configure<AbpRabbitMqOptions>(options =>
Configure<AbpRabbitMqBackgroundJobOptions>(options =>
{
options.DefaultQueueNamePrefix = "my_app_jobs.";
options.DefaultDelayedQueueNamePrefix = "my_app_jobs.delayed"
options.PrefetchCount = 1;
options.JobQueues[typeof(EmailSendingArgs)] =
new JobQueueConfiguration(
typeof(EmailSendingArgs),
queueName: "my_app_jobs.emails",
connectionName: "SecondConnection"
connectionName: "SecondConnection",
delayedQueueName:"my_app_jobs.emails.delayed"
);
});
```
- 这个示例将默认的队列名前缀设置为 `my_app_jobs.`,如果多个项目都使用的同一个 RabbitMQ 服务,设置不同的前缀可以避免执行其他项目的后台作业.
- 这个示例将默认的队列名前缀设置为 `my_app_jobs.`并且设置默认的延迟队列名为 `my_app_jobs.delayed`,如果多个项目都使用的同一个 RabbitMQ 服务,设置不同的前缀可以避免执行其他项目的后台作业.
- 设置了预取数量, 用于所有队列.
- 这里还设置了 `EmailSendingArgs` 绑定的 RabbitMQ 连接.
`JobQueueConfiguration` 类的构造函数中,还有一些其他的可选参数.
- `queueName`: 指定后台作业对应的队列名称(全名).
* `DelayedQueueName`: 指定后台延迟执行的作业对于的队列名称(全名).
- `connectionName`: 后台作业对应的 RabbitMQ 连接名称,默认是 `Default`.
- `durable`: 可选参数,默认为 `true`.
- `exclusive`: 可选参数,默认为 `false`.
- `autoDelete`: 可选参数,默认为 `false`.
* `PrefetchCount` (可选参数, 默认为: null)
如果你想要更多地了解 `durable`,`exclusive`,`autoDelete` 的用法,请阅读 RabbitMQ 提供的文档.

@ -141,13 +141,14 @@ Configure<AbpRabbitMqOptions>(options =>
});
````
**示例: 配置客户端和交换机名称**
**示例: 配置客户端,交换机名称和预取数量**
````csharp
Configure<AbpRabbitMqEventBusOptions>(options =>
{
options.ClientName = "TestApp1";
options.ExchangeName = "TestMessages";
options.PrefetchCount = 1;
});
````

@ -19,6 +19,8 @@ public class AbpRabbitMqBackgroundJobOptions
/// Default value: "AbpBackgroundJobsDelayed."
/// </summary>
public string DefaultDelayedQueueNamePrefix { get; set; }
public ushort? PrefetchCount { get; set; }
public AbpRabbitMqBackgroundJobOptions()
{

@ -66,7 +66,8 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
new JobQueueConfiguration(
typeof(TArgs),
AbpRabbitMqBackgroundJobOptions.DefaultQueueNamePrefix + JobConfiguration.JobName,
AbpRabbitMqBackgroundJobOptions.DefaultDelayedQueueNamePrefix + JobConfiguration.JobName
AbpRabbitMqBackgroundJobOptions.DefaultDelayedQueueNamePrefix + JobConfiguration.JobName,
prefetchCount: AbpRabbitMqBackgroundJobOptions.PrefetchCount
);
}
@ -143,6 +144,11 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel);
Consumer.Received += MessageReceived;
if (QueueConfiguration.PrefetchCount.HasValue)
{
ChannelAccessor.Channel.BasicQos(0, QueueConfiguration.PrefetchCount.Value, false);
}
//TODO: What BasicConsume returns?
ChannelAccessor.Channel.BasicConsume(
queue: QueueConfiguration.QueueName,

@ -20,12 +20,14 @@ public class JobQueueConfiguration : QueueDeclareConfiguration
string connectionName = null,
bool durable = true,
bool exclusive = false,
bool autoDelete = false)
bool autoDelete = false,
ushort? prefetchCount = null)
: base(
queueName,
durable,
exclusive,
autoDelete)
autoDelete,
prefetchCount)
{
JobArgsType = jobArgsType;
ConnectionName = connectionName;

@ -13,6 +13,8 @@ public class AbpRabbitMqEventBusOptions
public string ExchangeName { get; set; }
public string ExchangeType { get; set; }
public ushort? PrefetchCount { get; set; }
public string GetExchangeTypeOrDefault()
{

@ -78,7 +78,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
AbpRabbitMqEventBusOptions.ClientName,
durable: true,
exclusive: false,
autoDelete: false
autoDelete: false,
prefetchCount: AbpRabbitMqEventBusOptions.PrefetchCount
),
AbpRabbitMqEventBusOptions.ConnectionName
);

@ -13,6 +13,8 @@ public class QueueDeclareConfiguration
public bool Exclusive { get; set; }
public bool AutoDelete { get; set; }
public ushort? PrefetchCount { get; set; }
public IDictionary<string, object> Arguments { get; }
@ -20,13 +22,15 @@ public class QueueDeclareConfiguration
[NotNull] string queueName,
bool durable = true,
bool exclusive = false,
bool autoDelete = false)
bool autoDelete = false,
ushort? prefetchCount = null)
{
QueueName = queueName;
Durable = durable;
Exclusive = exclusive;
AutoDelete = autoDelete;
Arguments = new Dictionary<string, object>();
PrefetchCount = prefetchCount;
}
public virtual QueueDeclareOk Declare(IModel channel)

@ -168,6 +168,11 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;
if (Queue.PrefetchCount.HasValue)
{
Channel.BasicQos(0, Queue.PrefetchCount.Value, false);
}
Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,

Loading…
Cancel
Save