作者:ITDragon龍
來源:http://www.cnblogs.com/itdragon/p/8535187.html


衆所周知,消息中間件是大型分佈式系統中不可或缺的重要組件。它使用簡單,卻解決了不少難題,比如異步處理,系統藕合,流量削鋒,分佈式事務管理等。實現了一個高性能,高可用,高擴展的系統。

本章通過介紹消息中間件的應用場景,消息中間件的傳輸模式,ActiveMQ快速入門 三個方面來對消息中間件進行入門介紹。還在等什麼,趕快來學習吧!

  • 說明:消息中間件非常強大,值得我們認真去學習和使用。完整代碼請異步github。
  • 技術:消息中間件的應用場景,通信模式,ActiveMQ。
  • 源碼:
  • https://github.com/ITDragonBlog/daydayup/tree/master/MQ

文章目錄結構:

消息中間件企業級應用


消息中間件應用場景

異步處理

異步處理:調用者發起請求後,調用者不會立刻得到結果,也無需等待結果,繼續執行其他業務邏輯。提高了效率但存在異步請求失敗的隱患,適用於非核心業務邏輯處理。

同步處理:調用者發起請求後,調用者必須等待直到返回結果,再根據返回的結果執行其他業務邏輯。效率雖然沒有異步處理高,但能保證業務邏輯可控性,適用於核心業務邏輯處理。

舉一個比較常見的應用場景:爲了確保註冊用戶的真實性,一般在註冊成功後會發送驗證郵件或者驗證碼短信,只有認證成功的用戶才能正常使用平臺功能。

如下圖所示:同步處理和異步處理的比較。

消息中間件企業級應用

用消息中間件實現異步處理的好處:

  • 在傳統的系統架構,用戶從註冊到跳轉成功頁面,中間需要等待郵件發送的業務邏輯耗時。這不僅影響系統響應時間,降低了CPU吞吐量,同時還影響了用戶的體驗。
  • 通過消息中間件將郵件發送的業務邏輯異步處理,用戶註冊成功後發送數據到消息中間件,再跳轉成功頁面,郵件發送的邏輯再由訂閱該消息中間件的其他系統負責處理,
  • 消息中間件的讀寫速度非常的快,其中的耗時可以忽略不計。通過消息中間件可以處理更多的請求。

小結:正確使用消息中間件將非核心業務邏輯功能異步處理,可以提高系統的響應效率,提高了CPU的吞吐量,改善用戶的體驗。

系統藕合和事務的最終一致性

分佈式系統是若干個獨立的計算機(系統)集合。每個計算機負責自己的模塊,實現系統的解耦,也避免單點故障對整個系統的影響。每個系統還可以做一個集羣,進一步降低故障的發生概率。

在這樣的分佈式系統中,消息中間件又扮演着什麼樣的角色呢?

舉一個比較常見的應用場景:訂單系統下單成功後,需要調用倉庫系統接口,選擇最優的發貨倉庫和更新商品庫存。若因爲某種原因在調用倉庫系統接口失敗,會直接影響到下單流程。

如下圖所示:感受一下消息中間件扮演的重要角色。

消息中間件企業級應用

用消息中間件實現系統藕合的好處:

  • 消息中間件可以讓各系統之間耦合性降低,不會因爲其他系統的異常影響到自身業務邏輯。各盡其職,訂單系統只需負責將訂單數據持久化到數據庫中,倉庫系統只需負責更新庫存,不會因爲倉庫系統的原因從而影響到下單的流程。
  • 各位看官是否發現了一個問題,下單和庫存減少本應該是一個事務。因爲分佈式的原因很難保證事務的強一致性。這裏通過消息中間件實現事務的最終一致性效果(後續會詳細介紹)。

小結:事務的一致性固然重要,沒有庫存會導致下單失敗是一個理論上很正常的邏輯。但實際業務中並非如此,我們完全可以利用發貨期通過採購或者借庫的方式來增加庫存。這樣無疑可以增加銷量,還是可以保證事務的最終一致性。

流量削鋒

流量削鋒也稱限流。在秒殺,搶購的活動中,爲了不影響整個系統的正常使用,一般會通過消息中間件做限流,避免流量突增壓垮系統,前端頁面可以提示"排隊等待",即便用戶體驗很差,也不能讓系統垮掉。

消息中間件企業級應用

小結:限流可以在流量突增的情況下保障系統的穩定。系統宕機會被同行抓住笑柄。


消息中間件的傳輸模式

消息中間件除了支持對點對和發佈訂閱兩種模式外,在實際開發中還有一種雙向應答模式被廣泛使用。

點對點(p2p)模式

點對點(p2p)模式有三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。發送者將消息發送到一個特定的隊列中,等待接收者從隊列中獲取消息消耗。

P2P的三個特點:

  • 每個消息只能被一個接收者消費,且消息被消費後默認從隊列中刪掉(也可以通過其他簽收機制重複消費)。
  • 發送者和接收者之間沒有依賴性,生產者發送消息和消費者接收消息並不要求同時運行。
  • 接收者在成功接收消息之後需向隊列發送接收成功的確認消息。
