RabbitMQ拉模式批量消费消息

实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。

实现推模式推荐的方式是继承DefaultConsumer基类,也可以使用Spring AMQP的SimpleMessageListenerContainer

推模式是最常用的,但是有些情况下推模式并不适用的,比如说:

  • 由于某些限制,消费者在某个条件成立时才能消费消息
  • 需要批量拉取消息进行处理

实现拉模式

RabbitMQ的Channel提供了basicGet方法用于拉取消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
* @see com.rabbitmq.client.AMQP.Basic.Get
* @see com.rabbitmq.client.AMQP.Basic.GetOk
* @see com.rabbitmq.client.AMQP.Basic.GetEmpty
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @return a {@link GetResponse} containing the retrieved message data
* @throws java.io.IOException if an error is encountered
*/
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

basicGet返回GetResponse类。

1
2
3
4
5
6
7
public class GetResponse {
private final Envelope envelope;
private final BasicProperties props;
private final byte[] body;
private final int messageCount;

// ...

rabbitmq-client版本4.0.3

使用basicGet拉取消息需要注意:

  1. Queue中没有消息时basicGet返回null,此时应用可以适当休眠
  2. 如果需要实现消息的可靠消费,应该传递autoAck为false
  3. GetResponse的envelope属性的deliveryTag属性用于ACK消息时传递给basicAck
  4. DefaultConsumer运行在Connection的线程池中不同,使用拉模式需要自己创建线程池

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void consume(Channel channel) throws IOException, InterruptedException {
while (true) {
if (!isConditionSatisfied()) {
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
if (response == null) {
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
String data = new String(response.getBody());
logger.info("Get message <= {}", data);
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
}

批量拉取消息

RabbitMQ支持客户端批量拉取消息,客户端可以连续调用basicGet方法拉取多条消息,处理完成之后一次性ACK。需要注意:

  1. 批量拉取循环的退出条件有两个:达到数量上限、basicGet返回null
  2. 使用basicAck批量ACK传递的参数是最后一条消息的deliveryTag,传递其他deliveryTag将导致ACK失败

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
String bridgeQueueName = extractorProperties.getBridgeQueueName();
int batchSize = extractorProperties.getBatchSize();
List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
long tag = 0;
while (responseList.size() < batchSize) {
GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
if (getResponse == null) {
break;
}
responseList.add(getResponse);
tag = getResponse.getEnvelope().getDeliveryTag();
}
if (responseList.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(1);
} else {
logger.info("Get <{}> responses this batch", responseList.size());
// handle messages
channel.basicAck(tag, true);
}

关于QueueingConsumer

QueueingConsumer在客户端本地使用BlockingQueue缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是BlockingQueue.take),但是QueueingConsumer现在已经标记为Deprecated。

根据官方文档的解释,QueueingConsumer有两个坑:

Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock.

建议不要使用QueueingConsumer

参考资料

Consumer Acknowledgements and Publisher Confirms