在本文中,我們將向您介紹 Spring Cloud Stream ,這是一個用於構建消息驅動的微服務應用程序的框架,這些應用程序由一個常見的消息傳遞代理(如 "https://link.juejin.im/?target=https%3A%2F%2Fwww.rabbitmq.com%2F">RabbitMQ 、 Apache Kafka 等)連接。

Spring Cloud Stream 構建在現有Spring框架(如 Spring Messaging 和 Spring Integration )之上。儘管這些框架經過了實戰測試,工作得非常好,但是實現與使用的 message broker 緊密耦合。此外,有時對某些用例進行擴展是困難的。

Spring Cloud Stream 背後的想法是一個非常典型的 Spring Boot 概念—— 抽象地講,讓Spring根據配置和依賴關係管理在運行時找出實現自動注入 。這意味著您可以通過更改依賴項和配置文件來更改 message broker 。可以在這裡找到目前已經支持的各種消息代理。

本文將使用 RabbitMQ 作為 message broker 。在此之前,讓我們了解一下 broker (代理)的一些基本概念,以及為什麼要在面向微服務的體系架構中需要它。

2. 微服務中的消息

在微服務體系架構中,我們有許多相互通信以完成請求的小型應用程序—它們的主要優點之一是改進了的可伸縮性。一個請求從多個下游微服務傳遞到完成是很常見的。例如,假設我們有一個 Service-A 內部調用 Service-BService-C 來完成一個請求:

是的,還會有其他組件,比如 Spring Cloud Eureka 、 Spring Cloud Zuul 等等,但我們還是專註關心這類架構的特有問題。

假設由於某種原因 Service-B 需要更多的時間來響應。也許它正在執行 I/O操作 或長時間的 DB事務 ,或者進一步調用其它導致 Service-B 變得更慢的服務,這些都使其無法更具效率。

現在,我們可以啟動更多的 Service-B 實例來解決這個問題,這樣很好,但是 Service-A 實際上是響應很快的,它需要等待 Service-B 的響應來進一步處理。這將導致 Service-A 無法接收更多的請求,這意味著我們還必須啟動 Service-A 的多個實例。

另一種方法解決類似情況的是使用事件驅動的微服務體系架構。這基本上意味著 Service-A 不直接通過 HTTP 調用 Service-BService-C ,而是將請求或事件發布給 message broker (消息代理)。 Service-BService-C 將成為 message broker (消息代理)上此事件的訂閱者。

與依賴HTTP調用的傳統微服務體系架構相比,這有許多優點:

  • 提高可伸縮性和可靠性——現在我們知道哪些服務是整個應用程序中的真正瓶頸。
  • 鼓勵鬆散耦合—— Service-A 不需要了解 Service-BService-C 。它只需要連接到 message broker 並發布事件。事件如何進一步編排取決於代理設置。通過這種方式, Service-A 可以獨立地運行,這是微服務的核心概念之一。
  • 與遺留系統交互——通常我們不能將所有東西都移動到一個新的技術堆棧中。我們仍然必須使用遺留系統,雖然速度很慢,但是很可靠。

3. RabbitMQ

高級消息隊列協議(AMQP) 是 RabbitMQ 用於消息傳遞的協議。雖然 RabbitMQ 支持其他一些協議,但是 AMQP 由於兼容性和它提供的大量特性而更受歡迎。

3.1 RabbitMQ架構設計

因此發布者將消息發布到 RabbitMQ 中稱為 Exchange (交換器)。 Exchange (交換器)接收消息並將其路由到一個或多個 Queues (隊列)。路由演算法依賴於 Exchange (交換器)類型和 routing(路由)key/header(與消息一起傳遞)。將 Exchange (交換器)連接到 Queues (隊列)的這些規則稱為 bindings (綁定)。

綁定可以有4種類型:

  • Direct : 它根據 routing key (路由鍵)將 Exchange (交換器)類型直接路由到特定的 Queues(隊列)。
  • Fanout :它將消息路由到綁定 Exchange (交換器)中的所有 Queues (隊列)。
  • Topic :它根據完全匹配或部分據 routing key (路由鍵)匹配將消息路由到(0、1或更多)的 Queues (隊列)。
  • Headers :它類似於 Topic (主題)交換類型,但是它是基 routing header (路由頭)而不是 routing key (路由鍵)來路由的。

來源:cloudamqp.com/

通過 Exchange (交換器)和 Queues (隊列)發布和消費消息的整個過程是通過一個 Channel (通道)完成的。

有關路由的詳細信息,請訪問此鏈接。

3.2 RabbitMQ 設置

3.2.1 安裝

我們可以從這裡下載並安裝基於我們的操作系統的二進位文件。

然而,在本文中,我們將使用 cloudamqp.com 提供的免費雲安裝。只需註冊服務並登錄即可。

在主儀錶板上單擊 創建新實例 :

然後給你的實例起個名字,然後進入下一步:

然後選擇一個可用區:

最後,查看實例信息,點擊右下角的 創建實例 :

就是這樣。現在在雲中運行了一個 RabbitMQ 實例。有關實例的更多信息,請轉到您的儀錶板並單擊 新創建的實例 :

我們可以看到我們可以訪問RabbitMQ實例的主機,比如從我們的項目連接所需的用戶名和密碼:

