Spring Cloud Stream,官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的binder对象交互,通过配置binding(绑定),而 Spring Cloud Stream 的binder对象负载与消息中间件交互,所以,我们只需要高清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式,通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念
目前仅支持RabbitMQ、Kafka
官网:https://spring.io/projects/spring-cloud-stream
中文手册:https://www.springcloud.cc/spring-cloud-greenwich.html#spring-cloud-stream-overview-introducing
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
环境准备
使用Eureka作为注册中心,搭建参考:【SpringCloud】快速入门(一)
使用RabbitMQ作为中间件,搭建参考:【RabbitMQ】 RabbitMQ安装
1、新建一个Spring Cloud Stream生产者模块(springcloud-stream-rabbitmq-provider8801)
2、编辑POM文件,引入stream依赖和eureka依赖
1 <!-- spring cloud stream rabbit --> 2 <dependency> 3 <groupId>org.springframework.cloud</groupId> 4 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 5 </dependency> 6 7 <!-- eureka client --> 8 <dependency> 9 <groupId>org.springframework.cloud</groupId> 10 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 11 </dependency>
完整pom文件如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <parent> 6 <artifactId>test-springcloud</artifactId> 7 <groupId>com.test</groupId> 8 <version>1.0-SNAPSHOT</version> 9 </parent> 10 <modelVersion>4.0.0</modelVersion> 11 12 <artifactId>springcloud-stream-rabbitmq-provider8801</artifactId> 13 14 <dependencies> 15 16 <!-- spring cloud stream rabbit --> 17 <dependency> 18 <groupId>org.springframework.cloud</groupId> 19 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 20 </dependency> 21 22 <!-- eureka client --> 23 <dependency> 24 <groupId>org.springframework.cloud</groupId> 25 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 26 </dependency> 27 28 <!-- spring boot --> 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-web</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-actuator</artifactId> 36 </dependency> 37 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-devtools</artifactId> 41 <scope>runtime</scope> 42 <optional>true</optional> 43 </dependency> 44 45 <dependency> 46 <groupId>org.projectlombok</groupId> 47 <artifactId>lombok</artifactId> 48 <optional>true</optional> 49 </dependency> 50 <dependency> 51 <groupId>org.springframework.boot</groupId> 52 <artifactId>spring-boot-starter-test</artifactId> 53 <scope>test</scope> 54 </dependency> 55 56 </dependencies> 57 </project>
3、编辑配置文件,application.yml
1 # 端口 2 server: 3 port: 8801 4 5 spring: 6 application: 7 name: cloud-stream-provider 8 cloud: 9 stream: 10 binders: 11 # 表示定义的名称,用于binding的服务信息 12 defaultRabbit: 13 # 消息组件类型 14 type: rabbit 15 # 设置rabbitmq的相关配置的环境配置 16 environment: 17 spring: 18 rabbitmq: 19 host: 127.0.0.1 20 port: 5672 21 username: guest 22 password: guest 23 # 服务的整合处理 24 bindings: 25 # 这个名字是一个通道的名称 26 output: 27 # 表示要使用的Exchange 名称定义 28 destination: studyExchange 29 # 设置消息类型,本次为json,文本则设置"text/plain" 30 content-type: application/json 31 # 设置要绑定的消息服务的具体设置 32 binder: defaultRabbit 33 34 eureka: 35 client: 36 service-url: 37 defaultZone: http://localhost:8761/eureka
4、编写主启动方法类
1 @SpringBootApplication 2 public class StreamMQMain8801 { 3 4 public static void main(String[] args) { 5 SpringApplication.run(StreamMQMain8801.class, args); 6 } 7 }
5、编辑业务接口,IMessageProvider用来定义发送消息方法
1 public interface IMessageProvider { 2 public String send(); 3 }
6、编辑业务接口实现,MessageProviderImpl
1 import org.springframework.messaging.support.MessageBuilder; 2 3 import java.util.UUID; 4 5 // 定义消息的推送管道 6 @EnableBinding(Source.class) 7 public class MessageProviderImpl implements IMessageProvider { 8 9 @Autowired 10 // 消息发送通道 11 private MessageChannel output; 12 13 public String send() { 14 String serial = UUID.randomUUID().toString(); 15 output.send(MessageBuilder.withPayload(serial).build()); 16 System.out.println("====serial: " + serial); 17 return null; 18 } 19 }
7、编写controller
1 @RestController 2 public class SendMessageController { 3 4 @Autowired 5 private IMessageProvider messageProvider; 6 7 @RequestMapping(value = "/sendMessage") 8 public String sendMessage(){ 9 return messageProvider.send(); 10 } 11 }
8、测试
1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目
2)查看RabbitMQ的web界面,在Exchang模块中,可以看到里面新增了一个名为studyExchange的交互器,类型为topic
3)新建一个queue,名为:test.news,且绑定到studyExchange。
4)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台
可以看到test.news此queue,已经收到消息
1、新建一个Spring Cloud Stream消费者模块(springcloud-stream-rabbitmq-consumer8802)
2、编辑POM文件,引入stream依赖和eureka依赖,同上
完整pom文件如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <parent> 6 <artifactId>test-springcloud</artifactId> 7 <groupId>com.test</groupId> 8 <version>1.0-SNAPSHOT</version> 9 </parent> 10 <modelVersion>4.0.0</modelVersion> 11 12 <artifactId>springcloud-stream-rabbitmq-consumer8802</artifactId> 13 14 <dependencies> 15 16 <!-- spring cloud stream rabbit --> 17 <dependency> 18 <groupId>org.springframework.cloud</groupId> 19 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 20 </dependency> 21 22 <!-- eureka client --> 23 <dependency> 24 <groupId>org.springframework.cloud</groupId> 25 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 26 </dependency> 27 28 <!-- spring boot --> 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-web</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-actuator</artifactId> 36 </dependency> 37 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-devtools</artifactId> 41 <scope>runtime</scope> 42 <optional>true</optional> 43 </dependency> 44 45 <dependency> 46 <groupId>org.projectlombok</groupId> 47 <artifactId>lombok</artifactId> 48 <optional>true</optional> 49 </dependency> 50 <dependency> 51 <groupId>org.springframework.boot</groupId> 52 <artifactId>spring-boot-starter-test</artifactId> 53 <scope>test</scope> 54 </dependency> 55 56 </dependencies> 57 </project>
3、便捷配置文件application.yml
1 # 端口 2 server: 3 port: 8802 4 5 spring: 6 application: 7 name: cloud-stream-consumer 8 cloud: 9 stream: 10 binders: 11 # 表示定义的名称,用于binding的服务信息 12 defaultRabbit: 13 # 消息组件类型 14 type: rabbit 15 # 设置rabbitmq的相关配置的环境配置 16 environment: 17 spring: 18 rabbitmq: 19 host: 127.0.0.1 20 port: 5672 21 username: guest 22 password: guest 23 # 服务的整合处理 24 bindings: 25 # 这个名字是一个通道的名称 26 input: 27 # 表示要使用的Exchange 名称定义 28 destination: studyExchange 29 # 设置消息类型,本次为json,文本则设置"text/plain" 30 content-type: application/json 31 # 设置要绑定的消息服务的具体设置 32 binder: defaultRabbit 33 # 消费分组 34 # group: testA 35 36 eureka: 37 client: 38 service-url: 39 defaultZone: http://localhost:8761/eureka
4、编写主启动方法类
1 @SpringBootApplication 2 public class StreamMQMain8802 { 3 public static void main(String[] args) { 4 SpringApplication.run(StreamMQMain8802.class, args); 5 } 6 }
5、编辑消息监听组件
1 @Component 2 @EnableBinding(Sink.class) 3 public class ReceiveMessageListenerController { 4 5 @Value("${server.port}") 6 private String serverPort; 7 8 @StreamListener(Sink.INPUT) 9 public void input(Message<String> message){ 10 System.out.println("消费者" + serverPort + ",消费信息:" + message.getPayload()); 11 } 12 }
6、测试
1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目,以及启动Stream消费者项目
2)查看RaibbitMQ的Web后台,发现Queue中多了队列,即Stream消费者项目监听的队列,且此队列绑定了studyExchange
3)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台
可以看到test.news此queue,已经收到消息,且Stream消费者项目也收到消息,并处理了
【SpringCloud】Spring Cloud Stream 消息驱动(二十三)
原文:https://www.cnblogs.com/h--d/p/12840086.html