RabbitMQ拉模式批量消费消息
/ / 点击: / 阅读耗时 5 分钟实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。
实现推模式推荐的方式是继承DefaultConsumer
基类,也可以使用Spring AMQP的SimpleMessageListenerContainer
。
推模式是最常用的,但是有些情况下推模式并不适用的,比如说:
- 由于某些限制,消费者在某个条件成立时才能消费消息
- 需要批量拉取消息进行处理
实现拉模式
RabbitMQ的Channel提供了basicGet
方法用于拉取消息。
1 | /** |
basicGet
返回GetResponse
类。
1 | public class GetResponse { |
rabbitmq-client版本4.0.3
使用basicGet
拉取消息需要注意:
- Queue中没有消息时
basicGet
返回null,此时应用可以适当休眠 - 如果需要实现消息的可靠消费,应该传递autoAck为false
- GetResponse的envelope属性的deliveryTag属性用于ACK消息时传递给basicAck
- 与
DefaultConsumer
运行在Connection的线程池中不同,使用拉模式需要自己创建线程池
示例代码:
1 | private void consume(Channel channel) throws IOException, InterruptedException { |
批量拉取消息
RabbitMQ支持客户端批量拉取消息,客户端可以连续调用basicGet
方法拉取多条消息,处理完成之后一次性ACK。需要注意:
- 批量拉取循环的退出条件有两个:达到数量上限、
basicGet
返回null - 使用
basicAck
批量ACK传递的参数是最后一条消息的deliveryTag,传递其他deliveryTag将导致ACK失败
示例代码:
1 | String bridgeQueueName = extractorProperties.getBridgeQueueName(); |
关于QueueingConsumer
QueueingConsumer在客户端本地使用BlockingQueue
缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是BlockingQueue.take
),但是QueueingConsumer
现在已经标记为Deprecated。
根据官方文档的解释,QueueingConsumer
有两个坑:
Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock.
建议不要使用QueueingConsumer
。