我們將在Spring應用程序中使用 AMQP URL 連接到這個實例,所以請在某個地方記下它。

您還可以通過單擊左上角的 RabbitMQ manager 來查看管理器控制台。這將採用它來管理的您的 RabbitMQ 實例。

Project 配置

現在我們的設置已經準備好了,讓我們創建我們的服務:

RabbitMQ

使用 Spring Initializr 創建一個腳手架項目。這將是我們的 producer 項目,我們將使用 REST 端點發布消息。

選擇您喜歡的 Spring Boot 版本,添加 WebCloud Stream 依賴項,生成 Maven 項目:

注意:

請注意 cloud-stream 依賴項。這也需要像 RabbitMQKafka 等綁定器依賴項才能工作。

由於我們將使用 RabbitMQ ,添加以下 Maven 依賴項:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
複製代碼

或者,我們也可以將兩者結合起來使用 spring-cloud-starter-stream-rabbit :

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
複製代碼

使用同樣的方法,創建消費者項目,但僅使用 spring-cloud-starter-stream-rabbit 依賴項。

4. 創建生產者

如前所述,將消息從發布者傳遞到隊列的整個過程是通過通道完成的。因此,讓我們創建一個 HelloBinding 介面,其中包含我們的消息機制 greetingChannel :

interface HelloBinding {

@Output("greetingChannel")
MessageChannel greeting();
}
複製代碼

因為這將發布消息,所以我們使用 @Output 註解。方法名可以是我們想要的任意名稱,當然,我們可以在一個介面中有多個 Channel (通道)。

現在,讓我們創建一個 REST ,它將消息推送到這個 Channel (通道)

@RestController
public class ProducerController {

private MessageChannel greet;

public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}

@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
複製代碼

上面,我們創建了一個 ProducerController 類,它有一個 MessageChannel 類型的屬性 greet。這是通過我們前面聲明的方法在構造函數中初始化的。

注意: 我們可以用簡潔的方式做同樣的事情,但是我們使用不同的名稱來讓您更清楚地了解事物是如何連接的。

然後,我們有一個簡單的 REST 介面,它接收 PathVariablename ,並使用 MessageBuilder 創建一個 String 類型的消息。最後,我們使用 MessageChannel 上的 .send() 方法來發布消息。

現在,我們將在的主類中添加 @EnableBinding 註解,傳入 HelloBinding 告訴 Spring 載入。

@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
複製代碼

最後,我們必須告訴 Spring 如何連接到 RabbitMQ (通過前面的 AMQP URL ),並將 greetingChannel 連接到一可用的消費者。

這兩個都是在 application.properties 中定義的:

spring.rabbitmq.addresses=<amqp url>

spring.cloud.stream.bindings.greetingChannel.destination = greetings

server.port=8080

5. 創建消費者

現在,我們需要監聽之前創建的通道 greetingChannel 。讓我們為它創建一個綁定:

public interface HelloBinding {

String GREETING = "greetingChannel";

@Input(GREETING)
SubscribableChannel greeting();
}

與生產者綁定的兩個非常明顯區別。因為我們正在消費消息,所以我們使用 SubscribableChannel@Input 註解連接到 greetingChannel ,消息數據將被推送這裡。

現在,讓我們創建處理數據的方法:

@EnableBinding(HelloBinding.class)
public class HelloListener {

@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}

在這裡,我們創建了一個 HelloListener 類,在 processHelloChannelGreeting 方法上添加 @StreamListener 註解。這個方法需要一個字元串作為參數,我們剛剛在控制台列印了這個參數。我們還在類添加 @EnableBinding 啟用了 HelloBinding

同樣,我們在這裡使用 @EnableBinding ,而不是主類,以便告訴我們如何使用。

看看我們的主類,我們沒有任何修改:

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
複製代碼

application.properties 配置文件中,我們需要定義與生產者一樣的屬性,除了修改埠之外

spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
複製代碼

6. 全部測試

讓我們同時啟動生產者和消費者服務。首先,讓我們通過點擊端點 http://localhost:8080/greet/john 來生產消息。

在消費者日誌中看到消息內容:

我們使用以下命令啟動另一個消費者服務實例(在另一個埠(9091)上):

$ mvn spring-boot:run -Dserver.port=9091

現在,當我們點擊生產者 REST 端點生產消息時,我們看到兩個消費者都收到了消息:

這可能是我們在一些用例中想要的。但是,如果我們只想讓一個消費者消費一條消息呢?為此,我們需要在 application.properties 中創建一個消費者組。消費者的配置文件:

spring.cloud.stream.bindings.greetingChannel.group = greetings-group

現在,再次在不同的埠上運行消費者的2個實例,並通過生產者生產消息再次查看:

這一切也可以在 RabbitMQ 管理器控制台看到:

7. 結論

在本文中,我們解釋了消息傳遞的主要概念、它在微服務中的角色以及如何使用 Spring Cloud Stream 實現它。我們使用 RabbitMQ 作為消息代理,但是我們也可以使用其他流行的代理,比 如Kafka ,只需更改配置和依賴項。

與往常一樣,本文使用的示例代碼可以在GitHub獲得完整的 源代碼

原文: stackabuse.com/spring-clou…

作者:Dhananjay Singh

推薦閱讀:

相关文章