目录

1、为什么要引入SpringCloud Stream

2、SpringCloud Stream简介

2.1、标准MQ架构图

2.2、SpringCloud Stream架构图

2.3、SpringCloud Stream处理流程

3、如何使用SpringCloud Stream

3.1、创建springcloud-stream-sender项目(消息生产者)

3.1.1、在pom文件中引入依赖

3.1.2、添加application.yml并配置

3.1.3、创建启动类

3.1.4、创建Service服务,发送消息到MQ

3.1.5、创建Controller对外提供接口服务

 3.2、创建springcloud-stream-consumer项目(消息消费者)

3.2.1、在pom文件中引入依赖

3.2.2、添加application.yml并配置

3.2.3、创建启动类

3.2.4、编写消息接收类,监听消费消息

3.3、启动RabbitMQ服务器,访问管理页面http://localhost:15672/

3.4、启动springcloud-stream-sender消息生产者和springcloud-stream-consumer消息消费者服务

3.5、使用IDEAJ自带的http client发送一个post请求到消息生产者

4、SpringCloud Stream集群消费

5、切换消息中间件(RabbitMQ--->Kafka)

5.1、修改springcloud-stream-sender项目

5.2、修改springcloud-stream-consumer项目

5.3、启动Zookeeper、Kafka(服务器)、send(消息生产者)、consumer(消息消费者)

6、自定义通道名称

6.1、自定义输出通道(发布消息)

6.1.1、创建接口MyOutput

6.1.2、修改消息发送服务注解@EnableBinding

6.1.3、修改配置application.yml文件

6.2、自定义输入通道(接收消息)

6.2.1、创建接口

6.2.2、修改消息接收监听服务注解@EnableBinding

6.2.3、修改配置application.yml文件


1、为什么要引入SpringCloud Stream

在实际的企业开发中,消息中间件是至关重要的组件之一。主要解决服务解耦、异步消费、流量削峰等问题。实现高性能、高可用、高扩展和最终一致性架构。不同消息中间件的实现方式和内部结构不一致,如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构不同,像RabbitMQ有exchange,Kafka有topic和partition分区,这些中间件的差异给我们项目开发造成了一定的困扰,如果选择了其中一种消息中间件,由于中间件的差异性,一旦业务需要迁移部分消息队列到其他的消息中间件,这将是一件灾难性的工作,很多工作需要推到重新做,因为消息中间件和项目耦合了,这时候SpringCloud Stream给我们带来了福音,它给我们提供了一种解耦方式,我们不需要关注具体使用的是什么MQ,它会自动的给我们在各种MQ内切换。

引入SpringCloud Stream的目的:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型(解耦)

2、SpringCloud Stream简介

   SpringCloud Stream是一个用来为微服务应用构建消息驱动能力的框架,它可以基于SpringBoot来创建独立的、可用于生产的Spring应用程序。同时为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。

通过使用 SpringCloud Stream可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务的逻辑实现。但是目前SpringCloud Stream只支持RabbitMQ和Kafka的自动化配置。

SpringCloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与SpringCloud Stream中的binder对象交互。

2.1、标准MQ架构图

 

·  生产者/消费者之间靠消息媒介Message传递信息内容

·  消息必须走特定的消息通道MessageChannel

·  消息通道里的消息,消费和收发都是靠消息通道的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

2.2、SpringCloud Stream架构图

 

Middleware中间件,目前只支持RabbitMQ和Kafka

Binder是应用与消息中间件之间的封装,完美地实现了应用程序与消息中间件细节之间的隔离,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现

@Input注解标识输入通道,通过该输入通道接收的消息进入应用程序

@Output注解标识输出通道,发布的消息将通过该通道离开应用程序

@StreamListener监听队列,用于消费者的队列的消息接收

@EnableBinding指信道channel和exchange绑定在一起

2.3、SpringCloud Stream处理流程

 

Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

Source:org.springframework.cloud.stream.messaging.Source

Sink:org.springframework.cloud.stream.messaging.Sink

3、如何使用SpringCloud Stream

3.1、创建springcloud-stream-sender项目(消息生产者)

3.1.1、在pom文件中引入依赖

<!--引入web模块-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入SpringCloud Stream依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.1.2、添加application.yml并配置

server:
  port: 9061
spring:
  application:
    name: stream-sender
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        output: #通道名称,根据@EnableBinding配置的类里的属性OUTPUT的值来配
          destination: TestExchange #Exchange名称,消息目的地
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型

3.1.3、创建启动类

@SpringBootApplication
public class SenderApp {
    public static void main( String[] args ) {
        SpringApplication.run(SenderApp.class,args);
    }
}

3.1.4、创建Service服务,发送消息到MQ

注意包不要引入错了

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

//属性为org.springframework.cloud.stream.messaging下的Source类
@EnableBinding(Source.class)
public class SendService {

    @Resource
    MessageChannel output;