消息中間件企業級應用

發佈訂閱(Pub/Sub)模式

發佈訂閱(Pub/Sub)模式也有三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。發佈者將消息發送到主題隊列中,系統再將這些消息傳遞給訂閱者。

Pub/Sub的特點:

  • 每個消息可以被多個訂閱者消費。
  • 發佈者和訂閱者之間存在依賴性。訂閱者必須先訂閱主題後才能接收到信息,在訂閱前發佈的消息,訂閱者是接收不到的。
  • 非持久化訂閱:如果訂閱者不在線,此時發佈的消息訂閱者是也接收不到,即便訂閱者重新上線也接收不到。
  • 持久化訂閱:訂閱者訂閱主題後,即便訂閱者不在線,此時發佈的消息可以在訂閱者重新上線後接收到的。
消息中間件企業級應用

雙向應答模式

雙向應答模式並不是消息中間件提供的一種通信模式,它是由於實際生成環境的需要,在原有的基礎上做了改良。即消息的發送者也是消息的接收者。消息的接收者也是消息的發送者。如下圖所示

消息中間件企業級應用


ActiveMQ 入門

ActiveMQ是Apache出品,簡單好用,能力強大,可以處理大部分的業務的開源消息總線。同時也支持JMS規範。

JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,允許應用程序組件基於JavaEE平臺創建、發送、接收和讀取消息。它使分佈式通信耦合度更低,消息服務更加可靠以及異步性。

ActiveMQ 安裝

ActiveMQ 的安裝很簡單,三個步驟:

第一步:下載,官網下載地址:http://activemq.apache.org/download.html。

第二步:運行,壓縮包解壓後,在bin目錄下根據電腦系統位數找到對應的wrapper.exe程序,雙擊運行。

第三步:訪問,瀏覽器訪問http://localhost:8161/admin,賬號密碼都是admin。

ActiveMQ 工作流程

我們通過簡單的P2P模式來瞭解ActiveMQ的工作流程。

生產者發送消息給MQ主要步驟:

第一步:創建連接工廠實例

第二步:創建連接並啓動

第三步:獲取操作消息的接口

第四步:創建隊列,即Queue或者Topic

第五步:創建消息發送者

第六步:發送消息,關閉資源

import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息隊列生產者
* @author itdragon
*/
public class ITDragonProducer {
private static final String QUEUE_NAME = "ITDragon.Queue";
public static void main(String[] args) {
// ConnectionFactory: 連接工廠,JMS 用它創建連接
ConnectionFactory connectionFactory = null;
// Connection: 客戶端和JMS系統之間建立的鏈接
Connection connection = null;
// Session: 一個發送或接收消息的線程 ,操作消息的接口
Session session = null;
// Destination: 消息的目的地,消息發送給誰
Destination destination = null;
// MessageProducer: 消息生產者
MessageProducer producer = null;
try {
// step1 構造ConnectionFactory實例對象,需要填入 用戶名, 密碼 以及要連接的地址,默認端口爲"tcp://localhost:61616"
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
// step2 連接工廠創建連接對象
connection = connectionFactory.createConnection();
// step3 啓動
connection.start();
// step4 獲取操作連接
/**
* 第一個參數:是否設置事務 true or false。 如果設置了true,第二個參數忽略,並且需要commit()才執行
* 第二個參數:acknowledge模式
* AUTO_ACKNOWLEDGE:自動確認,客戶端發送和接收消息不需要做額外的工作。不管消息是否被正常處理。 默認
* CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到消息後,必須手動調用acknowledge方法,jms服務器纔會刪除消息。
* DUPS_OK_ACKNOWLEDGE:允許重複的確認模式。
*/
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// step5 創建一個隊列到目的地
destination = session.createQueue(QUEUE_NAME);
// step6 在目的地創建一個生產者
producer = session.createProducer(destination);
// step7 生產者設置不持久化,若要設置持久化則使用 PERSISTENT
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// step8 生產者發送信息,具體的業務邏輯
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for(int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
TextMessage message = session.createTextMessage(expression);
// 發送消息到目的地方
producer.send(message);
System.out.println("Queue Sender ---------> " + expression);
}
}
}

消費者從MQ中獲取數據消費步驟和上面類似,只是將發送消息改成了接收消息。

import javax.jms.Connection; 
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.itdragon.utils.ITDragonUtil;
/**
* 消息隊列消費者
* @author itdragon
*/
public class ITDragonConsumer {
private static final String QUEUE_NAME = "ITDragon.Queue"; // 要和Sender一致
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
// MessageConsumer: 信息消費者
MessageConsumer consumer = null;
try {
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(QUEUE_NAME);
consumer = session.createConsumer(destination);
// 不斷地接收信息,直到沒有爲止
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.print(ITDragonUtil.cal(message.getText()));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

SpringBoot 整合ActiveMQ使用

SpringBoot可以幫助我們快速搭建項目,減少Spring整合第三方配置的精力。SpringBoot整合ActiveMQ也是非常簡單。

只需要簡單的兩個步驟:

第一步,在pom.xml文件中添加依賴使其支持ActiveMQ

第二步,在application.properties文件中配置連接ActiveMQ參數

pom.xml是Maven項目的核心配置文件

 
org.springframework.boot
spring-boot-starter-activemq


org.apache.activemq
activemq-pool

application.properties是SpringBoot項目的核心參數配置文件

spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true

spring.activemq.in-memory 默認值爲true,表示無需安裝ActiveMQ的服務器,直接使用內存。

spring.activemq.pool.enabled 表示通過連接池的方式連接。

springboot-activemq-producer

springboot-activemq-producer 項目模擬生產者所在的系統,支持同時發送點對點模式和發佈訂閱模式。

兩個核心文件:一個是消息發送類,一個是隊列Bean管理配置類。

三種主要模式:一個是對點對模式,隊列名爲"queue.name";一個是發佈訂閱模式,主題名爲"topic.name";最後一個是雙向應答模式,隊列名爲"response.name" 。

import java.util.Random;
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* 消息隊列生產者
* @author itdragon
*/
@Service
@EnableScheduling
public class ITDragonProducer {
@Autowired
private JmsMessagingTemplate jmsTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@Autowired
private Queue responseQueue;
/**
* 點對點(p2p)模式測試
* 包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
* 發送者將消息發送到一個特定的隊列,隊列保留着消息,直到接收者從隊列中獲取消息。
*/
@Scheduled(fixedDelay = 5000)
public void testP2PMQ(){
for(int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.queue, expression);
System.out.println("Queue Sender ---------> " + expression);
}
}
/**
* 訂閱/發佈(Pub/Sub)模擬測試
* 包含三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。
* 多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
*/
@Scheduled(fixedDelay = 5000)
public void testPubSubMQ() {
for (int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.topic, expression);
System.out.println("Topic Sender ---------> " + expression);
}
}
/**
* 雙向應答模式測試
* P2P和Pub/Sub是MQ默認提供的兩種模式,而雙向應答模式則是在原有的基礎上做了改進。發送者既是接收者,接收者也是發送者。
*/
@Scheduled(fixedDelay = 5000)
public void testReceiveResponseMQ(){
for (int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.responseQueue, expression);
}
}
// 接收P2P模式,消費者返回的數據
@JmsListener(destination = "out.queue")
public void receiveResponse(String message) {
System.out.println("Producer Response Receiver ---------> " + message);
}
}


