在构建响应式应用时,合理地组合多个数据流或控制数据流的发出时机至关重要。本文将深入探讨 Spring WebFlux 项目背后的响应式编程库——Reactor 中的组合操作符与延迟操作符,并通过具体代码示例帮助你理解其工作机制与应用场景。

辅助打印工具
为了方便观察数据流的行为,我们首先准备一个辅助打印工具类 ReactorLogUtil。
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 辅助打印工具
*/
public class ReactorLogUtil {
private static Duration timeout = Duration.ofSeconds(60);
private static final String separator = "-".repeat(20);
public static <T> void log(String varName, Mono<T> mono){
Object res = null;
try {
// block(): 阻塞获取
res = mono.block(timeout);
} catch (Exception ex) {
String msg = Optional.ofNullable(ex.getMessage())
.map(String::toLowerCase)
.orElse("");
if( msg.contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}
String msg = new StringBuilder()
.append("<Mono> ")
.append(varName)
.append(" : ")
.append( Optional.ofNullable(res).orElse("<Empty Mono>") )
.toString();
System.out.println(msg);
}
public static <T> void log(String varName, Flux<T> flux){
String res = null;
try{
Mono<List<T>> listMono = flux.collectList();
// block(): 阻塞获取
List<T> list = listMono.block(timeout);
res = list.toString();
} catch (Exception ex) {
if( ex.getMessage().toLowerCase().contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}
String msg = new StringBuilder()
.append("<Flux> ")
.append(varName)
.append(" : ")
.append( res )
.toString();
System.out.println(msg);
}
public static void separator(String title){
System.out.println("\n"+separator+" "+title+" "+separator);
}
}
组合操作符
merge()/mergeWith()
将多个 Mono 或 Flux 中的元素合并到一个 Flux 中。元素不保证顺序,遵循“先到先得”的原则。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("merge()/mergeWith()");
Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));
Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));
Flux<String> fm12 = m1.mergeWith(m2);
ReactorLogUtil.log("fm12", fm12);
Flux<?> fm1 = Flux.merge(f1, f2);
Flux<?> fm2 = f1.mergeWith(f2);
ReactorLogUtil.log("f12", fm1);
ReactorLogUtil.log("f12", fm2);
}
输出如下,可以看到延迟更小的元素先被发出。
-------------------- merge()/mergeWith() --------------------
<Flux> fm12 : [69, Tony]
<Flux> f12 : [1, A, 2, B, 3, C]
<Flux> f12 : [1, A, 2, B, 3, C]
concat()/concatWith()
将多个 Mono 或 Flux 中的元素合并到一个 Flux 中。元素顺序严格按照给定的 Mono 或 Flux 的顺序进行拼接,即使后续的流元素延迟更短。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("concat()/concatWith()");
Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));
Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));
Flux<String> fc12 = m1.concatWith(m2);
ReactorLogUtil.log("fc12", fc12);
Flux<String> fc1 = Flux.concat(f1, f2);
Flux<String> fc2 = f1.concatWith(f2);
ReactorLogUtil.log("fc1", fc1);
ReactorLogUtil.log("fc2", fc2);
}
输出如下,即使 m2 延迟更短,也必须等待 m1 完成后才开始。
-------------------- concat()/concatWith() --------------------
<Flux> fc12 : [Tony, 69]
<Flux> fc1 : [A, B, C, 1, 2, 3]
<Flux> fc2 : [A, B, C, 1, 2, 3]
zip()/zipWith()
将各个 Flux 或 Mono 中对应索引位置的元素组合在一起。支持自定义组合逻辑,默认组合为 Tuple 元组。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("zip()/zipWith()");
Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));
Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));
Mono<Tuple2<String, String>> mz1 = Mono.zip(m1, m2);
Mono<String> mz2 = Mono.zip(m1, m2, (name, age)->name.toUpperCase()+" is "+age);
ReactorLogUtil.log("mz1", mz1);
ReactorLogUtil.log("mz2", mz2);
Flux<Tuple2<String, String>> fz1 = Flux.zip(f1, f2);
Flux<String> fz2 = f1.zipWith(f2, (f1e, f2e) -> f1e.toLowerCase() + f2e);
ReactorLogUtil.log("fz1", fz1);
ReactorLogUtil.log("fz2", fz2);
}
输出如下,zip 操作会等待所有参与流的对应元素都就绪。
-------------------- zip()/zipWith() --------------------
<Mono> mz1 : [Tony,69]
<Mono> mz2 : TONY is 69
<Flux> fz1 : [[A,1], [B,2], [C,3]]
<Flux> fz2 : [a1, b2, c3]
defaultIfEmpty()
当上游数据流为空时,返回一个指定的默认值。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("defaultIfEmpty()");
Flux<String> fd1 = Flux.just("Aaron", "Bob")
.defaultIfEmpty("Tina");
Flux<?> fd2 = Flux.empty()
.defaultIfEmpty("Tina");
ReactorLogUtil.log("fd1", fd1);
ReactorLogUtil.log("fd2", fd2);
}
-------------------- defaultIfEmpty() --------------------
<Flux> fd1 : [Aaron, Bob]
<Flux> fd2 : [Tina]
switchIfEmpty()
当上游数据流为空时,切换到一个备用的 Publisher。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("switchIfEmpty()");
Flux<String> fs1 = Flux.just("Aaron", "Bob")
.switchIfEmpty( Flux.just("11", "22") );
Flux<?> fs2 = Flux.empty()
.switchIfEmpty( Flux.just("11", "22") );
ReactorLogUtil.log("fs1", fs1);
ReactorLogUtil.log("fs2", fs2);
}
-------------------- switchIfEmpty() --------------------
<Flux> fs1 : [Aaron, Bob]
<Flux> fs2 : [11, 22]
switchOnFirst()
根据第一个元素的值来决定后续是使用原始流还是切换到一个备用流。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("switchOnFirst()");
Flux<String> ff1 = Flux.just("Start", "Data1", "Data2", "End")
// Note:flux中包含第一个元素
.switchOnFirst((signal, flux) -> {
// 第一个元素符合预期,使用原始流
if( signal.hasValue() && "Start".equals(signal.get()) ){
return flux.map(String::toUpperCase);
} else {
// 第一个元素不符合预期,使用备用流
return Flux.just("Error1", "Error 2");
}
});
Flux<String> ff2 = Flux.just("Data1", "Data2", "End")
.switchOnFirst((signal, flux) -> {
if( signal.hasValue() && "Start".equals(signal.get()) ){
return flux.map(String::toUpperCase);
} else {
return Flux.just("Error1", "Error 2");
}
});
ReactorLogUtil.log("ff1", ff1);
ReactorLogUtil.log("ff2", ff2);
}
-------------------- switchOnFirst() --------------------
<Flux> ff1 : [START, DATA1, DATA2, END]
<Flux> ff2 : [Error1, Error 2]
startWith()
在数据流的最前面添加指定的一个或多个元素。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("startWith()");
Flux<String> fst1 = Flux.just("11", "33", "55")
.startWith("Hello", "World");
Flux<String> fst2 = Flux.just("11", "33", "55")
.startWith( List.of("-1", "-2") );
Flux<String> fst3 = Flux.just("11", "33", "55")
.startWith( Flux.just("Aaron","Bob") );
ReactorLogUtil.log("fst1", fst1);
ReactorLogUtil.log("fst2", fst2);
ReactorLogUtil.log("fst3", fst3);
}
-------------------- startWith() --------------------
<Flux> fst1 : [Hello, World, 11, 33, 55]
<Flux> fst2 : [-1, -2, 11, 33, 55]
<Flux> fst3 : [Aaron, Bob, 11, 33, 55]
then()
当收到上游的 onComplete 信号后,返回一个空的 Mono 或指定的另一个 Mono。如果上游出现 onError 信号,则错误会继续向下游传递。适用于不关心上游返回的具体数据,只关心整个操作是否正常完成的场景,例如资源清理的确认。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("then()");
Mono<Void> ft1 = Flux.just("Bob", "Tina")
.map(String::toUpperCase)
.doOnNext( str-> System.out.println("ft1 : " + str) )
.then();
ReactorLogUtil.log("ft1", ft1);
Mono<Void> ft2 = Flux.just("23","Aaron")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft2 : " + num) )
.then();
ReactorLogUtil.log("ft2", ft2);
Mono<String> ft3 = Flux.just("11", "33")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft3 : " + num) )
.then(Mono.just("Success"));
ReactorLogUtil.log("ft3", ft3);
Mono<String> ft4 = Flux.just("23","Aaron")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft4 : " + num) )
.then(Mono.just("Success"));
ReactorLogUtil.log("ft4", ft4);
}
-------------------- then() --------------------
ft1 : BOB
ft1 : TINA
<Mono> ft1 : <Empty Mono>
ft2 : 23
<Mono> ft2 : <Throw java.lang.NumberFormatException: For input string: "Aaron">
ft3 : 11
ft3 : 33
<Mono> ft3 : Success
ft4 : 23
<Mono> ft4 : <Throw java.lang.NumberFormatException: For input string: "Aaron">
thenEmpty()
当上游正常完成后,执行另一个返回空 Mono 的异步动作。适用于上游结束后需要执行额外任务(如记录日志、发送通知)但无需返回数据的场景。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("thenEmpty()");
Mono<Void> fte1 = Flux.just("22", "44")
.map(Integer::parseInt)
.doOnNext( num-> System.out.println("fte1 #1 : " + num) )
.thenEmpty(
Mono.just("Clean Resource")
.doOnNext(str -> System.out.println("fte1 #2 : " + str))
.then()
);
ReactorLogUtil.log("fte1", fte1);
Mono<Void> fte2 = Flux.just("33", "Aaron")
.map(Integer::parseInt)
.doOnNext( num-> System.out.println("fte2 #1 : " + num) )
.thenEmpty(
Mono.just("Clean Resource")
.doOnNext(str -> System.out.println("fte2 #2 : " + str))
.then()
);
ReactorLogUtil.log("fte2", fte2);
}
-------------------- thenEmpty() --------------------
fte1 #1 : 22
fte1 #1 : 44
fte1 #2 : Clean Resource
<Mono> fte1 : <Empty Mono>
fte2 #1 : 33
<Mono> fte2 : <Throw java.lang.NumberFormatException: For input string: “Aaron">
可以看到 fte2 因为转换失败抛出异常,后续的清理任务 #2 并没有执行。
thenMany()
当上游正常完成后,执行另一个返回 Flux 的异步动作。适用于需要前置任务成功完成,才能开启后续业务流程的场景,例如系统初始化后开始处理数据。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("thenMany()");
Flux<Integer> ftm1 = Mono.just("Aaron")
.map(String::toUpperCase)
.doOnSuccess(name -> System.out.println(name + ": System Init Done"))
.thenMany(Flux.just("17", "25", "32")
.map(Integer::parseInt)
);
ReactorLogUtil.log("ftm1", ftm1);
Flux<Integer> ftm2 = Mono.just("Aaron")
.map( Integer::parseInt )
.doOnSuccess(name -> System.out.println(name + ": System Init Done"))
.thenMany(Flux.just("69")
.map(Integer::parseInt)
);
ReactorLogUtil.log("ftm2", ftm2);
}
-------------------- thenMany() --------------------
AARON: System Init Done
<Flux> ftm1 : [17, 25, 32]
<Flux> ftm2 : <Throw java.lang.NumberFormatException: For input string: "Aaron">
ftm2 因为初始化失败,后续的 Flux 根本没有机会执行。
延迟操作符
delaySubscription()
延迟对上游所有操作符(包含 delaySubscription 自身)的订阅时机。可以基于指定时间或等待另一个发布者发出信号。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delaySubscription()");
Flux.just("Tina")
.log("#1")
// 延迟指定时间后,delaySubscription及其上游的所有操作符才开始订阅
.delaySubscription( Duration.ofSeconds(4) )
.log("#2")
.subscribe();
Thread.sleep(1000*8);
System.out.println("\n-----------------------------------\n");
Mono.just("69")
.log("#A")
// 等待另一个发布者发出完成信号,delaySubscription及其上游的所有操作符才开始订阅
.delaySubscription(
Mono.just("S")
.delayElement(Duration.ofSeconds(5))
.log(" sub publisher")
).log("#B")
.subscribe();
Thread.sleep(1000*8);
}
输出日志显示,无论是延迟4秒还是等待另一个 Mono 完成,真正的订阅和数据处理都被相应延迟了。
delayElement()/delayElements()
延迟 Mono 或 Flux 中每个元素(即 onNext 信号)的发出。这是实现定时任务或控制流速的常用操作符。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delayElement()/delayElements()");
Mono.just("Tony")
.log("delayElement #1")
.map( String::toUpperCase )
.log("delayElement #2")
// 每个元素延迟3秒后发出
.delayElement( Duration.ofSeconds(3) )
.log("delayElement #3")
.subscribe();
Thread.sleep(1000*5);
System.out.println("\n-----------------------------------\n");
Flux.just(11,22,33)
.log("delayElements #A")
.delayElements(Duration.ofSeconds(2))
.log("delayElements #B")
.subscribe();
Thread.sleep(1000*8);
}
输出日志清晰展示了 “TONY” 在 #2 日志后等待了3秒才在 #3 发出。对于 Flux,每个数字都间隔2秒发出。
delaySequence()
仅延迟整个序列第一个元素的发出时间,后续元素仍按原有的时间间隔发出。相当于将整条数据流在时间轴上向右平移。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delaySequence()");
Flux.just(1,2,3)
.delayElements(Duration.ofSeconds(4))
.log("#1")
// 第一个元素延迟指定时间后发出,后续元素按原有间隔发出
.delaySequence( Duration.ofSeconds(10) )
.log("#2")
.subscribe();
Thread.sleep(1000*30);
}
观察输出时间戳:#1 日志在 55s, 59s, 03s 分别收到元素1,2,3。而经过 delaySequence(10s) 后,#2 日志在 05s, 09s, 13s 才收到这些元素,间隔仍是4秒,但整体推迟了10秒。
delayUntil()
每个元素在发出前,都会延迟等待一个由该元素触发的异步操作完成。这个操作符为每个元素动态创建延迟,非常适合用于实现诸如“每次请求前等待令牌刷新”这样的场景。
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delayUntil()");
Flux.just(1,2,3)
.log("#1")
.delayUntil( num -> Mono.just("gg").delayElement(Duration.ofSeconds(num)))
.log("#2")
.subscribe();
Thread.sleep(1000*10);
}
输出显示,元素 1 等待1秒后发出,元素 2 等待2秒后发出,元素 3 等待3秒后发出。每个元素的延迟时长由该元素的值决定。
总结与思考
通过以上示例,我们详细梳理了 Reactor 中主要的组合与延迟操作符。理解这些操作符的细微差别(如 merge 与 concat 的顺序、then 系列操作符对错误的处理)对于编写正确、高效的反应式编程代码至关重要。特别是在设计复杂的数据流处理或需要精细控制时序的高并发场景时,选择合适的操作符能让你事半功倍。
掌握这些核心操作符是深入 Java 响应式编程的关键一步。如果你想了解更多关于 Spring WebFlux 或 Reactor 的实战技巧,欢迎到云栈社区与其他开发者交流探讨。