
This article explores the core concepts of reactive programming and then walks through a complete engineering project built on the Spring Boot WebFlux framework.
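Signal vs. Element
In the Reactive Streams specification, a Signal is one of a set of events defined between a Publisher and a Subscriber for asynchronous communication. An Element is a concrete business data item carried by the stream.
By specification, however, an element never exists or travels on its own. It is always the payload of an onNext signal and reaches the subscriber through that signal. A reactive stream is therefore, in essence, an asynchronous sequence of signals, and it is these signals that drive both data transfer and state transitions.
Signals fall into two groups: those emitted by the Publisher and those emitted by the Subscriber. The key signals are: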
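- onSubscribe: subscription-established signal
  - Direction: Publisher → Subscriber
  - Payload: a Subscription instance
  - Notes:
    - A handshake signal that the Publisher sends to the Subscriber once the subscription is established, carrying a Subscription instance as its payload. The Subscriber later uses this instance to send request/cancel signals to the Publisher.
    - For each Subscriber, onSubscribe may occur only once and must be the first signal.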
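- onNext: data element signal
  - Direction: Publisher → Subscriber
  - Payload: one data element of type T
  - Notes:
    - The Publisher uses this signal to push one data element to the Subscriber asynchronously. "Emitting an element" therefore really means emitting an onNext signal that carries that element.
    - This signal may occur zero or more times.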
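- onComplete: normal completion signal
  - Direction: Publisher → Subscriber
  - Payload: none
  - Notes:
    - Indicates that the Publisher has published all of its data without an error.
    - Marks the stream as terminated successfully.
    - After sending onComplete, the Publisher must not send any further signal.
    - This signal occurs at most once. onError and onComplete are mutually exclusive: at most one of them can appear.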
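- onError: abnormal termination signal
  - Direction: Publisher → Subscriber
  - Payload: a Throwable error object
  - Notes:
    - Indicates that an error occurred while the stream was being processed. The error is propagated as an event in the stream rather than through the traditional exception-throwing mechanism.
    - The signal carries the Throwable as its payload so that downstream stages can handle or log the error.
    - Marks the stream as terminated with a failure.
    - After sending onError, the Publisher must not send any further signal.
    - This signal occurs at most once. onError and onComplete are mutually exclusive: at most one of them can appear.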
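- request: backpressure control signal
  - Direction: Subscriber → Publisher
  - Payload: the number of elements requested (a long)
  - Notes:
    - Sent by the Subscriber through the Subscription instance to tell the Publisher the maximum number of onNext signals it can currently handle.
    - This is how backpressure is implemented: control over the data flow moves from the Publisher to the Subscriber, so the Subscriber can tune the upstream's push rate and volume to its own capacity and avoid being overwhelmed.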
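- cancel: cancellation signal
  - Direction: Subscriber → Publisher
  - Payload: none
  - Notes:
    - Sent by the Subscriber through the Subscription instance to tell the Publisher that it is no longer interested in any further signal and wants to end the subscription early. This lets the Subscriber end the stream on its own initiative before the upstream sends onComplete/onError.
    - On receiving this signal, the Publisher should stop sending signals as soon as possible and release resources.
    - This signal may be sent more than once, so handling it must be idempotent.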
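The signal ordering described above can be observed with a minimal sketch. It is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher (which correspond to the Reactive Streams interfaces) instead of Reactor, and the names SignalTraceDemo and TracingSubscriber are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SignalTraceDemo {

    /** Hypothetical subscriber that records every signal it sends or receives. */
    static class TracingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> trace = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;               // payload of the handshake signal
            trace.add("onSubscribe");
            trace.add("request(1)");
            s.request(1);                   // backpressure: ask for one element only
        }

        @Override
        public void onNext(Integer item) {
            trace.add("onNext(" + item + ")");
            trace.add("request(1)");
            subscription.request(1);        // ready for the next element
        }

        @Override
        public void onError(Throwable t) {
            trace.add("onError");
            done.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            trace.add("onComplete");
            done.complete(null);
        }
    }

    static List<String> run() {
        TracingSubscriber subscriber = new TracingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 10; i < 13; i++) {
                publisher.submit(i);        // each element travels inside an onNext signal
            }
        }                                   // close() results in onComplete
        subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
        return subscriber.trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded trace starts with onSubscribe, alternates request(1) and onNext, and ends with a single onComplete, which matches the rules listed for each signal.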
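It is worth stressing that the Publisher and Subscriber roles are not limited to the two ends of a reactive chain. An operator in the middle is both: it subscribes to its upstream and acts as a new Publisher for its downstream. Take the following code as an example:
Flux.range(): at the very top of the chain; it is the Publisher for the downstream delayElements().
delayElements(): in the middle of the chain. To its upstream Flux.range() it is a Subscriber; to its downstream map() it is a Publisher.
map(): in the middle of the chain. To its upstream delayElements() it is a Subscriber; to its downstream subscribe() it is a Publisher.
subscribe(): at the very bottom of the chain; it is the Subscriber of its upstream map().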
Flux.range(10,3)
.delayElements(Duration.ofSeconds(5))
.map(num->num*2)
.subscribe();
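To make the dual role of a middle operator concrete, here is a rough sketch of a map-like stage written against the JDK Flow API. It is not how Reactor implements map(); MapProcessorDemo, MapProcessor, and CollectingSubscriber are hypothetical names, and the stage simply extends SubmissionPublisher so that it can re-publish what it receives.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class MapProcessorDemo {

    /** Middle stage: a Subscriber to its upstream and a Publisher to its downstream. */
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item));     // re-publish the transformed element downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable t) {
            closeExceptionally(t);          // forward the failure downstream
        }

        @Override
        public void onComplete() {
            close();                        // forward completion downstream
        }
    }

    /** Terminal subscriber that collects every element it receives. */
    static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> done = new CompletableFuture<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { done.completeExceptionally(t); }
        @Override public void onComplete() { done.complete(null); }
    }

    static List<Integer> run() {
        CollectingSubscriber<Integer> sink = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MapProcessor<Integer, Integer> doubler = new MapProcessor<>(n -> n * 2);
            doubler.subscribe(sink);        // the sink subscribes to the middle stage
            source.subscribe(doubler);      // the middle stage subscribes to the source
            for (int i = 10; i < 13; i++) {
                source.submit(i);
            }
        }
        sink.done.orTimeout(5, TimeUnit.SECONDS).join();
        return sink.items;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The same object appears on both sides: source sees doubler as a Subscriber, while sink sees it as a Publisher.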
The following sample code shows in detail how the various signals interact in practice:
import java.time.Duration;
import reactor.core.publisher.Flux;
public class SignalDemo{
public static void main(String[] args) throws Exception {
title("正常完成的流");
normalFlux();
title("发生错误的流");
errorFlux();
title("被取消的流");
cancelFlux();
}
/**
* A stream that completes normally
* @throws Exception
*/
public static void normalFlux() throws Exception {
Flux.range(10,3)
// log() operator: intercepts and logs every signal that passes through its position
.log("Normal.Flux #1")
.delayElements(Duration.ofSeconds(5))
.log("Normal.Flux #2")
.subscribe();
Thread.sleep(1000*20);
}
/**
* A stream that terminates with an error
* @throws Exception
*/
public static void errorFlux() throws Exception {
Flux.interval(Duration.ofSeconds(5))
.log("Error.Flux #1")
.map( num->{
if( num==1) {
throw new IllegalArgumentException("Error Num");
}
return num;
} )
.log("Error.Flux #2")
.subscribe(v->{}, e->{});
Thread.sleep(1000*20);
}
/**
* A stream that is cancelled
* @throws InterruptedException
*/
public static void cancelFlux() throws InterruptedException {
Flux.interval(Duration.ofSeconds(5))
.log("Cancel.Flux #1")
.take(3)
.log("Cancel.Flux #2")
.subscribe();
Thread.sleep(1000*20);
}
public static void title(String title){
String separator = "=".repeat(30);
System.out.println("\n\n"+separator+" "+title+" "+separator);
}
}
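The program output is shown below. It shows the signal flow in the three scenarios: in the first, delayElements() asks its upstream for one element at a time (request(1)) even though the final subscriber requested an unbounded amount; in the second, the exception thrown inside map() makes it cancel its upstream and send onError downstream; in the third, take(3) requests exactly 3 elements from its upstream, then cancels the upstream and sends onComplete downstream once the third element has arrived.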
============================== 正常完成的流 ==============================
17:40:33.637 [main] INFO Normal.Flux #1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
17:40:33.639 [main] INFO Normal.Flux #2 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
17:40:33.641 [main] INFO Normal.Flux #2 -- request(unbounded)
17:40:33.641 [main] INFO Normal.Flux #1 -- | request(1)
17:40:33.641 [main] INFO Normal.Flux #1 -- | onNext(10)
17:40:38.659 [parallel-1] INFO Normal.Flux #2 -- onNext(10)
17:40:38.660 [parallel-1] INFO Normal.Flux #1 -- | request(1)
17:40:38.660 [parallel-1] INFO Normal.Flux #1 -- | onNext(11)
17:40:43.665 [parallel-2] INFO Normal.Flux #2 -- onNext(11)
17:40:43.666 [parallel-2] INFO Normal.Flux #1 -- | request(1)
17:40:43.666 [parallel-2] INFO Normal.Flux #1 -- | onNext(12)
17:40:43.671 [parallel-2] INFO Normal.Flux #1 -- | onComplete()
17:40:48.667 [parallel-3] INFO Normal.Flux #2 -- onNext(12)
17:40:48.667 [parallel-3] INFO Normal.Flux #2 -- onComplete()
============================== 发生错误的流 ==============================
17:40:53.661 [main] INFO Error.Flux #1 -- onSubscribe(FluxInterval.IntervalRunnable)
17:40:53.662 [main] INFO Error.Flux #2 -- onSubscribe(FluxMap.MapSubscriber)
17:40:53.662 [main] INFO Error.Flux #2 -- request(unbounded)
17:40:53.662 [main] INFO Error.Flux #1 -- request(unbounded)
17:40:58.667 [parallel-4] INFO Error.Flux #1 -- onNext(0)
17:40:58.669 [parallel-4] INFO Error.Flux #2 -- onNext(0)
17:41:03.668 [parallel-4] INFO Error.Flux #1 -- onNext(1)
17:41:03.679 [parallel-4] INFO Error.Flux #1 -- cancel()
17:41:03.679 [parallel-4] ERROR Error.Flux #2 -- onError(java.lang.IllegalArgumentException: Error Num)
17:41:03.680 [parallel-4] ERROR Error.Flux #2 --
java.lang.IllegalArgumentException: Error Num
...
============================== 被取消的流 ==============================
17:41:13.673 [main] INFO Cancel.Flux #1 -- onSubscribe(FluxInterval.IntervalRunnable)
17:41:13.673 [main] INFO Cancel.Flux #2 -- onSubscribe(FluxLimitRequest.FluxLimitRequestSubscriber)
17:41:13.673 [main] INFO Cancel.Flux #2 -- request(unbounded)
17:41:13.673 [main] INFO Cancel.Flux #1 -- request(3)
17:41:18.679 [parallel-5] INFO Cancel.Flux #1 -- onNext(0)
17:41:18.680 [parallel-5] INFO Cancel.Flux #2 -- onNext(0)
17:41:23.679 [parallel-5] INFO Cancel.Flux #1 -- onNext(1)
17:41:23.681 [parallel-5] INFO Cancel.Flux #2 -- onNext(1)
17:41:28.678 [parallel-5] INFO Cancel.Flux #1 -- onNext(2)
17:41:28.680 [parallel-5] INFO Cancel.Flux #2 -- onNext(2)
17:41:28.680 [parallel-5] INFO Cancel.Flux #1 -- cancel()
17:41:28.680 [parallel-5] INFO Cancel.Flux #2 -- onComplete()
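The cancellation scenario can also be approximated with a hand-written subscriber. The sketch below is an assumption-laden illustration based on the JDK Flow API rather than Reactor's take(); CancelDemo and TakeSubscriber are hypothetical names. The subscriber asks for one element at a time and cancels after the third one instead of requesting more.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CancelDemo {

    /** Hypothetical subscriber that ends the subscription after a fixed number of elements. */
    static class TakeSubscriber implements Flow.Subscriber<Long> {
        final List<String> trace = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> cancelled = new CompletableFuture<>();
        private final int limit;
        private int received;
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Long item) {
            trace.add("onNext(" + item + ")");
            if (++received == limit) {
                subscription.cancel();      // no longer interested in further signals
                subscription.cancel();      // repeated cancel must be harmless (idempotent)
                trace.add("cancel");
                cancelled.complete(null);
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable t) {
            trace.add("onError");
        }

        @Override
        public void onComplete() {
            trace.add("onComplete");
        }
    }

    static List<String> run() {
        TakeSubscriber subscriber = new TakeSubscriber(3);
        try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (long i = 0; i < 5; i++) {
                publisher.submit(i);        // two more elements than the subscriber will accept
            }
            subscriber.cancelled.orTimeout(5, TimeUnit.SECONDS).join();
        }
        return subscriber.trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Unlike the take(3) log above, this subscriber is the last stage, so nothing sends an onComplete on its behalf; the stream simply stops after the cancel.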
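Engineering Practice
Scaffolding
The project is built on Spring Boot WebFlux. The key points of the POM dependency configuration are: add the Spring Boot WebFlux dependency, and for database access use R2DBC, the reactive counterpart of JDBC, instead of traditional blocking JDBC.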
<dependencyManagement>
<dependencies>
<!--Spring Boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>3.2.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!--Spring Boot WebFlux-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!--Spring Data R2DBC-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!--MySQL R2DBC-->
<dependency>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<scope>runtime</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Tool-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
<!-- Unit Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!--FastJson-->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.58</version>
</dependency>
</dependencies>
The project configuration file is as follows:
server:
  port: 6969
spring:
  application:
    name: reactive
  r2dbc:
    url: r2dbc:mysql://localhost:3306/db1
    username: root
    password: 52996
    pool:
      # Enable the connection pool
      enabled: true
Next, define unified API response wrapper classes. First, ApiSingleResult, used to return a single object:
package com.aaron.reactive.common;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor
@Data
public class ApiSingleResult<T> {
private int code;
private boolean success;
private String message;
private T data;
public static <T> ApiSingleResult<T> success(T data){
return new ApiSingleResult<>(0, true, null, data);
}
public static <T> ApiSingleResult<T> error(){
return new ApiSingleResult<>(-1, false, null, null);
}
public static <T> ApiSingleResult<T> error(int code){
return new ApiSingleResult<>(code, false, null, null);
}
public static <T> ApiSingleResult<T> error(String message){
return new ApiSingleResult<>(-1, false, message, null);
}
public static <T> ApiSingleResult<T> error(int code, String message){
return new ApiSingleResult<>(code, false, message, null);
}
}
ApiMultiResult, used to return list data:
package com.aaron.reactive.common;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor
@Data
public class ApiMultiResult<T> {
private int code;
private boolean success;
private String message;
private Long total;
private List<T> data;
public static <T> ApiMultiResult<T> success(List<T> data){
return new ApiMultiResult<>(0, true, null, null, data);
}
public static <T> ApiMultiResult<T> success(Long total, List<T> data){
return new ApiMultiResult<>(0, true, null, total, data);
}
public static <T> ApiMultiResult<T> error(int code){
return new ApiMultiResult<>(code, false, null, null, null);
}
public static <T> ApiMultiResult<T> error(String message){
return new ApiMultiResult<>(-1, false, message, null, null);
}
public static <T> ApiMultiResult<T> error(int code, String message){
return new ApiMultiResult<>(code, false, message, null, null);
}
}
ApiStreamResult, used for Server-Sent Events (SSE) streams:
package com.aaron.reactive.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@Data
@AllArgsConstructor
public class ApiStreamResult<T> {
private EventType eventType;
private T data;
private int errorCode;
private String errorMessage;
public static <T> ApiStreamResult<T> start(){
return new ApiStreamResult<>(EventType.STREAM_START, null, 0, null);
}
public static <T> ApiStreamResult<T> data(T data){
return new ApiStreamResult<>(EventType.DATA, data, 0, null);
}
public static <T> ApiStreamResult<T> end(){
return new ApiStreamResult<>(EventType.STREAM_END, null, 0, null);
}
public static <T> ApiStreamResult<T> error(int errorCode){
return new ApiStreamResult<>(EventType.STREAM_ERROR, null, errorCode, null);
}
public static <T> ApiStreamResult<T> error(String errorMessage){
return new ApiStreamResult<>(EventType.STREAM_ERROR, null, -1, errorMessage);
}
public static <T> ApiStreamResult<T> error(int errorCode, String errorMessage){
return new ApiStreamResult<>(EventType.STREAM_ERROR, null, errorCode, errorMessage);
}
@AllArgsConstructor
@Getter
public static enum EventType {
STREAM_START("流开始事件"),
DATA("数据事件"),
STREAM_END("流结束事件"),
STREAM_ERROR("流错误事件"),
;
private String desc;
}
}
The application entry class is defined as follows:
package com.aaron.reactive;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ReactiveApplication{
public static void main(String[] args) {
SpringApplication.run(ReactiveApplication.class ,args);
}
}
CRUD
Next, implement a complete set of create, read, update, and delete operations for users. First, create the database table:
CREATE TABLE IF NOT EXISTS `user_info` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
gmt_create datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modification time',
`name` varchar(64) DEFAULT NULL COMMENT 'Name',
sex varchar(32) DEFAULT NULL COMMENT 'Sex',
age int DEFAULT NULL COMMENT 'Age',
country varchar(64) DEFAULT NULL COMMENT 'Country',
is_vip boolean DEFAULT NULL COMMENT 'Whether the user is a VIP member',
remark text DEFAULT NULL COMMENT 'Remark',
`version` bigint unsigned NOT NULL COMMENT 'Version',
is_deleted tinyint unsigned NOT NULL DEFAULT '0' COMMENT 'Logical deletion (0 = normal, 1 = deleted)',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='User';
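Define the user entity class. The field annotated with @Version serves two purposes:
- It lets methods such as save decide between insert and update. For a wrapper-typed field such as the Long used here: if the field is null, the entity is inserted; if it is not null, the entity is updated. On insert, if the primary key field is null, the INSERT statement omits the primary key (for example, with an auto-increment ID); if it is not null, the INSERT statement includes it (for example, with a custom UUID or a snowflake ID).
- It implements optimistic locking: on insert, the field is set to 0; on update, the generated SQL checks the version in the WHERE clause and increments version by 1 in the SET clause.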
package com.aaron.reactive.entity;
import java.time.LocalDateTime;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.ReadOnlyProperty;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
// Table name mapping
@Table(name = "user_info")
@Data
public class User{
// Marks the primary key
@Id
private Long id;
// Column name mapping
@Column("gmt_create")
// Read-only field; the database sets it automatically on insert and update
@ReadOnlyProperty
private LocalDateTime createTime;
// Column name mapping
@Column("gmt_modified")
// Read-only field; the database sets it automatically on insert and update
@ReadOnlyProperty
private LocalDateTime updateTime;
private String name;
private String sex;
private Integer age;
private String country;
private Boolean isVip;
private String remark;
// Version field, used for optimistic locking
@Version
private Long version;
/**
* 0 means normal, 1 means deleted
*/
// Column name mapping
@Column("is_deleted")
private Byte isDel = 0;
}
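The optimistic-locking behaviour of the version column can be sketched without a database. The following is a simplified in-memory simulation, not Spring Data's implementation: OptimisticLockDemo, Row, and InMemoryTable are hypothetical names, and updateRemark only mimics the effect of an UPDATE whose WHERE clause checks the version.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OptimisticLockDemo {

    /** Hypothetical row holding only the columns this sketch needs. */
    static final class Row {
        final long id;
        final String remark;
        final long version;

        Row(long id, String remark, long version) {
            this.id = id;
            this.remark = remark;
            this.version = version;
        }
    }

    /** Hypothetical stand-in for the user_info table. */
    static class InMemoryTable {
        private final Map<Long, Row> rows = new ConcurrentHashMap<>();

        /** A new row starts at version 0. */
        void insert(long id, String remark) {
            rows.put(id, new Row(id, remark, 0L));
        }

        Row find(long id) {
            return rows.get(id);
        }

        /**
         * Mimics: UPDATE ... SET remark=?, version=version+1 WHERE id=? AND version=?
         * Returns the number of affected rows (0 or 1).
         */
        int updateRemark(long id, String remark, long expectedVersion) {
            Row current = rows.get(id);
            if (current == null || current.version != expectedVersion) {
                return 0;                               // stale version: nothing is updated
            }
            Row next = new Row(id, remark, expectedVersion + 1);
            return rows.replace(id, current, next) ? 1 : 0;   // atomic swap of the row
        }
    }

    /** Returns {rows affected by writer A, rows affected by writer B, final version}. */
    static int[] run() {
        InMemoryTable table = new InMemoryTable();
        table.insert(1L, "init");
        long readVersion = table.find(1L).version;      // both writers read version 0
        int first = table.updateRemark(1L, "from writer A", readVersion);
        int second = table.updateRemark(1L, "from writer B", readVersion);
        return new int[] {first, second, (int) table.find(1L).version};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("A=" + result[0] + ", B=" + result[1] + ", version=" + result[2]);
    }
}
```

The second writer affects zero rows because the version it read is no longer current, which is the situation a caller has to detect and retry.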
Define the Repository interface of the user data access layer:
package com.aaron.reactive.repository;
import com.aaron.reactive.entity.User;
import org.springframework.data.domain.Pageable;
import org.springframework.data.r2dbc.repository.Modifying;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
/**
* Query by ID
* @param id
* @return
*/
@Query("SELECT * FROM user_info WHERE is_deleted=0 and id=:id")
Mono<User> fetchById(@Param("id") Long id);
/**
* Query all
* @return
*/
@Query("SELECT * FROM user_info WHERE is_deleted=0")
Flux<User> findAll();
/**
* Paged query: count the total
* @return
*/
@Query("SELECT count(1) FROM user_info WHERE is_deleted=0" )
Mono<Long> count4PageQuery();
/**
* Paged query: fetch the list
* @param pageable
* @return
*/
@Query("SELECT * FROM user_info WHERE is_deleted=0 ORDER BY id ASC LIMIT :#{#pageable.pageSize} OFFSET :#{#pageable.offset}")
Flux<User> pageQuery(@Param("pageable") Pageable pageable);
/**
* Update the remark
* @param id
* @param remark
* @param version
* @return
*/
@Modifying
@Query("UPDATE user_info SET remark=:remark, version=version+1 WHERE is_deleted=0 and id=:id and version=:version")
Mono<Integer> updateRemark(@Param("id") Long id, @Param("remark") String remark, @Param("version") Long version);
/**
* Logical delete
* @param id
* @param version
* @return
*/
@Modifying
@Query("UPDATE user_info SET is_deleted=1, version=version+1 WHERE id=:id and version=:version")
Mono<Integer> delete(@Param("id") Long id, @Param("version") Long version);
}
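The LIMIT/OFFSET values in the paged query follow from simple arithmetic: a zero-based page index multiplied by the page size gives the offset. The helper below is a hypothetical illustration of that calculation for a 1-based page number; PageMath and its methods are not part of the project.

```java
class PageMath {

    /** Offset for a 1-based page number: (pageNo - 1) * pageSize. */
    static long offset(int pageNo, int pageSize) {
        if (pageNo < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNo and pageSize must be >= 1");
        }
        return (long) (pageNo - 1) * pageSize;
    }

    /** Builds the LIMIT/OFFSET fragment that the paged query ends with. */
    static String limitClause(int pageNo, int pageSize) {
        return "LIMIT " + pageSize + " OFFSET " + offset(pageNo, pageSize);
    }

    public static void main(String[] args) {
        System.out.println(limitClause(1, 3));
        System.out.println(limitClause(3, 20));
    }
}
```

A caller that accepts 1-based page numbers therefore has to subtract one before building a zero-based page request.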
The DTO (Data Transfer Object) required to create a user:
package com.aaron.reactive.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CreateUserDto{
private String name;
private String sex;
private Integer age;
private String country;
private Boolean isVip;
private String remark;
}
The user Service layer implementation:
package com.aaron.reactive.service;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import com.aaron.reactive.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class UserService{
@Autowired
private UserRepository userRepository;
public Mono<User> createUser(CreateUserDto createUserDto){
log.info("UserService createUser param: {}", JSON.toJSONString(createUserDto));
User user = new User();
BeanUtils.copyProperties(createUserDto, user);
Mono<User> userMono = userRepository.save(user)
.doOnSuccess(savedUser -> log.info("UserService createUser savedUser: {}", JSON.toJSONString(savedUser)) )
.doOnError( ex -> log.error("UserService createUser happen ex: ", ex) );
return userMono;
}
public Mono<User> fetchById(Long id){
log.info("UserService fetchById param: {}", id);
Mono<User> userMono = userRepository.fetchById(id)
.doOnSuccess(user -> log.info("UserService fetchById user: {}", JSON.toJSONString(user)) )
.doOnError( ex -> log.error("UserService fetchById happen ex: ", ex) );
return userMono;
}
public Mono<ApiMultiResult<User>> pageQuery(Integer pageNo, Integer pageSize) {
log.info("UserService pageQuery param: {} | {}", pageNo, pageSize);
// Query the total
Mono<Long> totalMono = userRepository.count4PageQuery()
.doOnSuccess( total -> log.info("UserService pageQuery total: {}", total))
.doOnError( ex -> log.error("UserService pageQuery total happen ex: ", ex));
// Query the list
Pageable pageable = PageRequest.of(pageNo-1, pageSize);
Mono<List<User>> userFlux = userRepository.pageQuery(pageable)
.collectList()
.doOnSuccess(users -> log.info("UserService pageQuery users: {}", JSON.toJSONString(users)))
.doOnError( ex -> log.error("UserService pageQuery users happen ex: ", ex));
Mono<ApiMultiResult<User>> apiMultiResultMono = Mono.zip(totalMono, userFlux)
.map(tuple -> {
Long total = tuple.getT1();
List<User> userList = tuple.getT2();
return ApiMultiResult.success(total, userList);
}).doOnSuccess( userApiMultiResult -> log.info("UserService pageQuery res: {}", JSON.toJSONString(userApiMultiResult)))
.doOnError(ex -> log.error("UserService pageQuery happen ex: ", ex));
return apiMultiResultMono;
}
public Mono<User> updateRemark(Long id, String remark){
log.info("UserService updateRemark param: {} | {}", id, remark);
Mono<User> userMono = userRepository.fetchById(id)
.doOnNext( user -> log.info("UserService updateRemark find user: {}", JSON.toJSONString(user)) )
.switchIfEmpty( Mono.error(new RuntimeException("User Not Found")) )
.flatMap( user -> userRepository.updateRemark(id, remark, user.getVersion()) )
.flatMap( count-> {
log.info("UserService updateRemark count : {}", count);
if( count>0 ) {
return userRepository.fetchById(id);
} else {
return Mono.error( new RuntimeException("Update Fail, Please Retry") );
}
}).doOnSuccess( updateUser -> log.info("UserService updateRemark updated User: {}", JSON.toJSONString(updateUser)))
.doOnError( ex -> log.error("UserService updateRemark happen ex: ", ex) );
return userMono;
}
public Mono<Integer> deleteUser(Long id){
log.info("UserService deleteUser param: {}", id);
Mono<Integer> resMono = userRepository.fetchById(id)
.doOnNext( user -> log.info("UserService deleteUser find user: {}", JSON.toJSONString(user)) )
.switchIfEmpty(Mono.error(new RuntimeException("User Not Found")))
.flatMap(user -> userRepository.delete(id, user.getVersion()))
.doOnSuccess( count -> log.info("UserService deleteUser res: {}", count))
.doOnError( ex -> log.error("UserService deleteUser happen ex: ", ex) );
return resMono;
}
public Flux<User> findAll(){
Flux<User> userFlux = userRepository.findAll()
.doOnNext(user -> log.info("UserService findAllByStream user: {}", JSON.toJSONString(user)))
.doOnError(ex -> log.error("UserService findAllByStream happen ex: ", ex));
return userFlux;
}
}
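The pageQuery method combines two independent asynchronous results, the total and the page of users, into one response. As a rough JDK-only analogy (CompletableFuture.thenCombine instead of Reactor's Mono.zip, which is a different API), the idea looks like this; ZipSketch, Page, and the hard-coded values are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ZipSketch {

    /** Hypothetical page holder: a total plus one page of items. */
    static final class Page {
        final long total;
        final List<String> items;

        Page(long total, List<String> items) {
            this.total = total;
            this.items = items;
        }
    }

    static Page run() {
        // Two independent asynchronous lookups (hard-coded stand-ins for the count and list queries)
        CompletableFuture<Long> total = CompletableFuture.supplyAsync(() -> 7L);
        CompletableFuture<List<String>> items =
                CompletableFuture.supplyAsync(() -> List.of("u1", "u2", "u3"));
        // Build the result once both have completed; a failure in either one fails the result
        return total.thenCombine(items, Page::new).join();
    }

    public static void main(String[] args) {
        Page page = run();
        System.out.println(page.total + " " + page.items);
    }
}
```

Neither lookup waits for the other; only the combining step needs both values.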
The user Controller layer implementation:
package com.aaron.reactive.controller;
import java.time.Duration;
import java.time.LocalTime;
import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.common.ApiSingleResult;
import com.aaron.reactive.common.ApiStreamResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import com.aaron.reactive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController{
@Autowired
private UserService userService;
/**
* Create a user
* @param createUserDto
* @return
*/
@PostMapping("/create")
public Mono<ApiSingleResult<User>> createUser(@RequestBody CreateUserDto createUserDto) {
log.info("UserController createUser param : {}", JSON.toJSONString(createUserDto));
Mono<User> userMono = userService.createUser(createUserDto);
Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
.doOnSuccess( res-> log.info("UserController createUser res: {}", JSON.toJSONString(res)) )
.onErrorResume(ex -> {
log.error("UserController createUser happen ex: ", ex);
return Mono.just(ApiSingleResult.error(ex.getMessage()));
});
return resultMono;
}
/**
* Query a user by ID
* @param id
* @return
*/
@GetMapping("/fetchById")
public Mono<ApiSingleResult<User>> fetchById(@RequestParam Long id) {
log.info("UserController fetchById param : {}", id);
if( id==null ) {
return Mono.just( ApiSingleResult.error("Illegal Argument") );
}
Mono<User> userMono = userService.fetchById(id);
Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
.doOnSuccess(res -> log.info("UserController fetchById res: {}", JSON.toJSONString(res)))
.onErrorResume(ex -> {
log.error("UserController fetchById happen ex: ", ex);
return Mono.just(ApiSingleResult.error(ex.getMessage()));
});
return resultMono;
}
/**
* Paged query
* @param pageNo
* @param pageSize
* @return
*/
@GetMapping("/pageQuery")
public Mono<ApiMultiResult<User>> pageQuery(@RequestParam Integer pageNo, @RequestParam Integer pageSize) {
log.info("UserController pageQuery param : {} | {}", pageNo, pageSize);
pageNo = pageNo < 1 ? 1 : pageNo;
pageSize = pageSize<1 ? 50 : pageSize;
Mono<ApiMultiResult<User>> apiMultiResultMono = userService.pageQuery(pageNo, pageSize)
.doOnSuccess(res -> log.info("UserController pageQuery res: {}", JSON.toJSONString(res)))
.onErrorResume(ex -> {
log.error("UserController pageQuery happen ex: ", ex);
return Mono.just(ApiMultiResult.error(ex.getMessage()));
});
return apiMultiResultMono;
}
/**
* Update the remark
* @param id
* @param remark
* @return
*/
@GetMapping("/updateRemark")
public Mono<ApiSingleResult<User>> updateRemark(@RequestParam Long id, @RequestParam String remark) {
log.info("UserController updateRemark param : {} | {}", id, remark);
if( id==null ) {
return Mono.just( ApiSingleResult.error("Illegal Argument") );
}
Mono<User> userMono = userService.updateRemark(id, remark);
Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
.doOnSuccess(res -> log.info("UserController updateRemark res: {}", JSON.toJSONString(res)))
.onErrorResume(ex -> {
log.error("UserController updateRemark happen ex : ", ex);
return Mono.just(ApiSingleResult.error(ex.getMessage()));
});
return resultMono;
}
/**
* Delete a user
* @param id
* @return
*/
@GetMapping("/delete")
public Mono<ApiSingleResult<Integer>> deleteUser(@RequestParam Long id) {
log.info("UserController deleteUser param : {}", id);
if( id==null ) {
return Mono.just( ApiSingleResult.error("Illegal Argument") );
}
Mono<Integer> intMono = userService.deleteUser(id);
Mono<ApiSingleResult<Integer>> resultMono = intMono.map(count -> ApiSingleResult.success(count))
.doOnSuccess(res -> log.info("UserController deleteUser res : {}", JSON.toJSONString(res)))
.onErrorResume(ex -> {
log.error("UserController deleteUser happen ex : ", ex);
return Mono.just(ApiSingleResult.error(ex.getMessage()));
});
return resultMono;
}
/**
* Fetch all users as a stream
* @return
*/
@GetMapping(value = "/findAllByStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ApiStreamResult<User>> findAllByStream() {
Flux<User> userFlux = userService.findAll();
// Wrap each User in a data event
Flux<ApiStreamResult<User>> dataEvents = userFlux.map(user -> ApiStreamResult.data(user))
.delayElements( Duration.ofSeconds(2) );
// Create the start and end events
Flux<ApiStreamResult<User>> startEvent = Flux.just(ApiStreamResult.start());
Flux<ApiStreamResult<User>> endEvent = Flux.just(ApiStreamResult.end());
Flux<ApiStreamResult<User>> resultFlux = Flux.concat(startEvent, dataEvents, endEvent)
.doOnNext(userApiStreamResult -> log.info("UserController findAllByStream send event: [{}] -->> {}", LocalTime.now(), JSON.toJSONString(userApiStreamResult)))
.onErrorResume(ex -> {
log.error("UserController findAllByStream happen ex: ", ex);
// Send an error event
return Flux.just( ApiStreamResult.error(ex.getMessage()) );
});
return resultFlux;
}
}
UT
Write tests for the user feature. Here Spring's WebTestClient is used to run integration tests against the WebFlux application, which is started on a random port.
package com.aaron.reactive.controller;
import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.ReactiveApplication;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.common.ApiSingleResult;
import com.aaron.reactive.common.ApiStreamResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.EntityExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
// Start the web server on a random port: classes specifies the main application class, webEnvironment selects a random port
@SpringBootTest(
classes = {ReactiveApplication.class},
webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT
)
@Slf4j
public class UserControllerTest{
private final static String URL_PREFIX = "/user";
@Autowired
private WebTestClient webTestClient;
@Test
public void testCreateUser(){
CreateUserDto createUserDto = CreateUserDto.builder()
.name("王二麻子")
.sex("男")
.age(69)
.country("梵蒂冈")
.isVip(true)
.build();
EntityExchangeResult<ApiSingleResult<User>> entityExchangeResult = webTestClient
// Use the POST method
.post()
// Set the URL
.uri(URL_PREFIX + "/create")
// Set the content type
.contentType(MediaType.APPLICATION_JSON)
// Set the request body
.bodyValue(createUserDto)
// Send the request
.exchange()
// Assert: expect HTTP status code 200
.expectStatus().isOk()
// Specify the type information for deserialization
.expectBody(new ParameterizedTypeReference<ApiSingleResult<User>>() {})
// Block until the response result is available
.returnResult();
ApiSingleResult<User> apiSingleResult = entityExchangeResult.getResponseBody();
log.info("UserControllerTest testCreateUser apiSingleResult: {}", JSON.toJSONString(apiSingleResult));
}
@Test
public void testPageQuery(){
EntityExchangeResult<ApiMultiResult<User>> entityExchangeResult = webTestClient
// Use the GET method
.get()
// Set the URL and query parameters
.uri(uriBuilder -> uriBuilder.path(URL_PREFIX + "/pageQuery")
.queryParam("pageNo", 1)
.queryParam("pageSize", 3)
.build()
).exchange()
.expectStatus().isOk()
// Specify the type information for deserialization
.expectBody(new ParameterizedTypeReference<ApiMultiResult<User>>() {})
// Block until the response result is available
.returnResult();
ApiMultiResult<User> apiMultiResult = entityExchangeResult.getResponseBody();
log.info("UserControllerTest testPageQuery apiMultiResult: {}", JSON.toJSONString(apiMultiResult));
}
@Test
public void testFindAllByStream(){
Flux<ApiStreamResult<User>> apiStreamResultFlux = webTestClient
.get()
.uri(URL_PREFIX + "/findAllByStream")
// Specify the accepted content type: text/event-stream
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
// Assert the content type of the response
.expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.returnResult(new ParameterizedTypeReference<ApiStreamResult<User>>() {})
.getResponseBody();
StepVerifier.create(apiStreamResultFlux)
// Consume all elements and log them
.thenConsumeWhile(
event -> true,
event -> log.info("UserControllerTest testFindAllByStream event: {}", JSON.toJSONString(event))
)
// Verify that the stream completes normally
.verifyComplete();
}
}
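Through this complete path from theory to practice, we have examined the core mechanics of signals and streams in reactive programming and built a complete service on Spring WebFlux featuring CRUD, pagination, and streaming output. This non-blocking, asynchronous programming model is particularly well suited to scenarios that must handle a large number of concurrent connections. Hopefully the detailed analysis and hands-on code in this article help you better understand and apply the reactive technology stack in 云栈社区.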