在本文中,我们将向您介绍 Spring Cloud Stream ,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如 "https://link.juejin.im/?target=https%3A%2F%2Fwww.rabbitmq.com%2F">RabbitMQ 、 Apache Kafka 等)连接。
Spring Cloud Stream ,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如 "https://link.juejin.im/?target=https%3A%2F%2Fwww.rabbitmq.com%2F">RabbitMQ 、 Apache Kafka 等)连接。
Apache Kafka 等)连接。
Spring Cloud Stream 构建在现有Spring框架(如 Spring Messaging 和 Spring Integration )之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的 message broker 紧密耦合。此外,有时对某些用例进行扩展是困难的。
Spring Cloud Stream
Spring Messaging 和 Spring Integration )之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的 message broker 紧密耦合。此外,有时对某些用例进行扩展是困难的。
Spring Integration )之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的 message broker 紧密耦合。此外,有时对某些用例进行扩展是困难的。
message broker
Spring Cloud Stream 背后的想法是一个非常典型的 Spring Boot 概念—— 抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入 。这意味著您可以通过更改依赖项和配置文件来更改 message broker 。可以在这里找到目前已经支持的各种消息代理。
Spring Boot
抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
本文将使用 RabbitMQ 作为 message broker 。在此之前,让我们了解一下 broker (代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。
RabbitMQ
broker
在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个 Service-A 内部调用 Service-B 和 Service-C 来完成一个请求:
Service-A
Service-B
Service-C
是的,还会有其他组件,比如 Spring Cloud Eureka 、 Spring Cloud Zuul 等等,但我们还是专注关心这类架构的特有问题。
Spring Cloud Eureka 、 Spring Cloud Zuul 等等,但我们还是专注关心这类架构的特有问题。
Spring Cloud Zuul 等等,但我们还是专注关心这类架构的特有问题。
假设由于某种原因 Service-B 需要更多的时间来响应。也许它正在执行 I/O操作 或长时间的 DB事务 ,或者进一步调用其它导致 Service-B 变得更慢的服务,这些都使其无法更具效率。
I/O操作
DB事务
现在,我们可以启动更多的 Service-B 实例来解决这个问题,这样很好,但是 Service-A 实际上是响应很快的,它需要等待 Service-B 的响应来进一步处理。这将导致 Service-A 无法接收更多的请求,这意味著我们还必须启动 Service-A 的多个实例。
另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味著 Service-A 不直接通过 HTTP 调用 Service-B 或 Service-C ,而是将请求或事件发布给 message broker (消息代理)。 Service-B 和 Service-C 将成为 message broker (消息代理)上此事件的订阅者。
HTTP
与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:
高级消息队列协议(AMQP) 是 RabbitMQ 用于消息传递的协议。虽然 RabbitMQ 支持其他一些协议,但是 AMQP 由于兼容性和它提供的大量特性而更受欢迎。
AMQP
因此发布者将消息发布到 RabbitMQ 中称为 Exchange (交换器)。 Exchange (交换器)接收消息并将其路由到一个或多个 Queues (队列)。路由演算法依赖于 Exchange (交换器)类型和 routing(路由)key/header(与消息一起传递)。将 Exchange (交换器)连接到 Queues (队列)的这些规则称为 bindings (绑定)。
Exchange
Queues
routing
bindings
绑定可以有4种类型:
routing key
Topic
routing header
来源:http://www.cloudamqp.com/
通过 Exchange (交换器)和 Queues (队列)发布和消费消息的整个过程是通过一个 Channel (通道)完成的。
Channel
有关路由的详细信息,请访问此链接。
我们可以从这里下载并安装基于我们的操作系统的二进位文件。
然而,在本文中,我们将使用 cloudamqp.com 提供的免费云安装。只需注册服务并登录即可。
cloudamqp.com 提供的免费云安装。只需注册服务并登录即可。
在主仪表板上单击 创建新实例 :
创建新实例
然后给你的实例起个名字,然后进入下一步:
然后选择一个可用区:
最后,查看实例信息,点击右下角的 创建实例 :
创建实例
就是这样。现在在云中运行了一个 RabbitMQ 实例。有关实例的更多信息,请转到您的仪表板并单击 新创建的实例 :
新创建的实例
我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:
我们将在Spring应用程序中使用 AMQP URL 连接到这个实例,所以请在某个地方记下它。
AMQP URL
您还可以通过单击左上角的 RabbitMQ manager 来查看管理器控制台。这将采用它来管理的您的 RabbitMQ 实例。
RabbitMQ manager
现在我们的设置已经准备好了,让我们创建我们的服务:
使用 Spring Initializr 创建一个脚手架项目。这将是我们的 producer 项目,我们将使用 REST 端点发布消息。
Spring Initializr 创建一个脚手架项目。这将是我们的 producer 项目,我们将使用 REST 端点发布消息。
producer
REST
选择您喜欢的 Spring Boot 版本,添加 Web 和 Cloud Stream 依赖项,生成 Maven 项目:
Web
Cloud Stream
Maven
请注意 cloud-stream 依赖项。这也需要像 RabbitMQ 、 Kafka 等绑定器依赖项才能工作。
cloud-stream
Kafka
由于我们将使用 RabbitMQ ,添加以下 Maven 依赖项:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 复制代码
或者,我们也可以将两者结合起来使用 spring-cloud-starter-stream-rabbit :
spring-cloud-starter-stream-rabbit
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 复制代码
使用同样的方法,创建消费者项目,但仅使用 spring-cloud-starter-stream-rabbit 依赖项。
如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个 HelloBinding 介面,其中包含我们的消息机制 greetingChannel :
HelloBinding
greetingChannel
interface HelloBinding {
@Output("greetingChannel") MessageChannel greeting(); } 复制代码
因为这将发布消息,所以我们使用 @Output 注解。方法名可以是我们想要的任意名称,当然,我们可以在一个介面中有多个 Channel (通道)。
@Output
现在,让我们创建一个 REST ,它将消息推送到这个 Channel (通道)
@RestController public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) { greet = binding.greeting(); }
@GetMapping("/greet/{name}") public void publish(@PathVariable String name) { String greeting = "Hello, " + name + "!"; Message<String> msg = MessageBuilder.withPayload(greeting) .build(); this.greet.send(msg); } } 复制代码
上面,我们创建了一个 ProducerController 类,它有一个 MessageChannel 类型的属性 greet。这是通过我们前面声明的方法在构造函数中初始化的。
ProducerController
MessageChannel
greet
注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。
然后,我们有一个简单的 REST 介面,它接收 PathVariable 的 name ,并使用 MessageBuilder 创建一个 String 类型的消息。最后,我们使用 MessageChannel 上的 .send() 方法来发布消息。
PathVariable
name
MessageBuilder
String
.send()
现在,我们将在的主类中添加 @EnableBinding 注解,传入 HelloBinding 告诉 Spring 载入。
@EnableBinding
Spring
@EnableBinding(HelloBinding.class) @SpringBootApplication public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 复制代码
最后,我们必须告诉 Spring 如何连接到 RabbitMQ (通过前面的 AMQP URL ),并将 greetingChannel 连接到一可用的消费者。
这两个都是在 application.properties 中定义的:
application.properties
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
现在,我们需要监听之前创建的通道 greetingChannel 。让我们为它创建一个绑定:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING) SubscribableChannel greeting(); }
与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用 SubscribableChannel 和 @Input 注解连接到 greetingChannel ,消息数据将被推送这里。
SubscribableChannel
@Input
现在,让我们创建处理数据的方法:
@EnableBinding(HelloBinding.class) public class HelloListener {
@StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(msg); } }
在这里,我们创建了一个 HelloListener 类,在 processHelloChannelGreeting 方法上添加 @StreamListener 注解。这个方法需要一个字元串作为参数,我们刚刚在控制台列印了这个参数。我们还在类添加 @EnableBinding 启用了 HelloBinding 。
HelloListener
processHelloChannelGreeting
@StreamListener
同样,我们在这里使用 @EnableBinding ,而不是主类,以便告诉我们如何使用。
看看我们的主类,我们没有任何修改:
@SpringBootApplication public class Application {
在 application.properties 配置文件中,我们需要定义与生产者一样的属性,除了修改埠之外
spring.rabbitmq.addresses=<amqp url> spring.cloud.stream.bindings.greetingChannel.destination=greetings server.port=9090 复制代码
让我们同时启动生产者和消费者服务。首先,让我们通过点击端点 http://localhost:8080/greet/john 来生产消息。
http://localhost:8080/greet/john
在消费者日志中看到消息内容:
我们使用以下命令启动另一个消费者服务实例(在另一个埠(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
现在,当我们点击生产者 REST 端点生产消息时,我们看到两个消费者都收到了消息:
这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在 application.properties 中创建一个消费者组。消费者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
现在,再次在不同的埠上运行消费者的2个实例,并通过生产者生产消息再次查看:
这一切也可以在 RabbitMQ 管理器控制台看到:
在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用 Spring Cloud Stream 实现它。我们使用 RabbitMQ 作为消息代理,但是我们也可以使用其他流行的代理,比 如Kafka ,只需更改配置和依赖项。
如Kafka
与往常一样,本文使用的示例代码可以在GitHub获得完整的 源代码 。
原文: stackabuse.com/spring-clou…
作者:Dhananjay Singh
推荐阅读: