在本文中,我們將向您介紹 Spring Cloud Stream ,這是一個用於構建消息驅動的微服務應用程序的框架,這些應用程序由一個常見的消息傳遞代理(如 "https://link.juejin.im/?target=https%3A%2F%2Fwww.rabbitmq.com%2F">RabbitMQ 、 Apache Kafka 等)連接。
Spring Cloud Stream ,這是一個用於構建消息驅動的微服務應用程序的框架,這些應用程序由一個常見的消息傳遞代理(如 "https://link.juejin.im/?target=https%3A%2F%2Fwww.rabbitmq.com%2F">RabbitMQ 、 Apache Kafka 等)連接。
Apache Kafka 等)連接。
Spring Cloud Stream 構建在現有Spring框架(如 Spring Messaging 和 Spring Integration )之上。儘管這些框架經過了實戰測試,工作得非常好,但是實現與使用的 message broker 緊密耦合。此外,有時對某些用例進行擴展是困難的。
Spring Cloud Stream
Spring Messaging 和 Spring Integration )之上。儘管這些框架經過了實戰測試,工作得非常好,但是實現與使用的 message broker 緊密耦合。此外,有時對某些用例進行擴展是困難的。
Spring Integration )之上。儘管這些框架經過了實戰測試,工作得非常好,但是實現與使用的 message broker 緊密耦合。此外,有時對某些用例進行擴展是困難的。
message broker
Spring Cloud Stream 背後的想法是一個非常典型的 Spring Boot 概念—— 抽象地講,讓Spring根據配置和依賴關係管理在運行時找出實現自動注入 。這意味著您可以通過更改依賴項和配置文件來更改 message broker 。可以在這裡找到目前已經支持的各種消息代理。
Spring Boot
抽象地講,讓Spring根據配置和依賴關係管理在運行時找出實現自動注入
本文將使用 RabbitMQ 作為 message broker 。在此之前,讓我們了解一下 broker (代理)的一些基本概念,以及為什麼要在面向微服務的體系架構中需要它。
RabbitMQ
broker
在微服務體系架構中,我們有許多相互通信以完成請求的小型應用程序—它們的主要優點之一是改進了的可伸縮性。一個請求從多個下游微服務傳遞到完成是很常見的。例如,假設我們有一個 Service-A 內部調用 Service-B 和 Service-C 來完成一個請求:
Service-A
Service-B
Service-C
是的,還會有其他組件,比如 Spring Cloud Eureka 、 Spring Cloud Zuul 等等,但我們還是專註關心這類架構的特有問題。
Spring Cloud Eureka 、 Spring Cloud Zuul 等等,但我們還是專註關心這類架構的特有問題。
Spring Cloud Zuul 等等,但我們還是專註關心這類架構的特有問題。
假設由於某種原因 Service-B 需要更多的時間來響應。也許它正在執行 I/O操作 或長時間的 DB事務 ,或者進一步調用其它導致 Service-B 變得更慢的服務,這些都使其無法更具效率。
I/O操作
DB事務
現在,我們可以啟動更多的 Service-B 實例來解決這個問題,這樣很好,但是 Service-A 實際上是響應很快的,它需要等待 Service-B 的響應來進一步處理。這將導致 Service-A 無法接收更多的請求,這意味著我們還必須啟動 Service-A 的多個實例。
另一種方法解決類似情況的是使用事件驅動的微服務體系架構。這基本上意味著 Service-A 不直接通過 HTTP 調用 Service-B 或 Service-C ,而是將請求或事件發布給 message broker (消息代理)。 Service-B 和 Service-C 將成為 message broker (消息代理)上此事件的訂閱者。
HTTP
與依賴HTTP調用的傳統微服務體系架構相比,這有許多優點:
高級消息隊列協議(AMQP) 是 RabbitMQ 用於消息傳遞的協議。雖然 RabbitMQ 支持其他一些協議,但是 AMQP 由於兼容性和它提供的大量特性而更受歡迎。
AMQP
因此發布者將消息發布到 RabbitMQ 中稱為 Exchange (交換器)。 Exchange (交換器)接收消息並將其路由到一個或多個 Queues (隊列)。路由演算法依賴於 Exchange (交換器)類型和 routing(路由)key/header(與消息一起傳遞)。將 Exchange (交換器)連接到 Queues (隊列)的這些規則稱為 bindings (綁定)。
Exchange
Queues
routing
bindings
綁定可以有4種類型:
routing key
Topic
routing header
來源:http://www.cloudamqp.com/
通過 Exchange (交換器)和 Queues (隊列)發布和消費消息的整個過程是通過一個 Channel (通道)完成的。
Channel
有關路由的詳細信息,請訪問此鏈接。
我們可以從這裡下載並安裝基於我們的操作系統的二進位文件。
然而,在本文中,我們將使用 cloudamqp.com 提供的免費雲安裝。只需註冊服務並登錄即可。
cloudamqp.com 提供的免費雲安裝。只需註冊服務並登錄即可。
在主儀錶板上單擊 創建新實例 :
創建新實例
然後給你的實例起個名字,然後進入下一步:
然後選擇一個可用區:
最後,查看實例信息,點擊右下角的 創建實例 :
創建實例
就是這樣。現在在雲中運行了一個 RabbitMQ 實例。有關實例的更多信息,請轉到您的儀錶板並單擊 新創建的實例 :
新創建的實例
我們可以看到我們可以訪問RabbitMQ實例的主機,比如從我們的項目連接所需的用戶名和密碼:
我們將在Spring應用程序中使用 AMQP URL 連接到這個實例,所以請在某個地方記下它。
AMQP URL
您還可以通過單擊左上角的 RabbitMQ manager 來查看管理器控制台。這將採用它來管理的您的 RabbitMQ 實例。
RabbitMQ manager
現在我們的設置已經準備好了,讓我們創建我們的服務:
使用 Spring Initializr 創建一個腳手架項目。這將是我們的 producer 項目,我們將使用 REST 端點發布消息。
Spring Initializr 創建一個腳手架項目。這將是我們的 producer 項目,我們將使用 REST 端點發布消息。
producer
REST
選擇您喜歡的 Spring Boot 版本,添加 Web 和 Cloud Stream 依賴項,生成 Maven 項目:
Web
Cloud Stream
Maven
請注意 cloud-stream 依賴項。這也需要像 RabbitMQ 、 Kafka 等綁定器依賴項才能工作。
cloud-stream
Kafka
由於我們將使用 RabbitMQ ,添加以下 Maven 依賴項:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 複製代碼
或者,我們也可以將兩者結合起來使用 spring-cloud-starter-stream-rabbit :
spring-cloud-starter-stream-rabbit
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 複製代碼
使用同樣的方法,創建消費者項目,但僅使用 spring-cloud-starter-stream-rabbit 依賴項。
如前所述,將消息從發布者傳遞到隊列的整個過程是通過通道完成的。因此,讓我們創建一個 HelloBinding 介面,其中包含我們的消息機制 greetingChannel :
HelloBinding
greetingChannel
interface HelloBinding {
@Output("greetingChannel") MessageChannel greeting(); } 複製代碼
因為這將發布消息,所以我們使用 @Output 註解。方法名可以是我們想要的任意名稱,當然,我們可以在一個介面中有多個 Channel (通道)。
@Output
現在,讓我們創建一個 REST ,它將消息推送到這個 Channel (通道)
@RestController public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) { greet = binding.greeting(); }
@GetMapping("/greet/{name}") public void publish(@PathVariable String name) { String greeting = "Hello, " + name + "!"; Message<String> msg = MessageBuilder.withPayload(greeting) .build(); this.greet.send(msg); } } 複製代碼
上面,我們創建了一個 ProducerController 類,它有一個 MessageChannel 類型的屬性 greet。這是通過我們前面聲明的方法在構造函數中初始化的。
ProducerController
MessageChannel
greet
注意: 我們可以用簡潔的方式做同樣的事情,但是我們使用不同的名稱來讓您更清楚地了解事物是如何連接的。
然後,我們有一個簡單的 REST 介面,它接收 PathVariable 的 name ,並使用 MessageBuilder 創建一個 String 類型的消息。最後,我們使用 MessageChannel 上的 .send() 方法來發布消息。
PathVariable
name
MessageBuilder
String
.send()
現在,我們將在的主類中添加 @EnableBinding 註解,傳入 HelloBinding 告訴 Spring 載入。
@EnableBinding
Spring
@EnableBinding(HelloBinding.class) @SpringBootApplication public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 複製代碼
最後,我們必須告訴 Spring 如何連接到 RabbitMQ (通過前面的 AMQP URL ),並將 greetingChannel 連接到一可用的消費者。
這兩個都是在 application.properties 中定義的:
application.properties
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
現在,我們需要監聽之前創建的通道 greetingChannel 。讓我們為它創建一個綁定:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING) SubscribableChannel greeting(); }
與生產者綁定的兩個非常明顯區別。因為我們正在消費消息,所以我們使用 SubscribableChannel 和 @Input 註解連接到 greetingChannel ,消息數據將被推送這裡。
SubscribableChannel
@Input
現在,讓我們創建處理數據的方法:
@EnableBinding(HelloBinding.class) public class HelloListener {
@StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(msg); } }
在這裡,我們創建了一個 HelloListener 類,在 processHelloChannelGreeting 方法上添加 @StreamListener 註解。這個方法需要一個字元串作為參數,我們剛剛在控制台列印了這個參數。我們還在類添加 @EnableBinding 啟用了 HelloBinding 。
HelloListener
processHelloChannelGreeting
@StreamListener
同樣,我們在這裡使用 @EnableBinding ,而不是主類,以便告訴我們如何使用。
看看我們的主類,我們沒有任何修改:
@SpringBootApplication public class Application {
在 application.properties 配置文件中,我們需要定義與生產者一樣的屬性,除了修改埠之外
spring.rabbitmq.addresses=<amqp url> spring.cloud.stream.bindings.greetingChannel.destination=greetings server.port=9090 複製代碼
讓我們同時啟動生產者和消費者服務。首先,讓我們通過點擊端點 http://localhost:8080/greet/john 來生產消息。
http://localhost:8080/greet/john
在消費者日誌中看到消息內容:
我們使用以下命令啟動另一個消費者服務實例(在另一個埠(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
現在,當我們點擊生產者 REST 端點生產消息時,我們看到兩個消費者都收到了消息:
這可能是我們在一些用例中想要的。但是,如果我們只想讓一個消費者消費一條消息呢?為此,我們需要在 application.properties 中創建一個消費者組。消費者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
現在,再次在不同的埠上運行消費者的2個實例,並通過生產者生產消息再次查看:
這一切也可以在 RabbitMQ 管理器控制台看到:
在本文中,我們解釋了消息傳遞的主要概念、它在微服務中的角色以及如何使用 Spring Cloud Stream 實現它。我們使用 RabbitMQ 作為消息代理,但是我們也可以使用其他流行的代理,比 如Kafka ,只需更改配置和依賴項。
如Kafka
與往常一樣,本文使用的示例代碼可以在GitHub獲得完整的 源代碼 。
原文: stackabuse.com/spring-clou…
作者:Dhananjay Singh
推薦閱讀: