找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2228

积分

0

好友

312

主题
发表于 2025-12-30 23:05:24 | 查看: 21| 回复: 0

本文深入讲解 ReactiveX 旗下的 RxJava 框架,从核心概念到实战应用,帮助你掌握响应式编程的精髓。

一、引言:为什么选择 RxJava

在现代软件开发中,异步编程已成为必不可少的技能。然而,传统的回调和线程管理复杂等问题一直困扰着开发者。RxJava (Reactive Extensions for Java) 提供了一种优雅的解决方案,让异步编程变得简单而强大。

RxJava 的核心优势

  • 声明式编程 - 以数据流的方式描述业务逻辑
  • 链式调用 - 操作符组合,代码简洁易读
  • 强大的操作符 - 100+ 操作符满足各种场景
  • 线程调度灵活 - 轻松切换执行线程
  • 错误处理优雅 - 统一的异常处理机制
  • 背压支持 - Flowable 处理数据流速控制

二、核心架构解析

RxJava 基于观察者模式,采用响应式编程思想,其核心由四大组件构成。

RxJava核心架构图
图1:RxJava 核心架构由 Observable、Observer、Schedulers 和 Operators 构成

核心组件详解

Observable (被观察者)
数据源,负责发射数据流。它是整个响应式流的起点,负责产生数据并发送给订阅者。

public abstract class Observable<T> implements ObservableSource<T> {
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    protected abstract void subscribeActual(Observer<? super T> observer);
}

Observer (观察者)
数据消费者,接收并处理数据。Observer 定义了四个核心方法来处理数据流的不同状态。

public interface Observer<T> {
    // 订阅开始
    void onSubscribe(Disposable d);
    // 接收数据
    void onNext(T t);
    // 发生错误
    void onError(Throwable e);
    // 完成
    void onComplete();
}

Operators (操作符)
用于转换、过滤、组合数据流的中间操作。操作符是 RxJava 的精髓,提供了丰富的数据处理能力。

Schedulers (调度器)
控制 Observable 和 Observer 在哪个线程执行,实现灵活的线程调度。

数据流类型

RxJava 3 提供了 5 种主要的数据流类型,每种适用于不同的场景:

  • Observable - 发射 0 到 N 个数据项,不支持背压,适用于通用场景
  • Flowable - 发射 0 到 N 个数据项,支持背压,适用于大数据量场景
  • Single - 只发射 1 个数据项,适用于单一结果场景如网络请求
  • Maybe - 发射 0 或 1 个数据项,适用于可能有结果的场景
  • Completable - 不发射数据,只关心完成状态

三、快速上手:基础用法

创建 Observable 的多种方式

方式 1:just - 发射固定的数据项

Observable<String> observable1 = Observable.just("Hello", "RxJava");

方式 2:from - 从数组或集合创建

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> observable2 = Observable.fromIterable(numbers);

方式 3:create - 自定义发射逻辑

Observable<String> observable3 = Observable.create(emitter -> {
    emitter.onNext("Data 1");
    emitter.onNext("Data 2");
    emitter.onComplete();
});

方式 4:interval - 定时发射

Observable<Long> observable4 = Observable.interval(1, TimeUnit.SECONDS);

方式 5:range - 发射一系列整数

Observable<Integer> observable5 = Observable.range(1, 10);

订阅 Observable

完整的订阅示例:

Observable.just("Apple", "Banana", "Orange")
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("开始订阅");
        }
        @Override
        public void onNext(String item) {
            System.out.println("收到数据: " + item);
        }
        @Override
        public void onError(Throwable e) {
            System.err.println("发生错误: " + e.getMessage());
        }
        @Override
        public void onComplete() {
            System.out.println("完成");
        }
    });

输出结果:

开始订阅
收到数据: Apple
收到数据: Banana
收到数据: Orange
完成

简化的订阅方式

实际开发中,我们经常使用 Lambda 表达式简化订阅代码:

// 只处理 onNext
Observable.just(1, 2, 3)
    .subscribe(item -> System.out.println(item));

// 处理 onNext 和 onError
Observable.just(1, 2, 3)
    .subscribe(
        item -> System.out.println(item),
        error -> System.err.println(error)
    );

// 处理 onNext、onError 和 onComplete
Observable.just(1, 2, 3)
    .subscribe(
        item -> System.out.println(item),
        error -> System.err.println(error),
        () -> System.out.println("完成")
    );

四、操作符详解:数据变换的艺术

操作符是 RxJava 的核心,掌握常用操作符是使用 RxJava 的关键。

RxJava操作符分类图
图2:RxJava 操作符主要分为创建、转换、过滤、组合、错误处理和工具等类别

创建操作符

创建操作符用于生成 Observable 数据源:

// just - 发射预定义的数据项
Observable.just(1, 2, 3);

// fromArray - 从数组创建
Integer[] array = {1, 2, 3, 4, 5};
Observable.fromArray(array);

// fromCallable - 延迟执行
Observable.fromCallable(() -> {
    // 只有订阅时才执行
    return expensiveOperation();
});

// defer - 为每个订阅者创建新的 Observable
Observable.defer(() -> Observable.just(System.currentTimeMillis()));

转换操作符

map - 一对一转换
最常用的转换操作符,将每个数据项转换为另一种形式:

Observable.just(1, 2, 3, 4, 5)
    .map(number -> number * 2)
    .subscribe(System.out::println);
// 输出: 2, 4, 6, 8, 10

flatMap - 一对多转换并合并
适用于需要为每个数据项发起异步操作的场景:

// 场景:搜索用户并获取每个用户的订单
Observable.just("user1", "user2", "user3")
    .flatMap(userId -> {
        // 为每个用户发起网络请求
        return getUserOrders(userId);
    })
    .subscribe(order -> System.out.println(order));

concatMap - 顺序的 flatMap
当需要保证执行顺序时使用 concatMap:

Observable.just(1, 2, 3)
    .concatMap(i -> Observable.just(i * 10)
        .delay(100, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);
// 输出严格按顺序: 10, 20, 30

过滤操作符

过滤操作符用于筛选数据流中的数据:

// filter - 过滤数据
Observable.range(1, 10)
    .filter(number -> number % 2 == 0)
    .subscribe(System.out::println);
// 输出: 2, 4, 6, 8, 10

// take - 只取前 N 个
Observable.range(1, 100)
    .take(5)
    .subscribe(System.out::println);
// 输出: 1, 2, 3, 4, 5

// skip - 跳过前 N 个
Observable.range(1, 10)
    .skip(5)
    .subscribe(System.out::println);
// 输出: 6, 7, 8, 9, 10

// distinct - 去重
Observable.just(1, 2, 2, 3, 3, 4)
    .distinct()
    .subscribe(System.out::println);
// 输出: 1, 2, 3, 4

debounce - 防抖 (搜索框常用)
在用户停止输入一段时间后才触发搜索,避免频繁请求:

Observable.create(emitter -> {
    emitter.onNext("a");
    Thread.sleep(100);
    emitter.onNext("ab");
    Thread.sleep(100);
    emitter.onNext("abc");
    Thread.sleep(500); // 停顿超过300ms
    emitter.onComplete();
})
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
// 只输出: abc

组合操作符

组合操作符用于合并多个数据源:

// merge - 合并多个 Observable
Observable<String> obs1 = Observable.just("A", "B");
Observable<String> obs2 = Observable.just("C", "D");
Observable.merge(obs1, obs2)
    .subscribe(System.out::println);
// 输出: A, B, C, D (顺序不保证)

// zip - 组合对应位置的数据
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<String> letters = Observable.just("A", "B", "C");
Observable.zip(numbers, letters,
    (number, letter) -> number + letter)
    .subscribe(System.out::println);
// 输出: 1A, 2B, 3C

// combineLatest - 任一数据源变化时组合最新值
Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);
Observable.combineLatest(timer1, timer2,
    (t1, t2) -> "T1:" + t1 + " T2:" + t2)
    .subscribe(System.out::println);

错误处理操作符

优雅的错误处理是 RxJava 的一大优势:

// onErrorReturn - 错误时返回默认值
Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onError(new Exception("出错了"));
})
.onErrorReturn(throwable -> -1)
.subscribe(System.out::println);
// 输出: 1, 2, -1

// onErrorResumeNext - 错误时切换到另一个 Observable
Observable.create(emitter -> {
    emitter.onNext("A");
    emitter.onError(new Exception("出错"));
})
.onErrorResumeNext(Observable.just("B", "C"))
.subscribe(System.out::println);
// 输出: A, B, C

// retry - 重试
AtomicInteger count = new AtomicInteger(0);
Observable.create(emitter -> {
    int attempt = count.incrementAndGet();
    System.out.println("尝试次数: " + attempt);
    if (attempt < 3) {
        emitter.onError(new Exception("失败"));
    } else {
        emitter.onNext("成功");
        emitter.onComplete();
    }
})
.retry(3)
.subscribe(System.out::println);

五、线程调度:掌控异步执行

合理的线程调度是保证应用性能的关键。

RxJava线程调度器介绍图
图3:RxJava 提供了多种调度器,分别适用于 IO、计算、新建线程等不同场景

Schedulers 类型

RxJava 提供了多种调度器,每种适用于不同的场景:

Schedulers.io() - IO 密集型任务
适用于网络请求、文件读写等 IO 操作:

Observable.fromCallable(() -> {
    // 网络请求、文件读写
    return downloadFile();
})
.subscribeOn(Schedulers.io())
.subscribe(result -> System.out.println(result));

Schedulers.computation() - 计算密集型
适用于 CPU 密集型计算:

Observable.range(1, 1000000)
    .map(i -> i * i)
    .subscribeOn(Schedulers.computation())
    .subscribe();

Schedulers.newThread() - 每次创建新线程

Observable.just(1)
    .subscribeOn(Schedulers.newThread())
    .subscribe();

Schedulers.single() - 单线程执行

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.single())
    .subscribe();

AndroidSchedulers.mainThread() - Android 主线程
在 Android 开发中更新 UI 时使用:

Observable.just("更新UI")
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(text -> textView.setText(text));

subscribeOn vs observeOn

理解这两个方法的区别是掌握线程调度的关键:

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.io())       // 指定数据源在IO线程
    .map(number -> {
        System.out.println("map线程: " + Thread.currentThread().getName());
        return number * 2;
    })
    .observeOn(Schedulers.computation()) // 切换到计算线程
    .filter(number -> {
        System.out.println("filter线程: " + Thread.currentThread().getName());
        return number > 2;
    })
    .observeOn(Schedulers.newThread())   // 再次切换线程
    .subscribe(number -> {
        System.out.println("subscribe线程: " + Thread.currentThread().getName());
        System.out.println("结果: " + number);
    });

关键区别:

  • subscribeOn - 指定 Observable 自身在哪个线程执行 (只有第一次有效)
  • observeOn - 指定后续操作在哪个线程执行 (可多次使用)

六、实战应用:结合 Retrofit

RxJava响应式数据流流程图
图4:RxJava 响应式数据流从创建 Observable 到发射数据的完整流程

网络请求 - 结合 Retrofit

作为一款强大的 Java 响应式框架,RxJava 与 Retrofit 的结合是 Android 开发中的经典模式。

API 接口定义:

public interface ApiService {
    @GET("users/{id}")
    Single<User> getUser(@Path("id") int userId);

    @GET("users/{id}/posts")
    Observable<List<Post>> getUserPosts(@Path("id") int userId);
}

Retrofit 配置:

public class NetworkModule {
    private static ApiService apiService;

    public static ApiService getApiService() {
        if (apiService == null) {
            Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://api.example.com/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .build();
            apiService = retrofit.create(ApiService.class);
        }
        return apiService;
    }
}

使用示例:

public class UserRepository {
    private final ApiService apiService;
    private final CompositeDisposable disposables = new CompositeDisposable();

    public UserRepository() {
        this.apiService = NetworkModule.getApiService();
    }

    // 获取用户信息
    public void loadUser(int userId, Consumer<User> onSuccess, Consumer<Throwable> onError) {
        Disposable disposable = apiService.getUser(userId)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onSuccess, onError);
        disposables.add(disposable);
    }

    // 清理订阅
    public void clear() {
        disposables.clear();
    }
}

搜索防抖

实现智能搜索,避免频繁请求:

public class SearchActivity extends AppCompatActivity {
    private EditText searchEditText;
    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_search);
        searchEditText = findViewById(R.id.search_edit_text);

        // 监听搜索框输入
        Disposable disposable = RxTextView.textChanges(searchEditText)
            .debounce(500, TimeUnit.MILLISECONDS)  // 防抖500ms
            .filter(text -> text.length() >= 2)     // 至少2个字符
            .distinctUntilChanged()                 // 内容变化才触发
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(text -> {
                // 执行搜索
                performSearch(text.toString());
            });
        disposables.add(disposable);
    }

    private void performSearch(String keyword) {
        apiService.search(keyword)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                results -> updateSearchResults(results),
                error -> showError(error)
            );
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();  // 防止内存泄漏
    }
}