    @PostMapping("/send")
    public Boolean sendMsg(String msg){
        //MessageBuilder是org.springframework.messaging.support下,注意不要引错包
        boolean result = this.output.send(MessageBuilder.withPayload(msg).build());
        return result;
    }
}

@EnableBinding(Source.class)

 3.1.5、创建Controller对外提供接口服务

@RestController
public class SendController {

    @Resource
    private SendService sendService;

    @PostMapping("/send")
    public String send(){
        String msg="测试信息";
        Boolean result = this.sendService.sendMsg(msg);
        System.out.println("send msg status:  "+result);
        return "发送成功";
    }
}

 3.2、创建springcloud-stream-consumer项目(消息消费者)

3.2.1、在pom文件中引入依赖

<!--引入web模块-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入SpringCloud Stream依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2.2、添加application.yml并配置

server:
  port: 9062
spring:
  application:
    name: stream-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        input: #通道名称
          destination: TestExchange #Exchange名称
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型

Springcloud stream配置基本和生产者sender端一样,只是把bindings下的output改成input

3.2.3、创建启动类

@SpringBootApplication
public class ConsumerApp {
    public static void main( String[] args ) {
        SpringApplication.run(ConsumerApp.class,args);
    }
}

3.2.4、编写消息接收类,监听消费消息

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@EnableBinding(Sink.class)
public class MsgListener {

    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT)
    public void listener(Message<String> msg){
        System.out.println("消费端接受的消息是:  "+msg+"------port: "+port);
    }
}

3.3、启动RabbitMQ服务器,访问管理页面http://localhost:15672/

管理页面默认端口号为15672

3.4、 启动springcloud-stream-sender消息生产者和springcloud-stream-consumer消息消费者服务

springcloud-stream-sender:http://localhost:9061

springcloud-stream-consumer:http://localhost:9062

重新访问RabbitMQ的管理界面,可以看到多了一个Exchange,就是我们在配置文件里配置的Exchange名称

3.5、使用IDEAJ自带的http client发送一个post请求到消息生产者

http://localhost:9061/send

 可以看到返回发送成功的提示

查看消息生产者的后台日志

 查看消息消费者的后台日志

 可以看到消息的内容是GenericMessage

消费端接受的消息是:

 GenericMessage[
payload=测试信息, 
headers={
amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=TestExchange, 
amqp_deliveryTag=1,
 deliveryAttempt=1, amqp_consumerQueue=TestExchange.anonymous.gXVX6rHVSQWlXf7SasUwvQ, amqp_redelivered=false, 
amqp_receivedRoutingKey=TestExchange, 
amqp_timestamp=Fri Apr 28 12:36:31 CST 2023, amqp_messageId=fadf505d-92e9-e54c-c86b-62d577e4706b, id=fe3521c0-eeb7-c85b-c6be-43ce6c23bd92, amqp_consumerTag=amq.ctag-R7q-QWuU7PNqfmYOU5y9oQ, 
sourceData=(Body:'测试信息' MessageProperties [headers={}, timestamp=Fri Apr 28 12:36:31 CST 2023, messageId=fadf505d-92e9-e54c-c86b-62d577e4706b, contentType=application/json, 
contentLength=0, 
receivedDeliveryMode=PERSISTENT, 
priority=0, 
redelivered=false, 
receivedExchange=TestExchange, 
receivedRoutingKey=TestExchange,
deliveryTag=1, 
consumerTag=amq.ctag-R7q-QWuU7PNqfmYOU5y9oQ, consumerQueue=TestExchange.anonymous.gXVX6rHVSQWlXf7SasUwvQ]), contentType=application/json,
timestamp=1682656592010}]------port: 9062

4、SpringCloud Stream集群消费

4.1、修改springcloud-stream-consumer的配置文件,将端口号改为9063,运行springcloud-stream-consumer的启动类启动服务

4.2、这样就形成了一个集群

4.3、 再次请求http://localhost:9061/send发送消息

4.4、查看消费者端的控制台打印信息

 可以看到消费者A、消费者B都打印了消息,即造成了消息重复消费的问题

4.5、如果消息生产者发送消息过程中,生产者发送消息成功,但是消费者A、消费者B在消息发送过程中由于某些原因宕机了,但是重启后没有消费消息,这导致了消息丢失。

4.6、解决消息重复消费和消息丢失的问题,只要在消息消费端配置消费者组就可以解决这个问题

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        input: #通道名称
          destination: TestExchange #Exchange名称
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型
          group: cousumer_group #配置消费者组解决消息重复消费和丢失问题

4.7、使用http://localhost:9061/send发送6条消息

4.8、 查看消费者A、消费者B的控制台

 

 可以看到消费者A、消费者B各消费了3条消息,解决了重复消费

4.9、关掉消费者A、消费者B服务器后,使用http://localhost:9061/send发送3条消息,重新启动消费者A、消费者B服务器,查看消费者A、消费者B的控制台

 

 

消费者A消费了三条消息

消费者B没有消费消息

这样就解决了消息丢失、消息重复消费的问题

关掉消费者A、消费者B服务器后,使用http://localhost:9061/send发送3条消息后查看RabbitMQ的管理页面,可以看到有3条消息待消费

 重启重新启动消费者A、消费者B服务器后刷新界面,消息被消费者消费了

 5、切换消息中间件(RabbitMQ--->Kafka)

5.1、修改springcloud-stream-sender项目

5.1.1、在pom文件中引入Kafka依赖

<!--引入kafka依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

5.1.2、改造springcloud-stream-sender项目配置文件结构

主配置文件application.yml配置如下

server:
  port: 9061
spring:
  application:
    name: stream-sender
  profiles:
    active: kafka   #激活kafka配置文件
#    active: rabbit

application-rabbit.yml配置如下

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        output: #通道名称,根据@EnableBinding配置的类里的属性OUTPUT的值来配
          destination: TestExchange #Exchange名称,消息目的地
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型

application-kafka.yml配置如下

spring:
  cloud:
    stream:
      kafka:  #kafka的连接地址
        binder:
          brokers: localhost:9092
      bindings: #服务的整合处理
        output:
          destination: TestTopic #Exchange名称
          context-type: application/json #设置消息的类型
          binder: kafka #设置要绑定的消息服务类型

5.2、修改springcloud-stream-consumer项目

5.2.1、在pom文件中引入Kafka依赖

<!--引入kafka依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

5.2.2、改造springcloud-stream-consumer项目配置文件结构

主配置文件application.yml配置如下

server:
  port: 9062
spring:
  application:
    name: stream-consumer
  profiles:
    active: kafka  #激活kafka配置文件
#    active: rabbit

application-rabbit.yml配置如下

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        input: #通道名称
          destination: TestExchange #Exchange名称
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型
          group: cousumer_group #配置消费者组解决消息重复消费和丢失问题

application-kafka.yml配置如下

spring:
  cloud:
    stream:
      kafka:  #kafka的连接地址
        binder:
          brokers: localhost:9092
      bindings: #服务的整合处理
        input:
          destination: TestTopic #Exchange名称
          context-type: application/json #设置消息的类型
          binder: kafka #设置要绑定的消息服务类型
          group: consumer_group #配置消费者组解决消息重复消费和丢失问题

5.3、启动Zookeeper、Kafka(服务器)、send(消息生产者)、consumer(消息消费者)

本地启动Zookeeper、Kafka时需要修改的配置文件可百度查询

6、自定义通道名称

参照Source与Sink的格式自定义输入输出通道名称

Source:org.springframework.cloud.stream.messaging.Source

Sink:org.springframework.cloud.stream.messaging.Sink

6.1、自定义输出通道(发布消息)

6.1.1、创建接口MyOutput

public interface MyOutput {
    String OUTPUT = "MyOutput";

    @Output("MyOutput")
    MessageChannel output();
}

6.1.2、修改消息发送服务注解@EnableBinding

//属性为org.springframework.cloud.stream.messaging下的Source类
//@EnableBinding(Source.class)
@EnableBinding(MyOutput.class)
public class SendService {

    @Resource
    MessageChannel MyOutput;

    @PostMapping("/send")
    public Boolean sendMsg(String msg){
        //MessageBuilder是org.springframework.messaging.support下,注意不要引错包
        boolean result = this.MyOutput
.send(MessageBuilder.withPayload(msg).build());
        return result;
    }
}

6.1.3、修改配置application.yml文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        output: #通道名称,根据@EnableBinding配置的类里的属性OUTPUT的值来配
          destination: TestExchange #Exchange名称,消息目的地
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型
        MyOutput: #通道名称,根据@EnableBinding配置的类里的属性OUTPUT的值来配
          destination: MyOutputExchange #Exchange名称,消息目的地
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型

 可以定义多个通道,每个通道可以配置自己独立的消息中间件(可以rabbit,也可以kafka)

6.2、自定义输入通道(接收消息)

6.2.1、创建接口

public interface MyInput {
    String INPUT = "MyInput";

    @Input("MyInput")
    SubscribableChannel input();
}

 6.2.2、修改消息接收监听服务注解@EnableBinding

//@EnableBinding(Sink.class)
@EnableBinding(MyInput.class)
public class MsgListener {

    @Value("${server.port}")
    private String port;

    @StreamListener(MyInput.INPUT)
    public void listener(Message<String> msg){
        System.out.println("消费端接受的消息是:  "+msg+"------port: "+port);
    }
}

 

6.2.3、修改配置application.yml文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        input: #通道名称
          destination: TestExchange #Exchange名称
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型
          group: cousumer_group #配置消费者组解决消息重复消费和丢失问题
        MyInput: #通道名称
          destination: MyOutputExchange #Exchange名称,与对应的输出绑定同一个Exchange
          context-type: application/json #设置消息的类型
          binder: rabbit #设置要绑定的消息服务类型
          group: myoutput_group #配置消费者组解决消息重复消费和丢失问题

 

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