import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* bean配置管理類
* @author itdragon
*/
@Configuration
public class ActiveMQBeansConfig {
@Bean // 定義一個名字爲queue.name的點對點列隊
public Queue queue() {
return new ActiveMQQueue("queue.name");
}
@Bean // 定義一個名字爲topic.name的主題隊列
public Topic topic() {
return new ActiveMQTopic("topic.name");
}
@Bean // 定義一個名字爲response.name的雙向應答隊列
public Queue responseQueue() {
return new ActiveMQQueue("response.name");
}
}

springboot-activemq-consumer

springboot-activemq-consumer 模擬消費者所在的服務器,主要負責監聽隊列消息。

兩個核心文件:一個是消息接收類,一個是兼容點對點模式和發佈訂閱模式的鏈接工廠配置類。

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import com.itdragon.utils.ITDragonUtil;
/**
* 消息隊列消費者
* @author itdragon
*/
@Service
public class ITDragonConsumer {
// 1. 監聽名字爲"queue.name"的點對點隊列
@JmsListener(destination = "queue.name", containerFactory="queueListenerFactory")
public void receiveQueue(String text) {
System.out.println("Queue Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 2. 監聽名字爲"topic.name"的發佈訂閱隊列
@JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
public void receiveTopicOne(String text) {
System.out.println(Thread.currentThread().getName() + " No.1 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 2. 監聽名字爲"topic.name"的發佈訂閱隊列
@JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
public void receiveTopicTwo(String text) {
System.out.println(Thread.currentThread().getName() +" No.2 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 3. 監聽名字爲"response.name"的接收應答(雙向)隊列
@JmsListener(destination = "response.name")
@SendTo("out.queue")
public String receiveResponse(String text) {
System.out.println("Response Receiver : " + text + " \t 正在返回數據......");
return ITDragonUtil.cal(text).toString();
}
}


import java.util.concurrent.Executors;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Bean // 開啓pub/Sub模式
public JmsListenerContainerFactory> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean // JMS默認開啓P2P模式
public JmsListenerContainerFactory> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}

總結

  • 消息中間件可以解決異步處理,系統解耦,流量削鋒,分佈式系統事務管理等問題。
  • 消息中間件默認支持點對點模式和發佈訂閱模式,實際工作中還可以使用雙向應當模式。
  • ActiveMQ是Apache出品,簡單好用,功能強大,可以處理大部分的業務的開源消息總線。

擴展閱讀

消息中間件的那些坑!

寫給程序員的中間件入門課

爲什麼一定要用消息中間件?

大企業HR透露:這樣的應聘者更易被青睞!

Java多線程的應用場景和應用目的舉例

消息隊列屬性及常見消息隊列介紹

相關文章