多数据源合并

同时加载多个数据源,等待全部完成后统一处理,这在设计后端系统架构时非常有用。

public class HomeRepository {
    // 同时加载多个数据源
    public Single<HomeData> loadHomeData() {
        Single<User> userSingle = apiService.getCurrentUser();
        Single<List<Post>> postsSingle = apiService.getLatestPosts();
        Single<List<Notification>> notificationsSingle = apiService.getNotifications();

        return Single.zip(userSingle, postsSingle, notificationsSingle,
            (user, posts, notifications) ->
                new HomeData(user, posts, notifications))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
    }
}

RxJava实际应用场景介绍图
图5:RxJava 在网络请求、搜索防抖、多数据源合并、定时任务等场景下的应用

定时刷新

实现自动刷新功能:

public class RefreshManager {
    private Disposable refreshDisposable;

    // 每30秒自动刷新
    public void startAutoRefresh(Runnable refreshAction) {
        refreshDisposable = Observable.interval(30, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tick -> refreshAction.run());
    }

    public void stopAutoRefresh() {
        if (refreshDisposable != null && !refreshDisposable.isDisposed()) {
            refreshDisposable.dispose();
        }
    }
}

七、最佳实践:避坑指南

RxJava最佳实践要点图
图6:RxJava 使用中的六个最佳实践要点,包括订阅管理、内存泄漏避免等

正确做法:

public class MyActivity extends AppCompatActivity {
    private final CompositeDisposable disposables = new CompositeDisposable();

    private void loadData() {
        Disposable disposable = observable
            .subscribe(data -> process(data));
        disposables.add(disposable);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();  // 清理所有订阅
    }
}

统一的错误处理

创建一个统一的错误处理器:

public class RxErrorHandler {
    public static Consumer<Throwable> handleError() {
        return throwable -> {
            if (throwable instanceof IOException) {
                // 网络错误
                showToast("网络连接失败");
            } else if (throwable instanceof HttpException) {
                // HTTP错误
                int code = ((HttpException) throwable).code();
                showToast("服务器错误: " + code);
            } else {
                // 其他错误
                showToast("发生未知错误");
            }
            // 记录日志
            Log.e("RxError", "Error occurred", throwable);
        };
    }
}

// 使用
observable.subscribe(
    data -> process(data),
    RxErrorHandler.handleError()
);

操作符选择

根据场景选择合适的操作符:

// flatMap - 不保证顺序,适合并发
Observable.just("A", "B", "C")
    .flatMap(item -> processAsync(item))
    .subscribe();

// concatMap - 保证顺序,适合顺序执行
Observable.just("A", "B", "C")
    .concatMap(item -> processAsync(item))
    .subscribe();

// switchMap - 只保留最新的,适合搜索
searchObservable
    .switchMap(keyword -> apiService.search(keyword))
    .subscribe();

背压处理

处理大数据量时使用 Flowable:

Flowable.range(1, 1000000)
    .onBackpressureBuffer(100)  // 缓冲区
    .observeOn(Schedulers.computation(), false, 10)
    .subscribe(
        data -> process(data),
        error -> handleError(error)
    );

// 背压策略
Flowable.create(emitter -> {
    for (int i = 0; i < 10000; i++) {
        emitter.onNext(i);
    }
    emitter.onComplete();
}, BackpressureStrategy.DROP)  // 丢弃策略
.subscribe();

八、总结

RxJava 核心要点

  • 响应式思维 - 以数据流的方式思考问题
  • 操作符链 - 灵活组合操作符完成复杂逻辑
  • 线程调度 - 合理使用 subscribeOn 和 observeOn
  • 订阅管理 - 及时 dispose 避免内存泄漏
  • 错误处理 - 使用专门的错误处理操作符
  • 背压控制 - 大数据量场景使用 Flowable

RxJava 通过其声明式的编程模型和强大的操作符集合,极大地简化了异步和事件驱动程序的开发。掌握其核心概念、常用操作符以及线程调度机制,能够让你在面对复杂的并发场景时游刃有余。希望本文能帮助你更好地理解和应用 RxJava。如果你想与更多开发者交流技术心得,欢迎访问 云栈社区




上一篇:英伟达200亿美元收购Groq技术,强化AI推理市场布局
下一篇:Spring中@Autowired与@Resource的核心区别:查找逻辑、常见问题与选型建议
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 18:36 , Processed in 0.242626 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表