架構成長之路:消息中間件RabbitMQ可靠性投遞與生產實踐
本章重點:
可靠性投遞
1.確保消息發送到RabbitMQ伺服器2.確保消息被正確的路由3.確保消息在隊列正確地存儲4.確保消息從隊列正確地投遞到消費者5.消費者回調
6.補償機制7.消息冪等性8.消息的順序性
可靠性投遞
首先需要明確,效率和可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對消息的收發效率造成影響,如過是一些業務實時性要求不是特別高的場合,可以犧牲可靠性來換取效率。
- ①代表消息從生產者發送到Exchange
- ②代表消息從Exchange路由到Queue
- ③ 代表消息在Queue中存儲;
- ④ 代表消費者訂閱Queue並消費消息。
1.確保消息發送到RabbitMQ伺服器
可能因為網路或者Broker的問題導致①失敗,而生產者是無法得知消息是否正確發送到Broker的。
有兩種解決方案:
第一種是Transaction事務模式
第二種是Confirm確認模式
1.在通過channel.txSelect方法開啟事務之後,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定 到達了RabbitMQ中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。使用事務機制的話會「吸干」RabbitMQ的性 能,一般不建議使用。
2.生產者通過調用channel.con?rmSelect方法(即Con?rm.Select命令)將信道設置為con?rm模式。一旦消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。
2.確保消息被正確的路由
可能因為路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導致②失敗。
- 使用mandatory參數和ReturnListener,可以實現消息無法路由的時候返回給生產者。
- 另一種方式就是使用備份交換機(alternate-exchange),無法路由的消息會發送到這個交換機上。
Map<String,Object> arguments = new HashMap<String,Object>();
// 指定交換機的備份交換機
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
3.確保消息在隊列正確地存儲
可能因為系統宕機、重啟、關閉等等情況導致存儲在隊列的消息丟失,即③出現問題。
解決方案:
1.隊列持久化
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2.交換機持久化
// String exchange, boolean durable
channel.exchangeDeclare("MY_EXCHANGE","true");
3.消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder()
// 2代表持久化,其他代表瞬態
.deliveryMode(2)
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
4.確保消息從隊列正確地投遞到消費者
如果消費者收到消息後未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。
為了保證消息從隊列可靠性到達消費者,RabbitMQ提供了消息確認機制(message acknowledgement),消費者在訂閱隊列時,可以指定autoAck參數,當autoAck等於false時,RabbitMQ會等待消費者顯示地回復確認消息才從隊列中刪除該消息。如果消息消費失敗,也可以調用Basic.Reject或者BasicNack來拒絕當前消息而不是確認,如果requere參數為true,可以把這條消息重新存入隊列,以便發送給下一個消費者。
5.消費者回調
消費者處理消息之後,可以再發送一條消息給生產者,或者調用生產者地API,告知消息處理完畢。
6.補償機制
對於一定時間沒有響應地消息,可以設置一個定時重發地機制,但是要控制次數,比如最多重複三次,否則會造成消息堆積。
7.消息冪等性
服務端是沒有這種控制的,只能在消費端控制。
如何避免消息的重複消費?
消息重複消費可能會有兩個原因:
- 生產者的問題。環節①重複發送消息,比如在開啟Confirm模式但未收到確認
- 環節④出了問題,由於消費者未發送ACK或者其它原因,消息重複投遞
對於重複發送的消息,可以對每一條消息生成一個唯一的業務id,通過日誌或者建表來做重複控制。
8.消息的順序性
消息的順序性是指消費者消費消息的順序跟生產者投遞消息的順序是一致的。
在RabbitMQ中,一個隊列有多個消費者時,由於不同的消費者消費消息的速度是不一樣的,順序無法保證
讀者分享
覺得不錯的朋友可以點點左下角的拇指小贊一下,同時在這給大家推薦一個微信公眾號,那裡每天都會有技術乾貨、技術動向、職業生涯、行業熱點、職場趣事等一切有關於程序員的內容分享。更有海量Java架構、移動互聯網架構相關源碼視頻,面試資料,電子書籍截止於4月28日免費發放。我看了覺得資源還不錯,如果你們有需要的話,可以點開下面文檔後掃描下方二維碼關注wx公眾號免費獲取↓↓↓
Java面試必備學習資源免費獲取資源大本營↓↓↓
Java架構資料
Java源碼解析,到各種框架學習,再到項目實戰,一應俱全,包括但不限於:Spring、Mybatis等源碼、Java進階、Java架構師、虛擬機、性能優化、並發編程、數據結構和演算法。移動互聯網架構資料
Android進階、Android架構、APP開發、NDK模塊開發、小程序開發、微信開發。推薦閱讀: