本文深入讲解 ReactiveX 旗下的 RxJava 框架,从核心概念到实战应用,帮助你掌握响应式编程的精髓。
一、引言:为什么选择 RxJava
在现代软件开发中,异步编程已成为必不可少的技能。然而,传统的回调和线程管理复杂等问题一直困扰着开发者。RxJava (Reactive Extensions for Java) 提供了一种优雅的解决方案,让异步编程变得简单而强大。
RxJava 的核心优势
- 声明式编程 - 以数据流的方式描述业务逻辑
- 链式调用 - 操作符组合,代码简洁易读
- 强大的操作符 - 100+ 操作符满足各种场景
- 线程调度灵活 - 轻松切换执行线程
- 错误处理优雅 - 统一的异常处理机制
- 背压支持 - Flowable 处理数据流速控制
二、核心架构解析
RxJava 基于观察者模式,采用响应式编程思想,其核心由四大组件构成。

图1:RxJava 核心架构由 Observable、Observer、Schedulers 和 Operators 构成
核心组件详解
Observable (被观察者)
数据源,负责发射数据流。它是整个响应式流的起点,负责产生数据并发送给订阅者。
public abstract class Observable<T> implements ObservableSource<T> {
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
Observer (观察者)
数据消费者,接收并处理数据。Observer 定义了四个核心方法来处理数据流的不同状态。
public interface Observer<T> {
// 订阅开始
void onSubscribe(Disposable d);
// 接收数据
void onNext(T t);
// 发生错误
void onError(Throwable e);
// 完成
void onComplete();
}
Operators (操作符)
用于转换、过滤、组合数据流的中间操作。操作符是 RxJava 的精髓,提供了丰富的数据处理能力。
Schedulers (调度器)
控制 Observable 和 Observer 在哪个线程执行,实现灵活的线程调度。
数据流类型
RxJava 3 提供了 5 种主要的数据流类型,每种适用于不同的场景:
- Observable - 发射 0 到 N 个数据项,不支持背压,适用于通用场景
- Flowable - 发射 0 到 N 个数据项,支持背压,适用于大数据量场景
- Single - 只发射 1 个数据项,适用于单一结果场景如网络请求
- Maybe - 发射 0 或 1 个数据项,适用于可能有结果的场景
- Completable - 不发射数据,只关心完成状态
三、快速上手:基础用法
创建 Observable 的多种方式
方式 1:just - 发射固定的数据项
Observable<String> observable1 = Observable.just("Hello", "RxJava");
方式 2:from - 从数组或集合创建
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> observable2 = Observable.fromIterable(numbers);
方式 3:create - 自定义发射逻辑
Observable<String> observable3 = Observable.create(emitter -> {
emitter.onNext("Data 1");
emitter.onNext("Data 2");
emitter.onComplete();
});
方式 4:interval - 定时发射
Observable<Long> observable4 = Observable.interval(1, TimeUnit.SECONDS);
方式 5:range - 发射一系列整数
Observable<Integer> observable5 = Observable.range(1, 10);
订阅 Observable
完整的订阅示例:
Observable.just("Apple", "Banana", "Orange")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始订阅");
}
@Override
public void onNext(String item) {
System.out.println("收到数据: " + item);
}
@Override
public void onError(Throwable e) {
System.err.println("发生错误: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
输出结果:
开始订阅
收到数据: Apple
收到数据: Banana
收到数据: Orange
完成
简化的订阅方式
实际开发中,我们经常使用 Lambda 表达式简化订阅代码:
// 只处理 onNext
Observable.just(1, 2, 3)
.subscribe(item -> System.out.println(item));
// 处理 onNext 和 onError
Observable.just(1, 2, 3)
.subscribe(
item -> System.out.println(item),
error -> System.err.println(error)
);
// 处理 onNext、onError 和 onComplete
Observable.just(1, 2, 3)
.subscribe(
item -> System.out.println(item),
error -> System.err.println(error),
() -> System.out.println("完成")
);
四、操作符详解:数据变换的艺术
操作符是 RxJava 的核心,掌握常用操作符是使用 RxJava 的关键。

图2:RxJava 操作符主要分为创建、转换、过滤、组合、错误处理和工具等类别
创建操作符
创建操作符用于生成 Observable 数据源:
// just - 发射预定义的数据项
Observable.just(1, 2, 3);
// fromArray - 从数组创建
Integer[] array = {1, 2, 3, 4, 5};
Observable.fromArray(array);
// fromCallable - 延迟执行
Observable.fromCallable(() -> {
// 只有订阅时才执行
return expensiveOperation();
});
// defer - 为每个订阅者创建新的 Observable
Observable.defer(() -> Observable.just(System.currentTimeMillis()));
转换操作符
map - 一对一转换
最常用的转换操作符,将每个数据项转换为另一种形式:
Observable.just(1, 2, 3, 4, 5)
.map(number -> number * 2)
.subscribe(System.out::println);
// 输出: 2, 4, 6, 8, 10
flatMap - 一对多转换并合并
适用于需要为每个数据项发起异步操作的场景:
// 场景:搜索用户并获取每个用户的订单
Observable.just("user1", "user2", "user3")
.flatMap(userId -> {
// 为每个用户发起网络请求
return getUserOrders(userId);
})
.subscribe(order -> System.out.println(order));
concatMap - 顺序的 flatMap
当需要保证执行顺序时使用 concatMap:
Observable.just(1, 2, 3)
.concatMap(i -> Observable.just(i * 10)
.delay(100, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
// 输出严格按顺序: 10, 20, 30
过滤操作符
过滤操作符用于筛选数据流中的数据:
// filter - 过滤数据
Observable.range(1, 10)
.filter(number -> number % 2 == 0)
.subscribe(System.out::println);
// 输出: 2, 4, 6, 8, 10
// take - 只取前 N 个
Observable.range(1, 100)
.take(5)
.subscribe(System.out::println);
// 输出: 1, 2, 3, 4, 5
// skip - 跳过前 N 个
Observable.range(1, 10)
.skip(5)
.subscribe(System.out::println);
// 输出: 6, 7, 8, 9, 10
// distinct - 去重
Observable.just(1, 2, 2, 3, 3, 4)
.distinct()
.subscribe(System.out::println);
// 输出: 1, 2, 3, 4
debounce - 防抖 (搜索框常用)
在用户停止输入一段时间后才触发搜索,避免频繁请求:
Observable.create(emitter -> {
emitter.onNext("a");
Thread.sleep(100);
emitter.onNext("ab");
Thread.sleep(100);
emitter.onNext("abc");
Thread.sleep(500); // 停顿超过300ms
emitter.onComplete();
})
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
// 只输出: abc
组合操作符
组合操作符用于合并多个数据源:
// merge - 合并多个 Observable
Observable<String> obs1 = Observable.just("A", "B");
Observable<String> obs2 = Observable.just("C", "D");
Observable.merge(obs1, obs2)
.subscribe(System.out::println);
// 输出: A, B, C, D (顺序不保证)
// zip - 组合对应位置的数据
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<String> letters = Observable.just("A", "B", "C");
Observable.zip(numbers, letters,
(number, letter) -> number + letter)
.subscribe(System.out::println);
// 输出: 1A, 2B, 3C
// combineLatest - 任一数据源变化时组合最新值
Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);
Observable.combineLatest(timer1, timer2,
(t1, t2) -> "T1:" + t1 + " T2:" + t2)
.subscribe(System.out::println);
错误处理操作符
优雅的错误处理是 RxJava 的一大优势:
// onErrorReturn - 错误时返回默认值
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("出错了"));
})
.onErrorReturn(throwable -> -1)
.subscribe(System.out::println);
// 输出: 1, 2, -1
// onErrorResumeNext - 错误时切换到另一个 Observable
Observable.create(emitter -> {
emitter.onNext("A");
emitter.onError(new Exception("出错"));
})
.onErrorResumeNext(Observable.just("B", "C"))
.subscribe(System.out::println);
// 输出: A, B, C
// retry - 重试
AtomicInteger count = new AtomicInteger(0);
Observable.create(emitter -> {
int attempt = count.incrementAndGet();
System.out.println("尝试次数: " + attempt);
if (attempt < 3) {
emitter.onError(new Exception("失败"));
} else {
emitter.onNext("成功");
emitter.onComplete();
}
})
.retry(3)
.subscribe(System.out::println);
五、线程调度:掌控异步执行
合理的线程调度是保证应用性能的关键。

图3:RxJava 提供了多种调度器,分别适用于 IO、计算、新建线程等不同场景
Schedulers 类型
RxJava 提供了多种调度器,每种适用于不同的场景:
Schedulers.io() - IO 密集型任务
适用于网络请求、文件读写等 IO 操作:
Observable.fromCallable(() -> {
// 网络请求、文件读写
return downloadFile();
})
.subscribeOn(Schedulers.io())
.subscribe(result -> System.out.println(result));
Schedulers.computation() - 计算密集型
适用于 CPU 密集型计算:
Observable.range(1, 1000000)
.map(i -> i * i)
.subscribeOn(Schedulers.computation())
.subscribe();
Schedulers.newThread() - 每次创建新线程
Observable.just(1)
.subscribeOn(Schedulers.newThread())
.subscribe();
Schedulers.single() - 单线程执行
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.single())
.subscribe();
AndroidSchedulers.mainThread() - Android 主线程
在 Android 开发中更新 UI 时使用:
Observable.just("更新UI")
.observeOn(AndroidSchedulers.mainThread())
.subscribe(text -> textView.setText(text));
subscribeOn vs observeOn
理解这两个方法的区别是掌握线程调度的关键:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io()) // 指定数据源在IO线程
.map(number -> {
System.out.println("map线程: " + Thread.currentThread().getName());
return number * 2;
})
.observeOn(Schedulers.computation()) // 切换到计算线程
.filter(number -> {
System.out.println("filter线程: " + Thread.currentThread().getName());
return number > 2;
})
.observeOn(Schedulers.newThread()) // 再次切换线程
.subscribe(number -> {
System.out.println("subscribe线程: " + Thread.currentThread().getName());
System.out.println("结果: " + number);
});
关键区别:
- subscribeOn - 指定 Observable 自身在哪个线程执行 (只有第一次有效)
- observeOn - 指定后续操作在哪个线程执行 (可多次使用)
六、实战应用:结合 Retrofit

