作者:程序架道;
來源:程序架道
關於MQ你還應該知道這些


我想自己控制開始消費如何做

這個是在監聽器模式下,比如我們新上線服務,並不想一旦服務啓動就開始消費數據,這種情況拿JMQ(https://github.com/tiglabs/jmq 京東開源的一款消息中間件產品)來說,可以按如下實施。

配置文件

1
2
3

4
5

代碼樣例

1//獲取消費者實例
2 MessageConsumer consumer= (MessageConsumer) context.getBean("consumer");
3
4//手動開啓消費
5 consumer.resume();
6
7注:如果需要再次暫停,調用 pause() 方法即可

爲什麼需要消息過濾

通常消息中間件會提供消息過濾功能,比如RocketMQ,京東的JMQ等。消息過濾是通過在消息體之外增加Tag來實現。以京東商城訂單交易爲例,訂單會有新訂單,支付成功,發貨三種狀態,他們統一都發送到Order_Topic

主題上面,然後分別有不同的三個系統來消費,如下圖:

關於MQ你還應該知道這些


以RocketMQ爲例,發送消息,標識出Tag,下面的Tag是TagTest。

1Message msg = new Message("MQ_TOPIC_TEST","TagTest","Hello RocketMQ".getBytes());


消費消息的時候,也同樣要指明Tag比如下面的TagTest。

1consumer.subscribe("MQ_TOPIC_TEST", "TagTest", new MessageListener() {
2 public Action consume(Message message, ConsumeContext context) {
3 System.out.println(message.getMsgID());
4 return Action.CommitMessage;
5 }
6});

那麼,可能會問到爲什麼MQ會提供一個Tag來做消息過濾呢,比如我在消息體中跟進業務的狀態類型,類似下面的消息體{"orderId":"OR100001","orderState":"new","price":"200.00"} 我直接根據orderState來判斷訂單的類型也可以達到過濾的目的。但是這樣我們就要解析消息體之後才能獲取到orderState的值。如果使用中間件提供的Tag標籤則不需要解析消息體就能做過濾區分,從而達到提高消費性能的目的。

另外還會有一個疑問,就是我分成多個Toppic不也是可以實現消息分類,不同業務消費不同的Topic不也是可以的嗎,但是一般情況下不同的Topic之間是沒有必然的聯繫的,必然訂單Topic,商品Topic等。用Tag則可以區分同一個Topic下面相互關聯的消息。必然我們例子中的訂單類消息裏面的新訂單、支付訂單、發貨。是一個全集和子集的關係,流程先後的關係。

過期消息是怎麼處理的

造成過期消息的原因,比如消費者暫停了消費,還有比如一個很久以前創建的主題,後來因爲業務調整,大家陸續不再訂閱,都會造成過期消息發生。對於這類消息如果不加以處理,生產服務端的存儲會越來越大造成不必要的壓力。通常的做法是創建一個任務去遷移走這些數據到另外的存儲中比如其他的HBASE集羣中。判斷過期的規則舉例比如查看一天內沒有連接,並且積壓了5天。滿足這兩個條件就判定爲過期了。如下圖

關於MQ你還應該知道這些


消息重試的注意

MQ中間件大都有可以配置消息重試的策略,比如本地重試默認3次,之後再轉爲服務端重試5次等配置。我們主動觸發消息重試在代碼中直接拋出throw new RuntimeException("Consumer Message exceotion");異常即可。每個MQ會根據自己的機制執行重試策略。我們要注意的是冪等性操作,另外我們還要注意有此類重試說明業務有異常應當及時解決這種消費異常,從而保證消費的性能不然服務端消息積壓,更應該要注意一定不要拿這種重試機製做業務邏輯上的處理。那我們就想了我是不是關掉這種重試策略就沒有這種問題了呢,如果關閉了重試,且業務消費時有異常拋出,那麼會導致消費異常,主要表現爲一直重複消費某條或某些消息,看起來像是消費卡住了。因此一般不建議關閉重試機制,如果業務能夠接受在消費異常時丟棄該消息,可自行catch住Exception不再拋出。

下面代碼以RocketMQ官方文檔爲例,演示重試和不重試的方式。

重試的代碼示例

 1public class MessageListenerImpl implements MessageListener {
2 @Override
3 public Action consume(Message message, ConsumeContext context) {
4 //消息邏輯處理
5 doConsumeMessage(message);
6 //方式1:返回 Action.ReconsumeLater,消息將重試,官方推薦使用該方式
7 return Action.ReconsumeLater;
8 //方式2:返回 null,消息將重試
9 return null;
10 //方式3:直接拋出異常, 消息將重試
11 throw new RuntimeException("Consumer Message exceotion");
12 }
13}

不重試的代碼示例

 1public class MessageListenerImpl implements MessageListener {
2 @Override
3 public Action consume(Message message, ConsumeContext context) {
4 try {
5 doConsumeMessage(message);
6 } catch (Throwable e) {
7 //捕獲消費邏輯中的所有異常,並返回 Action.CommitMessage; 以後這條消息將不再重試
8 return Action.CommitMessage;
9 }
10 //消息處理正常,直接返回 Action.CommitMessage;
11 return Action.CommitMessage;
12 }
13}

以上代碼主要旨在說明重試與不重試情況下的具體執行方法。不重試則代碼自行捕獲異常,那麼此條消息將不再輪迴。

爲什麼需要事務消息

說到事務很容易想到數據庫事務ACID,原子性,一致性,隔離性,持久性;但這裏的事務消息中的事務跟數據庫層面的還不完全相符,事務消息中的事務主要是指一致性。生產者將消息發送到服務端BROKER的過程中會發生兩種可能性的失敗,一種是還沒有到達BROKER就發生了網絡閃斷,BROKER沒有收到消息。還有一種是BORKER內部出現錯誤導致沒有發送成功。事務消息就是要解決生產者客戶端和服務端BROKER數據的一致性。

每一個MQ中間件產品的解決方案不太一致,以京東自研中間件產品JMQ爲例,事務消息在客戶端必須啓用補償機制管理器,即TxFeedbackManager。當本地事務正確提交或回滾、而JMQ未能正確的提交或回滾時,TxFeedbackManager可根據已保存的事務狀態進行補償,從而將JMQ事務置於和本地事務一致的狀態。JMQ事務補償機制會查詢本地事務的狀態,再進行補償,從而達到一致。因此,本地事務狀態的保存就變得極爲重要(如果JMQ查不到某個本地事務的狀態,一定時間後會超時,然後設置爲回滾狀態)。業務方需實現 TxStatusQuerier 接口的方法:queryStatus(String txId, String queryId) 來查詢本地事務的狀態,其中txId爲事務Id,必須得有(事務Id是開啓JMQ事務後服務端的返回值);queryId可根據業務需求選擇。

對於事務狀態的保存,業務可根據場景自由選擇,例如數據庫、文件等。實現TxStatusQuerier後,需要將其添加到TxFeedbackManager裏。查詢器是TxFeedbackManager裏的一個map,key爲topic,value即爲業務實現查詢接口的查詢器。

代碼示例如下:

 1//連接配置
2TransportConfig config = new TransportConfig();
3config.setApp(app);
4//設置broker地址
5config.setAddress(address);
6//設置用戶名
7config.setUser(user);
8//設置密碼
9config.setPassword(password);
10//設置發送超時
11config.setSendTimeout(10000);
12//設置是否使用epoll模式,windows環境下設置爲false,linux環境下設置爲true
13config.setEpoll(true);
14
15//創建集羣連接管理器
16manager = new ClusterTransportManager(config);
17manager.start();
18//創建發送者
19producer = new MessageProducer(manager);
20// 事務補償管理器
21txFeedbackManager = new TxFeedbackManager(manager);
22// LocalTxStatusQuerier 實現了 TxStatusQuerier 接口
23txFeedbackManager.addTxStatusQuerier(topic, new LocalTxStatusQuerier());
24producer.setTxFeedbackManager(txFeedbackManager);
25
26producer.start();

MQ中間件一般默認是不開啓事務消息機制的,這種情況下會影響吞吐性能,因此我們在使用的時候確定當前的業務場景是否允許發送即走,如果確實需要在設置開啓事務消息。

消息爲什麼沒有了順序

我們知道有些消息中間件產品是支持順序消息的,但是默認情況是消息是無序的,我們都期望一條訂單從加入購物車、生成訂單、支付成功、發貨這樣的一個順序的流程,可是如果不強制按照對應的配置消息是沒有順序的,那麼消息爲什麼沒有了順序呢,一個最根本的原因是,我們在大訪問量場景下不可能只使用一臺服務器,一個隊列來存儲消息。比如我們說的訂單消息,購物車和生成訂單完全是可能發送到不同主機不同隊列上面去的,這樣消費者在消費的時候也就沒有了順序性可言。這裏舉例訂單消息可能有些極端,因爲購物車到生成訂單中間一般會間隔一個時間片。但爲了讓大家明白這個消息的順序是可以按照這個邏輯來闡述的。

關於MQ你還應該知道這些


小節

我們掌握了消息中間件的基本使用操作,發送,消費,但是這個過程中如果我們多思考一些問題,比如本文中提到的自己控制開始消費,有了業務標識爲什麼還需要中間件自帶的消息過濾機制,消息過期了又是怎樣處理的等等。在基本使用操作的後面我們可以往縱深去思考這樣的問題,對我們立體的理解MQ帶來幫助。

相關文章