默认情况下,消费者在项目启动的时候就开始监听队列消费了,在某些特殊情况,可能需要临时关闭消费
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @author 蔡定努
*/
@GetMapping
public boolean aa() {
rabbitTemplate.convertSendAndReceive("name", "123");
return true;
}
/**
* 消息监听
*/
@RabbitListener(id = "con", bindings = @QueueBinding(
value = @Queue(name = "name"),
exchange = @Exchange("ex"),
key = "ex"),
concurrency = "3", ackMode = "AUTO"
)
public void ack(String name con() throws InterruptedException {
log.info("---收到消息-----");
sleep(10000);
}
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
/**
* 开启/关闭单个消费者
*/
@GetMapping("switchConsumer/{queueId}/{flag}")
public void switchConsumer(@PathVariable("queueId") String queueId, @PathVariable(value = "flag") Boolean flag) {
// Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
// for (MessageListenerContainer listenerContainer : listenerContainers) {
// listenerContainer.stop();
// }
//
MessageListenerContainer listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(queueId);
Optional.ofNullable(listenerContainer).ifPresent(messageListenerContainer -> {
if (flag) {
messageListenerContainer.start();
log.info("消费者开启");
} else {
messageListenerContainer.stop();
log.info("消费者关闭");
}
});
}