【Spring Cloud Stream 消息驱动】 —— 每天一点小知识

🐳Spring Cloud Stream 消息驱动

在微服务架构中,消息驱动是一种常见的通信方式,它通过解耦和异步处理提供了可靠的服务间通信机制。Spring Cloud Stream 是 Spring Cloud 生态系统中的一个模块,它提供了一种简化和统一的方式来构建基于消息驱动的应用程序。本文将介绍 Spring Cloud Stream 的基本概念和用法,并通过一个案例来说明如何使用 Spring Cloud Stream 实现消息驱动。


消息驱动概述

  💧在传统的应用程序架构中,服务间的通信通常是通过直接调用来完成的,这种紧耦合的方式会导致系统的可扩展性和灵活性受限。而消息驱动则提供了一种解耦的方式,将消息作为信息载体,在不同的服务之间传递和处理。消息驱动的架构模式具有以下优势:

 • 解耦性:消息驱动通过将消息作为中介来实现服务之间的通信,服务之间不直接依赖于彼此的存在和实现细节,从而实现解耦。
 • 异步处理:消息驱动允许发送方将消息发送到消息队列后立即返回,而不需要等待接收方处理完毕。这种异步处理的方式可以提高系统的响应性和吞吐量。
 • 可靠性:消息驱动使用消息队列来存储和传递消息,消息队列通常具备持久化、可靠性和高可用性的特性,从而确保消息的可靠传递。

  💧Spring Cloud Stream 提供了一种简化和统一的编程模型,使得开发人员可以更轻松地使用消息驱动来构建应用程序。它提供了抽象的消息绑定层,使得应用程序可以与不同的消息中间件(如 RabbitMQ、Kafka 等)进行集成,而无需关心底层消息中间件的细节。

案例说明

  💧我们以一个在线商城的订单系统为例来说明如何使用 Spring Cloud Stream 实现消息驱动。订单系统包括两个微服务:订单服务和库存服务。当用户下单时,订单服务将订单信息发布到消息队列中,库存服务订阅订单消息并处理库存扣减操作。

  💧接下来,让我们一步一步实现这个案例,并使用 RabbitMQ 作为消息中间件。

消息驱动之生产者

  💧首先,我们需要创建订单服务作为消息驱动的生产者。订单服务负责将订单信息发布到消息队列中。

 1. 在您的订单服务项目中,添加以下依赖:
<
dependency>
  org.springframework.cloud
  spring-cloud-starter-stream-rabbit
 1. application.properties 文件中添加 RabbitMQ 的连接信息:
spring.cloud.stream.bindings.output.destination=orders
spring.cloud.stream.bindings.output.content-type=application/json
spring.rabbitmq.host=
spring.rabbitmq.port=
spring.rabbitmq.username=
spring.rabbitmq.password=
 1. 创建一个名为 OrderProducer 的类,用于发送订单消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Source.class)
public class OrderProducer {
  private final Source source;
  @Autowired
  public OrderProducer(Source source) {
    this.source = source;
  }
  public void sendOrderMessage(Order order) {
    source.output().send(MessageBuilder.withPayload(order).build());
  }
}
 1. 创建一个名为 Order 的类,用于表示订单信息:
public class Order {
  private String orderId;
  private String customerId;
  // 其他订单属性和方法省略...
  // 构造函数、getter、setter 省略...
}
 1. 在需要发送订单消息的地方,使用 OrderProducer 发送消息。例如,在一个订单控制器中:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
  private final OrderProducer orderProducer;
  public OrderController(OrderProducer orderProducer) {
    this.orderProducer = orderProducer;
  }
  @PostMapping("/place-order")
  public String placeOrder(@RequestBody Order order) {
    // 处理订单逻辑...
    // 发送订单消息
    orderProducer.sendOrderMessage(order);
    return "Order placed successfully";
  }
}

  💧通过完成上述步骤,订单服务就成为了消息驱动的生产者,它将订单信息发布到名为 “orders” 的消息队列中。

  💧接下来,我们将创建库存服务作为消息驱动的消费者,并处理订单消息中的库存扣减操作。

消息驱动之消费者

  💧我们将创建库存服务作为消息驱动的消费者,它将订阅订单消息并处理库存扣减操作。

 1. 在库存服务项目中,添加以下依赖:

  org.springframework.cloud
  spring-cloud-starter-stream-rabbit
 1. application.properties 文件中添加 RabbitMQ 的连接信息,与订单服务中的配置保持一致。
 2. 创建一个名为 OrderConsumer 的类,用于接收和处理订单消息:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class OrderConsumer {
  @StreamListener(Sink.INPUT)
  public void processOrderMessage(Order order) {
    // 处理订单消息,执行库存扣减操作
    // ...
    // 打印日志
    System.out.println("Received order: " + order.getOrderId());
  }
}
 1. 启动库存服务应用程序。库存服务现在已经成为消息驱动的消费者,并订阅了名为 “orders” 的消息队列。每当有订单消息到达队列时,processOrderMessage 方法将被调用,并处理相应的库存扣减操作。

  💧通过以上步骤,我们完成了消息驱动的生产者和消费者的搭建。订单服务作为生产者将订单消息发布到消息队列中,而库存服务作为消费者订阅订单消息并处理相应的业务逻辑。

分组与持久化

  💧Spring Cloud Stream 还提供了一些高级功能,例如消费者分组和消息持久化,以满足更复杂的应用需求。

消费者分组

  💧消费者分组可以确保相同分组名称的消费者实例共享消息的处理负载。这对于水平扩展和负载均衡非常有用。

  💧要为消费者设置分组,只需在消费者类上添加 @EnableBinding 注解,并在 @StreamListener 注解中指定分组名称,如下所示:

@Component
@EnableBinding(Sink.class)
public class OrderConsumer {
  @StreamListener(target = Sink.INPUT, condition = "headers['type']=='order' and headers['group']=='group1'")
  public void processOrderMessage(Order order) {
    // 处理订单消息
    // ...
  }
}

消息持久化

  💧消息持久化是确保消息在发生故障或重启后仍然可靠地传递的重要机制。Spring Cloud Stream 默认情况下会将消息持久化到消息中间件中,但需要确保消息中间件也配置了持久化机制。

  💧例如,对于 RabbitMQ,可以通过在 application.properties 文件中设置以下属性来启用消息持久化:

spring.cloud.stream.rabbit.bindings.input.consumer.de
clare-durable-queue=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true

  💧这将确保消费者队列和订阅是持久化的。

总结

通过使用 Spring Cloud Stream,我们可以轻松构建基于消息驱动的应用程序。本文介绍了消息驱动的概念,通过一个在线商城的订单系统案例演示了如何使用 Spring Cloud Stream 来实现消息驱动。我们创建了订单服务作为消息驱动的生产者,将订单信息发布到消息队列中;同时创建了库存服务作为消息驱动的消费者,订阅订单消息并处理库存扣减操作。此外,还介绍了消费者分组和消息持久化的高级功能。

发表评论