1. 本篇概要
其實,還有1種場景需要考慮:當消費者接收到消息後,還沒處理完業務邏輯,消費者掛掉了,那消息也算丟失了?,比如用戶下單,訂單中心發送了1個消息到RabbitMQ里的隊列,積分中心收到這個消息,準備給這個下單的用戶增加20積分,但積分還沒增加成功呢,積分中心自己掛掉了,導致數據出現問題。
那麼如何解決這種問題呢?
為了保證消息被消費者成功的消費,RabbitMQ提供了消息確認機制(message acknowledgement),本文主要講解RabbitMQ中,如何使用消息確認機制來保證消息被消費者成功的消費,避免因為消費者突然宕機而引起的消息丟失。
2. 開啟顯式Ack模式
我們開啟一個消費者的代碼是這樣的:
// 創建隊列消費者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received Message " + message + "");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
這裡的重點是channel.basicConsume(QUEUE_NAME, true, consumer);
方法的第2個參數,讓我們先看下basicConsume()的源碼:
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return this.basicConsume(queue, autoAck, "", callback);
}
這裡的autoAck參數指的是是否自動確認,如果設置為ture,RabbitMQ會自動把發送出去的消息置為確認,然後從內存(或者磁碟)中刪除,而不管消費者接收到消息是否處理成功;如果設置為false,RabbitMQ會等待消費者顯式的回復確認信號後才會從內存(或者磁碟)中刪除。
建議將autoAck設置為false,這樣消費者就有足夠的時間處理消息,不用擔心處理消息過程中消費者宕機造成消息丟失。
此時,隊列里的消息就分成了2個部分:
- 等待投遞給消費者的消息(下圖中的Ready部分)
- 已經投遞給消費者,但是還沒有收到消費者確認信號的消息(下圖中的Unacked部分)