图4:RxJava 响应式数据流从创建 Observable 到发射数据的完整流程
网络请求 - 结合 Retrofit
作为一款强大的 Java 响应式框架,RxJava 与 Retrofit 的结合是 Android 开发中的经典模式。
API 接口定义:
public interface ApiService {
@GET("users/{id}")
Single<User> getUser(@Path("id") int userId);
@GET("users/{id}/posts")
Observable<List<Post>> getUserPosts(@Path("id") int userId);
}
Retrofit 配置:
public class NetworkModule {
private static ApiService apiService;
public static ApiService getApiService() {
if (apiService == null) {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.example.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.build();
apiService = retrofit.create(ApiService.class);
}
return apiService;
}
}
使用示例:
public class UserRepository {
private final ApiService apiService;
private final CompositeDisposable disposables = new CompositeDisposable();
public UserRepository() {
this.apiService = NetworkModule.getApiService();
}
// 获取用户信息
public void loadUser(int userId, Consumer<User> onSuccess, Consumer<Throwable> onError) {
Disposable disposable = apiService.getUser(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onSuccess, onError);
disposables.add(disposable);
}
// 清理订阅
public void clear() {
disposables.clear();
}
}
搜索防抖
实现智能搜索,避免频繁请求:
public class SearchActivity extends AppCompatActivity {
private EditText searchEditText;
private final CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_search);
searchEditText = findViewById(R.id.search_edit_text);
// 监听搜索框输入
Disposable disposable = RxTextView.textChanges(searchEditText)
.debounce(500, TimeUnit.MILLISECONDS) // 防抖500ms
.filter(text -> text.length() >= 2) // 至少2个字符
.distinctUntilChanged() // 内容变化才触发
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(text -> {
// 执行搜索
performSearch(text.toString());
});
disposables.add(disposable);
}
private void performSearch(String keyword) {
apiService.search(keyword)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> updateSearchResults(results),
error -> showError(error)
);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // 防止内存泄漏
}
}
多数据源合并
同时加载多个数据源,等待全部完成后统一处理,这在设计后端系统架构时非常有用。
public class HomeRepository {
// 同时加载多个数据源
public Single<HomeData> loadHomeData() {
Single<User> userSingle = apiService.getCurrentUser();
Single<List<Post>> postsSingle = apiService.getLatestPosts();
Single<List<Notification>> notificationsSingle = apiService.getNotifications();
return Single.zip(userSingle, postsSingle, notificationsSingle,
(user, posts, notifications) ->
new HomeData(user, posts, notifications))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}

图5:RxJava 在网络请求、搜索防抖、多数据源合并、定时任务等场景下的应用
定时刷新
实现自动刷新功能:
public class RefreshManager {
private Disposable refreshDisposable;
// 每30秒自动刷新
public void startAutoRefresh(Runnable refreshAction) {
refreshDisposable = Observable.interval(30, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(tick -> refreshAction.run());
}
public void stopAutoRefresh() {
if (refreshDisposable != null && !refreshDisposable.isDisposed()) {
refreshDisposable.dispose();
}
}
}
七、最佳实践:避坑指南

图6:RxJava 使用中的六个最佳实践要点,包括订阅管理、内存泄漏避免等
正确做法:
public class MyActivity extends AppCompatActivity {
private final CompositeDisposable disposables = new CompositeDisposable();
private void loadData() {
Disposable disposable = observable
.subscribe(data -> process(data));
disposables.add(disposable);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // 清理所有订阅
}
}
统一的错误处理
创建一个统一的错误处理器:
public class RxErrorHandler {
public static Consumer<Throwable> handleError() {
return throwable -> {
if (throwable instanceof IOException) {
// 网络错误
showToast("网络连接失败");
} else if (throwable instanceof HttpException) {
// HTTP错误
int code = ((HttpException) throwable).code();
showToast("服务器错误: " + code);
} else {
// 其他错误
showToast("发生未知错误");
}
// 记录日志
Log.e("RxError", "Error occurred", throwable);
};
}
}
// 使用
observable.subscribe(
data -> process(data),
RxErrorHandler.handleError()
);
操作符选择
根据场景选择合适的操作符:
// flatMap - 不保证顺序,适合并发
Observable.just("A", "B", "C")
.flatMap(item -> processAsync(item))
.subscribe();
// concatMap - 保证顺序,适合顺序执行
Observable.just("A", "B", "C")
.concatMap(item -> processAsync(item))
.subscribe();
// switchMap - 只保留最新的,适合搜索
searchObservable
.switchMap(keyword -> apiService.search(keyword))
.subscribe();
背压处理
处理大数据量时使用 Flowable:
Flowable.range(1, 1000000)
.onBackpressureBuffer(100) // 缓冲区
.observeOn(Schedulers.computation(), false, 10)
.subscribe(
data -> process(data),
error -> handleError(error)
);
// 背压策略
Flowable.create(emitter -> {
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}, BackpressureStrategy.DROP) // 丢弃策略
.subscribe();
八、总结
RxJava 核心要点
- 响应式思维 - 以数据流的方式思考问题
- 操作符链 - 灵活组合操作符完成复杂逻辑
- 线程调度 - 合理使用 subscribeOn 和 observeOn
- 订阅管理 - 及时 dispose 避免内存泄漏
- 错误处理 - 使用专门的错误处理操作符
- 背压控制 - 大数据量场景使用 Flowable
RxJava 通过其声明式的编程模型和强大的操作符集合,极大地简化了异步和事件驱动程序的开发。掌握其核心概念、常用操作符以及线程调度机制,能够让你在面对复杂的并发场景时游刃有余。希望本文能帮助你更好地理解和应用 RxJava。如果你想与更多开发者交流技术心得,欢迎访问 云栈社区 。