Spring Cloud Stream
在学习尚硅谷Spring Cloud的视频中,发现Spring官网对于Spring Cloud Stream文档最老都是3.1.6,且3.1版本后的Spring Cloud Stream与视频中的2.x版本发生了较大的更改,官网也不再建议使用2.x版本。另外尚硅谷视频的老师在讲这个知识点时感觉讲的很不清晰,很多时候都是走马观花。因此,笔者打算自己整理一版Spring Cloud 3.x的文档。
文档的整理过程也是个人学习的过程,鉴于笔者水平有限,文档可能存在很多问题,望大家多多指正。
文档参考了很多其他人的博客和笔记,笔者在文章的最后都有标出,如有侵权,烦请告知。
1. 为什么要有Spring Cloud Stream
很自然的第一个问题:为什么要有Spring Cloud Stream?
我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。
这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理
我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。
我们很容易总结出每个模块的流程:
- 从上一个模块拉取数据
- 处理数据
- 将处理完成的数据发给下一个模块
其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。
为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。
Spring Cloud Stream想让我们不关心如何获取数据,如何发送数据,而只专心处理自己的业务。还拿上面的例子来说,假设你现在负责的是系统里的模块3,它的功能是将模块2传来的字符串全部转成大写,然后再将这个转化后的字符串发给模块4,也即:
public String handle(String source){
return source.toUpperCase();
}
其中方法的的参数String source
就是模块2传给你的数据,方法的返回就是你要给模块4发送的数据。
如果你使用Spring Cloud Stream来开发,我们的模块其实基本完成了(配置文件回来再说),至于你以前开发时关心的要如何操作中间件API从模块2拉取数据,拉取完成后要怎么解码(反序列化,将字节流转成Java 对象),处理完成后又要如何操作中间件API将数据写给模块4,写的时候还得将你返回的对象转成字节流写出(序列化)等,这些你都不用再关心,Spring Cloud Stream帮你做了。
看到这可能很多同学会问:你上面举得这个例子不还是在说Spring Cloud Stream屏蔽了消息队列的差异吗?
Spring Cloud Stream屏蔽的是一些中间件对于数据流入和数据流出的差异,消息队列自然是属于这种中间件的,也是最常用的。除此以外Spring Cloud Stream还支持其他中间件如Amazon Kinesis,它就不是消息队列。
2. 一些概念
要理解和使用Spring Cloud Stream需要先明白Spring Cloud Stream提出的一些概念。
2.1 Binder
什么是Binder?一句话概括就是具体中间件的统一抽象。一个kafka中间件在Spring Cloud Stream里是一个Binder,一个rabbitMQ中间件也是一个Binder。官方文档中写道:当你引入spring-cloud-stream
依赖的时候,Spring Cloud Stream就会为你的那个中间件生成一个Binder实例,你就可以通过这个Binder实例来和这个消息中间件通信(收发数据)。
很容易得出结论,Spring Cloud Stream对底层中间件的差异屏蔽都是基于我们的Binder,Binder适配了不同的消息中间件(官方文档中写道:Spring Cloud Stream为kafka和rabbitMQ提供了Binder的实现了)。
2.2 Binding
Binding是个比较抽象的概念,我们这里还拿之前的例子来说:
public String handle(String source){
return source.toUpperCase();
}
这是你写的模块3中的业务代码,我们假设你与模块2交互使用的是中间件kafka和与模块4交互使用的是中间件rabbitMQ。也即你的模块的功能就变为了从kafka中获取数据,将获取的字符串数据全转为大写并写出给rabbitMQ。
很明显,这里有两个Binder,一个kafka Binder一个rabbitMQ Binder。而你这个业务处理函数其实也有两个功能:接收中间件的输入和将返回数据输出。再结合Binder,我们可以理解为:
- 函数接收kafka Binder中的输入
- 函数将返回结果写出给rabbitMQ binder。
但是如何表示这种关系呢?也即你现在写了一个函数,怎么表示这个函数的参数是从kafka入的,函数的返回是向rabbitMQ输出的呢?这就需要Binding。
Binding其实就是一座桥,桥的一头是Binder,另一头是你的业务处理函数。Bindings将外部消息中间件与你的业务处理代码连接在了一起(官方原话是:外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由Binder创建))。
了解了这些其实也就了解Spring Cloud Stream的架构图,Spring Cloud Stream官网中有一张图讲了它的架构:
首先最底层的Middleware是中间件,我们的kafka,rabbitMQ都属于中间件。上一层的Binder已经讲了,是对中间件的一层抽象和封装。再上一层的inputs和outputs其实就是Bindings,我们与Binder的交互就是通过Binding,其中写出数据就是output,而获取数据就是input。再上层的Application Core就是我们自己的业务代码,可以看到我们的业务代码通过Binding(input、output)与Binder交互,而Binder又负责和具体中间件交互。
3. 函数式接口
Spring Cloud Stream 2.x与Spring Cloud Stream 3.x最大的不同就是2.x是基于注解的,而3.x是基于函数式编程的。
还拿上面的例子来说:对于你开发的一个模块而言,它无非三种情况:
- 从上一个模块获取数据,将这个数据转发到下一个模块
- 从上一个模块获取数据,自己处理完后不再将这个数据发给别的模块
- 不需要从别处获取数据,自己就是数据源,将自己的数据发送到下一个模块。
这三种模式其实就对应Java 8函数式编程中的三个接口:Function
、Consumer
、Supplier
(不了解这三个接口的可自行搜索相关资料,关键字:Java 8;函数式接口)。
现在我们还来模拟之前的系统,首先模块1是系统的入口模块,不需要其他模块提供数据源,换言之它是个生产者,那么模块1就可以使用接口Supplier
(只有返回没有入参)。我们假设模块1的功能是生成字符串,那模块1的代码可以写为:
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
模块2会消费模块1的字符串,并将它全部转为大写,然后再将转化后的字符串写出。很明显模块2既是生产者也是消费者,那模块2就可以使用接口Function
(既有返回也有入参)。模块2的代码为:
public Function<String,String> upperCase(){
return String::toUpperCase;
}
模块3会消费模块2的字符串,并将它直接打印到控制台,且模块3不再将字符串写出,很明显模块3只是一个消费者,那模块3就可以使用接口Consumer
(只有入参没有返回)。模块3的代码为:
public Consumer<String> log(){
return System.out::println;
}
其中上例中的String::toUpperCase
和System.out::println
为Java 8的方法引用(不了解的可自行搜索相关资料,关键字:Java8;方法引用)。
可以看到,我们将自己的业务处理都封装成了一个函数式接口,并作为一个函数的返回。
在实际的开发中上面的那些函数都会被标上@Bean注解,注入到Spring容器,也即:
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
@Bean
public Consumer<String> log(){
return System.out::println;
}
我们知道这代表向Spring中注入一个Bean,其中Bean的名字就是函数名,而Bean本身就是函数的返回。也即我们将自己的业务处理逻辑包装成一个对象(函数式接口)注入到了Spring IOC中。
现在假设Binder收到了一条数据,那它会寻找Binding,而Binding是一个桥梁,它会连接一个我们的处理函数,处理函数其实就是这里的Bean,Binder拿到Bean后,自然就会调用Bean的处理函数来处理(因为是函数式接口)。
如果用一张图来描述的话,大概就是这样:
这里主要讲的就是我们之前的业务处理被函数式接口包装成了对象,包装成对象后就可以注入到Spring IOC中,这样的一个Bean对象就可以对应一个Binding,通过Binding与Binder交互。
4. 案例
说了那么多,还是没讲怎么使用。我们不妨还以文档一开始的那个例子来作为编码案例:
现在的需求如下:
模块1生产字符串,并将字符串写出到kafka
模块2消费模块1的字符串,并将字符串转为大写,输出到rabbitMQ
模块3消费模块2的字符串,并将字符串打印到控制台。
也即:
为了项目的简洁,我们将上述模块1、模块2和模块3写在一个项目中。
本文使用的SpringBoot版本是2.6.3
,使用的Spring Cloud版本是2021.0.1
。这里需要注意以下,如果你使用的是尚硅谷视频里老师讲的SpringBoot2.2.2.RELEASE
和Spring Cloud Hoxton.SR1
是无法复现本例的代码的,因为官方是在3.1
版本后废弃了使用注解的方案,转而推荐使用Java函数模式的方式。尚硅谷视频中老师使用的版本Spring Cloud Stream是3.0.1
版本,相比3.1
版本后的API发生了较大变化,所以如果大家想动手实验本文中的案例,最好与笔者配置的环境一致。
在开始创建项目前,我们默认大家都是已经配置好了自己的kafka与rabbitMQ并已经正常启动了。
4.1 maven依赖
我们需要的依赖并不多,其实只需要rabbit和kafka的依赖,整个项目的maven配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.coderzoe</groupId>
<artifactId>logging-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>logging-consumer</name>
<description>logging-consumer</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- 主要依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
</project>
4.2 配置文件之Binder
之前已经讲了一个kafka实例或者rabbitMQ实例其实就是一个binder,那你现在有了一个kafka,要如何告诉Spring Cloud呢?最简单的就是通过配置文件,配置文件配置Binder的思想很简单,就是告诉Spring Cloud Stream,我要创建一个Binder,这个Binder的类型是kafka或者rabbitMQ,然后它的IP,端口都是啥以及用户名密码等都是啥就好了。
我们先以kafka为例,配置kafka为Binder有两种方式:
spring:
cloud:
stream:
kafka:
binder:
# kafka的Ip和端口,可以是集群
brokers: ip:port
或者:
spring:
cloud:
stream:
binders:
# 你的binder名字,自己随意取,我取的名字叫myKafka
myKafka:
# 你的binder类型,我们这里类型是kafka
type: kafka
# 下面的环境配置与上面的一模一样
environment:
spring:
cloud:
stream:
kafka:
binder:
# kafka的Ip和端口,可以是集群
brokers: ip:port
很明显,第二种比第一种更复杂,你如果只有一个kafka实例,那直接用第一种就可以了,但如果你的项目中有多个kafka实例,比如项目2和项目1之间用的是kafka,项目2和项目3间也用的kafka,这两个kafka又是不是同一套kafka。
所以,第二种配置可以配备多个kafka实例,如:
spring:
cloud:
stream:
binders:
myKafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip1:port1
myKafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip2:port2
另外,如果你有多个kafka实例,但使用第一种方式下配备的属性信息会被这多个kafka实例共享,如:
spring:
cloud:
stream:
kafka:
binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
binders:
myKafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip1:port1
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
myKafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip2:port2
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
我们现在有两个kafka实例myKafka1和myKafka2,但我们在一开头配置了security.protocol = SASL_PLAINTEXT
和 sasl.mechanism = PLAIN
,这是kafka的安全配置,这个配置信息会被myKafka1和myKafka2都具备。也即在一开始的这些配置会被每个kafka实例都具有,因此一些公共的配置可以放在一开始。
同理rabbitMQ的配置也有两种:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
spring:
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
是的,如果你只有一个rabbitMQ实例可以使用第一种,但如果有多个,就得使用第二种,它与kafka配置的思路一模一样,这里不再赘述。
虽然我们只有一个kafka实例和一个rabitMQ实例,但笔者依然采取了第二种配置文件,一则是考虑到以后实例增多改动比较小的可能,二则是第二种配置笔者认为更清晰。项目对于Binder的配置全部信息为:
spring:
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
myKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip:port
关于rabbitMQ与kafka更详细的配置,如自动提交,ACK等信息可以参考Spring官网,本文不再列出。
4.3 编写自己的业务代码
配置完Binder就代表你已经具备和外部消息中间件通信的能力了,现在你可以写自己的业务代码了:
package com.coderzoe.loggingconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author coderZoe
*/
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
/**
* 模块1 生产字符串
*/
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
/**
* 模块2,将生产的字符串转为大写
*/
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
/**
* 模块3 将字符串打印
*/
@Bean
public Consumer<String> log(){
return System.out::println;
}
}
由于项目代码不是很多,我将Bean的注入就写到了启动类中,实际上上面这段代码就是整个项目的所有代码了。
写完这些Bean后,我们还需要将它写到配置文件,告诉Spring Cloud,这些都是用于函数处理的Bean:
spring:
cloud:
function:
definition: produceStr;upperCase;log
4.4 配置文件之Bindings
现在我们有了Binder,也有了处理业务函数,肯定还差一个Binding,将Binder与业务处理联系起来。联系起来的方法很简单,就是通过配置文件来配置。
在讲配置文件前,我们先讲Binding的名称规范。Binding的命名是:<functionName>-in/out-<index>
。
比如:
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
它的Binding名字就是produceStr-out-0
。其中produceStr
是函数名(也是Bean名),out代表这个Binding是向外写出的,而index是输入或输出绑定的索引。对于典型的单个输入/输出函数,它始终为 0,因此它仅与具有多个输入和输出参数的函数相关(一个函数被多次作为输出/输出,比如这个函数被kafka和rabbitMQ都作为输出,那就是一个index0一个index1)。
再比如:
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
它对应两个Binding,因为它既是输入又是输出(从kafka入数据,向rabbitMQ出数据),它们的名字是:upperCase-in-0
、upperCase-out-0
可以看到,我们通过名字就将Binding和处理函数做了关联。关联了Binding与处理函数,还需要关联Binding与Binder,它的配置写法如下:
spring:
application:
cloud:
stream:
bindings:
produceStr-out-0:
binder: myKafka
通过在你的Binding中指明使用的是哪个binder就可以了。
这样我们配置好了Binding,项目的Bindings完整配置如下:
spring:
cloud:
stream:
bindings:
produceStr-out-0:
binder: myKafka
destination: topic1
upperCase-in-0:
binder: myKafka
group: group1
destination: topic1
upperCase-out-0:
destination: topic2
binder: myRabbit
log-in-0:
binder: myRabbit
group: group1
destination: topic2
这里在配置Binding的时候比上面多了group和destination两个属性,其中group是消费组的意思,而destination是主题(topic)。如果你不了解这两个概念,我建议你查阅一下kafka的相关资料。
4.5 主动发送消息
这样其实我们就完成了整个项目,启动项目你会发现:控制台会不断的打印HELLO SPRING CLOUD STREAM
。
但这个消息我们是被动发送的,因为Binder调用我们的produceStr-out-0
这个Binding来不断的发送消息。很多时候我们是希望主动的发送消息的,比如处理完一条用户请求后,将处理结果发送出去。Spring Cloud Stream主动发送消息借助于StreamBridge
,它的用法如下:
@Service
public class SendService {
private StreamBridge streamBridge;
public void send(String message){
streamBridge.send("upperCase-in-0",message);
}
@Autowired
public void setStreamBridge(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
}
可以看到,就是使用StreamBridge
给一个in的Binding发送消息。
4.6 代码地址
本文代码已经上传至github,大家可以自行参阅:https://github.com/coderZoe/spring-cloud-stream.git
5. 一些补充
5.1 Message
我们刚才的文档一直在以字符串作为消息传递的数据,实际上消息传递的准确对象是org.springframework.messaging.Message
,这是一个接口:
public interface Message<T> {
/**
* Return the message payload.
*/
T getPayload();
/**
* Return message headers for the message (never {@code null} but may be empty).
*/
MessageHeaders getHeaders();
}
也即生产者和消费者交互的对象其实是Message,我们之前写的String只是在生产时被Spring Cloud Stream封装为了Message,而在消费时又从Message转为了String,因此我们其实完全可以这样写:
@Bean
public Supplier<Message<String>> produceStr(){
return () -> MessageBuilder.withPayload("hello spring cloud stream").build();
}
但大部分场景下没有必要,还是那句话,因为Spring Cloud Stream会为我们自动“装箱”和“拆箱”。
另外,消息是支持发送Java对象的,比如:
public static class User{
String name;
int age;
//省略 getter setter和toString
}
@Bean
public Supplier<User> produceUser(){
return () -> new User("tom",18);
}
消费者可以写为:
@Bean
public Consumer<User> logUser(){
return s-> System.out.println(s.name+"_"+s.age);
}
很明显,对象想要被发送需要被序列化,且想要被消费也需要被反序列化,在Spring Cloud Stream中默认的序列化是json。也即对象会被以application/json的形式发送出去。
这可以在配置文件中进行修改:
produceUser-out-0:
binder: myKafka
destination: topic1
content-type: application/json
是不是发现和Spring MVC有点眼熟,是的,其实就是Spring MVC那一套。
5.2 消费组
我们在《4.4 配置文件之Bindings》中为消费的Binding配置了一个group,Spring Cloud Stream建议大家为每个消费者都显示声明一个消费组,因为这样可以保证“断点续传”的功能。比如你消费者挂了,如果指明了消费组,重启后可以从之前挂掉的地方继续消费,但如果没有指明消费组,Spring Cloud Stream会分配一个匿名的消费组,但每次启动这个名字可能都会变,这样可能会导致重启后重复消费。
5.3 其他
关于更多信息,如分区(partition)或者路由,以及kafka与rabbitMQ的一些定制化配置大家可以参考Spring Cloud官网。
另外,Spring Cloud官网给出了Spring Cloud Stream的一些实践案例:spring-cloud/spring-cloud-stream-samples: Samples for Spring Cloud Stream (github.com)
参考文档
SpringCloud-Stream3.x版本使用教程及如何整合rabbitmq_ShuSheng007的博客-CSDN博客_springcloud整合rabbitmq
6 条评论
好的,感谢您的打赏,能帮到您就好
大佬微信号多少?
启动工程后没有主动发消息
项目源码如下:https://github.com/coderZoe/spring-cloud-stream.git
这个项目应该是没有问题的,你可以参考下