概述

为什么要学习RxJava? 这里就不用在说了吧…. RxJava 可以提供工作效率、RxJava可以优雅的解决复杂的业务场景、RxJava已经非常流行了,大厂的App 必定有RxJava,Java后台也使用了RxJava、RxJava 是Android高级工程师必须会的、学习RxJava的设计原理…… 等等这些理由够了吧(PS:学习优秀的开源库还需要理由吗? - 不需要)

RxJava出现的历史与ReactiveX(简称:Rx)有着千丝万缕的联系,关于Rx的历史可以看Rx的历史 详细介绍了Rx的历史和应用以及函数式编程

同时,还会分析RxJava的使用场景,比如:数据转换、并发请求网络、实现网络重连、网络请求依赖不必嵌套Callback结构、缓存数据等.

现在已经有好多学习RxJava的相关文档,并且质量非常高,此篇文章主要是对自己学习的总结,毕竟学而时习之,下面推荐几个学习RxJava 的学习资料,非常全面,废话不多说拿起键盘撸之.

1 Rxjava项目地址: https://github.com/ReactiveX/Rxjava

2 Rxjava中文文档: https://mcxiaoke.gitbooks.io/rxdocs/content/

3 Rxjava经典资料:https://github.com/lzyzsd/Awesome-RxJava

4 RxJava2.x 教程:https://www.jianshu.com/p/0cd258eecf60

什么是响应式编程(reactive pattern)

在RxJava中存在一个观察者(Observer)和一个被观察者(Observable).观察者(Observer)对被观察者(Observable)发射的数据或数据序列作出响应.这种模式极大的简化了并发操作,相当于创建了一个待命状态的哨兵(Observer),在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据.可以称作为响应式编程.

RxJava 基本操作符

这里只讲在Android项目中常用的操作符,查看其他操作符可以翻阅官方文档

  • RxJava 的基本写法 - RxJava三部曲

    三部曲

 // 1 建立被观察者角色 产生需求
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) {
                //产生一个事件
                emitter.onNext("123");
            }
        });

        // 2 创建观察者  观察是否有需求
        Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe = [" + d + "]");
                d.dispose();//中断
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext:" + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                //??什么时候调用完成呢
                System.out.println("onComplete");
            }
        };

        //3 订阅
        observable.subscribe(observer);

创建操作符

  • create( ) — 使用一个函数从头创建一个Observable

create操作符和基本写法是一样的,这里并没有什么难度,几乎都能看懂

观察者被调用的顺序:onSubscribe(在此方法中可以解除订阅) -> onNext(可被调用多次) -> onComplete/onError(调用其中任何一个都会终止)

Observable.create(new ObservableOnSubscribe<Integer>() {//被观察者
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //检查观察者的isUnsubscribe 观察者observer是否订阅了本观察者Observable 在RxJava 2.x 已经没有了,直接是ObservableEmitter
                try {
                    for (int i = 0; i < 5; i++) {
                        //被观察者发射数据
                        emitter.onNext(i);
                    }
                    //调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        }).subscribe(new Observer<Integer>() {//观察者
            @Override
            public void onSubscribe(Disposable d) {
                if (d.isDisposed()) {//判断是否解除订阅
                    System.out.println("d = [" + d + "]");
                }
                d.dispose();//解除订阅
            }

            @Override
            public void onNext(Integer integer) {
                //观察者 观察到被观察者发射了数据
                System.out.println("integer = [" + integer + "]");
            }

            @Override
            public void onError(Throwable e) {
                //错误
                System.out.println("e = [" + e + "]");
            }

            @Override
            public void onComplete() {
                //完成
                System.out.println("onComplete");
            }
        });
  • just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable

just 可以发射多个对象,最多传递10个,just将单个数据转换为发射那个数据的Observable

just

just使用看如下代码

 Observable.just("1","2").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:"+e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

just 类似于fromArray操作符,just内部其实就是fromArray的封装

从源码中可以看出

注意:在RxJava 1.x 中可以传递null值,它会返回一个发射null值的Observable,而在RxJava 2.x 的代码中做了null判断,不能传递null值.如果要传递null值用empty操作符.

public static <T> Observable<T> just(T item1, T item2) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");

        return fromArray(item1, item2);
    }
  • fromArray 产生一个发射数组对象/一组统一的操作符的Observable

fromArray

fromArray 的使用方法,它会发射数据的每一项数据

Integer[] a = new Integer[]{1, 2, 3, 4, 5};
        Observable.fromArray(a).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext:" + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
  • empty/never/throw

empty 创建一个不发射任何数据但正常终止的Observable

empty 只调用观察者Observer 的onComplete方法,适用于重新渲染UI,重新请求网络,也适用于结合其他的Observable

empty操作符

empty 使用代码如下

Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("o = [" + o + "]");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

never 创建一个不发射数据也不终止的Observable,这个操作符感觉有点没啥用了,了解即可

Observable.never().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("d = [" + d + "]");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("o = [" + o + "]");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("e = [" + e + "]");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

throw 创建一个不发射数据以一个错误终止的Observable,这个操作符在RxJava2.x 去掉了,不建议使用了

  • interval 创建一个固定时间间隔发送整数序列的Observable

interval 操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列

interval操作符

interval 操作符使用代码如下:

Observable.interval(1, TimeUnit.SECONDS)
                .take(5)//限制 拦截产生事件的数量
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("d = [" + d + "]");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("aLong = [" + aLong + "]");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

Interval还可以指定调度器,在某个线程上执行,代码如下,将interval 放到一个新的线程上执行

Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
                .take(5)//限制 拦截产生事件的数量
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("d = [" + d + "]");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("aLong = [" + aLong + "]" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

日志打印如下:

aLong = [0]RxNewThreadScheduler-1
aLong = [1]RxNewThreadScheduler-1
aLong = [2]RxNewThreadScheduler-1
aLong = [3]RxNewThreadScheduler-1
aLong = [4]RxNewThreadScheduler-1

interval 在RxJava2.x添加了许多重载方法,可自行查看文档翻阅

  • Range 创建一个发射特定整数序列的Observable

Range

range 使用代码如下

Observable.range(1, 10)
                .scan(new BiFunction<Integer, Integer, Integer>() {//处理数据 改变发射的数据
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("integer = [" + integer + "]");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

scan 处理上一个结果 作为 下一个参数,打印的结果:

integer = [1]
integer = [3]
integer = [6]
integer = [10]
integer = [15]
integer = [21]
integer = [28]
integer = [36]
integer = [45]
integer = [55]

变换操作符

变换操作符可用于Observable 发射的数据执行变换操作的各种操作符

  • map 对Observable发射的每一项数据应用一个函数,执行变换操作

map

Map 操作符使用代码如下:

Observable.just("head", "bit").map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {//对每一项数据进行 数据变换
                // 进行网络请求
                System.out.println("s = [" + s + "]");
                return s+".png";
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String bitmap) {
                System.out.println("bitmap = [" + bitmap + "]");

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

上述代码是 将head bit 添加一个.png对后缀,打印结果如下,可以看到是将每一项数据进行变化操作

s = [head]
bitmap = [head.png]
s = [bit]
bitmap = [bit.png]

map场景更多的是应用在网络请求中,RxJava 与 OkHttp3、RxJava与Retrofit 搭配map操作符使用主要是转换json数据,在onNext中直接使用转换的json数据.

下面来写一个RxJava 与 OkHttp3搭配的一个网络请求

首先先分析步骤:

  1. 通过Observable.create() 方法,调用OkHttp - 子线程中执行
  2. 通过map操作符转换json数据为bean类 - 子线程中执行
  3. 通过doOnNext()方法,将bean中的数据保存到数据库等操作 - 子线程中执行
  4. 调度线程,主线程中更新UI
  5. 通过subscribe(),成功或者失败来更新UI

代码如下:

 Disposable disposable = Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
                //请求网络
                Request.Builder builder = new Request.Builder().url("https://music.aityp.com/banner?type=1").get();
                Request request = builder.build();
                Call newCall = new OkHttpClient().newCall(request);
                Response response = newCall.execute();
                emitter.onNext(response);
            }
        }).map(new Function<Response, MyClass>() {
            @Override
            public MyClass apply(Response response) throws Exception {
                //转换数据
                if (response.isSuccessful()) {
                    ResponseBody responseBody = response.body();
                    if (responseBody != null) {
                        return new Gson().fromJson(responseBody.string(), MyClass.class);
                    }
                }
                return null;
            }
        }).doOnNext(new Consumer<MyClass>() {
            @Override
            public void accept(MyClass myClass) throws Exception {
                System.out.println("将数据保存到数据库,保存成功 myClass = [" + Thread.currentThread().getName() + ":" + myClass.toString() + "]");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MyClass>() {
                    @Override
                    public void accept(MyClass myClass) throws Exception {
                        System.out.println("获取数据成功 myClass = [" + Thread.currentThread().getName() + ":" + myClass.toString() + "]");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println("throwable = [" + throwable + "]");
                    }
                });

注意:如果想让doOnNext 在主线程中执行则在doOnNext()方法前面添加.observeOn(AndroidSchedulers.mainThread()),doOnNext 就会在主线程中执行.

通过RxJava 可以将网络请求的逻辑分离的更加清晰

  • concat 聚合操作符,不交错的发射两个或多个Observable的发射物

concat

concat 操作符链接多个Observable的输出,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,也就是说只有前面的调用了onComplete才会开始后面的Observable,直到前面的Observable终止,concat才会订阅额外的一个Observable.

注意:如果去链接一个”热”Observable(这种Observable在创建后立即开始发射数据,即使没有观察者),concat将不会看到也不会发射它之前发射的任何数据

concat 使用的经典场景

使用concat操作符先读取缓存数据再通过网络请求获取数据,在实际的项目中,为了更好的用户体验,将数据更快的呈现给用户,一般都会将网络数据缓存下来,有缓存数据直接使用缓存数据,如果没有缓存数据再通过网络请求获取

下面我们模拟一下这个场景:

创建读取缓存的observable,如果有缓存解析缓存的数据,更新UI,

//查询缓存的observable
        Observable<String> cacheObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String cache = CacheManage.getInstance().get("cache");
                if (!TextUtils.isEmpty(cache)) {
                    Log.e(TAG, "subscribe: 读取缓存数据 -》" + cache);
                    emitter.onNext(cache);
                    //这里可以更具设置的缓存时长 超过了缓存的时长 请求网络 更新缓存数据 更新UI
                    emitter.onComplete();
                } else {
                    Log.e(TAG, "subscribe: 没有缓存请求网络");
                    emitter.onComplete();
                }
            }
        });

创建请求网络的observable,需要注意的是两个Observable的泛型必须保持一致

//请求网络的observable
        Observable<String> webObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.e(TAG, "subscribe 网络请求线程: " + Thread.currentThread().getName());
                Log.e(TAG, "subscribe: 请求网络数据");
                //请求网络
                Request.Builder builder = new Request.Builder().url("https://music.aityp.com/banner?type=1").get();
                Request request = builder.build();
                Call newCall = new OkHttpClient().newCall(request);
                Response response = newCall.execute();
                if (response.isSuccessful()) {
                    if (response.body() != null) {
                        String string = response.body().string();
                        CacheManage.getInstance().put("cache", string);
                        emitter.onNext(string);
                    }
                }
            }
        });

最后将得到的json字符串,进行解析

private class ParaseFunction implements Function<String, MyClass> {

        @Override
        public MyClass apply(String response) throws Exception {
            Log.e(TAG, "subscribe 解析json线程: " + Thread.currentThread().getName());
            Log.e(TAG, "apply: 解析json字符串");
            return new Gson().fromJson(response, MyClass.class);
        }
    }

通过concat 合并

Observable.concat(cacheObservable, webObservable)
                .map(new ParaseFunction())//解析json 字符串
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MyClass>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(MyClass myClass) {
                        Log.e(TAG, "subscribe UI更新线程: " + Thread.currentThread().getName());
                        Log.e(TAG, "onNext: " + myClass.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete: ");
                    }
                });
  • flatMap 将一个发射数据的Observable变成多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

flatMap

利用flatMap的特性,可以实现多个网络请求依次请求,比如用户没有注册,用户注册成功后自动登录的场景,比如用户没有登录进行收藏,调用登录接口,登录成功后调用自动收藏接口

代码如下:

final boolean[] isRegister = {false};
        //先去注册 再去登录
        Observable.just("register", "login").flatMap(new Function<String, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String s) throws Exception {
                if (!isRegister[0]) {
                    return Observable.create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            //调用注册接口
                            isRegister[0] = true;
                            emitter.onNext("注册成功");
                        }
                    });
                } else {
                    return Observable.create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            //调用登录接口
                            emitter.onNext("登录成功");
                        }
                    });
                }
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("o = [" + o + "]");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

当我在注册的时候调用onComplete时订阅并没有解除,而且没有调用Observer中的onComplete方法,而是继续调用下一个Observable,当我在注册时调用onError`时就会直接结束,并没有继续调用下一个Observable.

flatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observables发射的数据,最后将合并后的结果当作它自己的数据序列发射.

注意:flatMap 对这些Observable发射的数据做的是合并操作,它们可能是交错的.

注意:如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。

还可以通过flatMap实现一次性token,先请求token在访问接口,将token请求和实际数据的请求连贯串起来.

  • zip 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

zip

zip理解起来很简单,就是将多个Observable发射的数据结合然后发射这个函数返回的结果.

zip使用的场景:实现多个接口并行请求将获取到的数据糅合在一起,共同更新UI

如下代码,只是模拟,多个请求网络的Observable,通过map将json数据转换成bean类

Observable<String> zip1 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
                emitter.onNext("test");
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
                //解析json数据 返回bean
                return "json:" + s;
            }
        });

        Observable<Integer> zip2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
                emitter.onNext(111);
            }
        }).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer o) throws Exception {
                System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
                //解析json数据 返回bean
                return 111 + o;
            }
        });

将多个Observables并行执行,结果进行糅合

Observable.zip(zip1, zip2, new BiFunction<String, Integer, Object>() {
            @Override
            public Object apply(String s, Integer integer) throws Exception {
                return "合并后的数据为:" + s + integer;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println("o = [" + o + "]");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

通过zip操作符,可以很简单的实现并行请求网络,获取结果:

o = [合并后的数据为:json:test222]

  • zipWith 操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

zipWith

zipWith 会返回一个指定的新的类型

observable.zipWith(Observable.range(1, 1 + count), new BiFunction<Throwable, Integer, Warrp>() {
                    @Override
                    public Warrp apply(Throwable throwable, Integer integer) throws Exception {
                        Log.e(TAG, "apply: 合并重试次数和异常信息:" + integer);
                        return new Warrp(integer, throwable);
                    }
                })
  • retryWhen 如果原始Observable遇到错误,可以重新订阅它期望它能正常终止 允许重新订阅已结束的Observable

retryWhen

retryWhen 将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable.如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止.

注意: retryWhen 默认在trampoline调度器上执行,你可以通过参数指定其它的调度器(调度器在文章最后有讲解)

retryWhen 的场景非常适合,网络请求或者其他错误重试,以及设置重试的次数.

我们通过一个例子来实现,网络请求重试的逻辑

首先设置重试的次数和等待时长

final int count = 3;//重试次数为3次
final int delay = 5000;//等待时长 重新链接的时间间隔
String cacheToken = null;



模拟网络请求,模拟第二次重试时网络请求成功

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                Log.e(TAG, "subscribe: 1 - 请求网络");
                //模拟网络请求
                if (cacheToken == null) {//模拟网络请求失败
                    emitter.onError(new NullPointerException());
                } else {
                    //模拟网络请求成功
                    emitter.onNext("网络请求成功");//当调用next 时不会走retryWhen
                }
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> observable) throws Exception {
                return observable.zipWith(Observable.range(1, 1 + count), new BiFunction<Throwable, Integer, Warrp>() {
                    @Override
                    public Warrp apply(Throwable throwable, Integer integer) throws Exception {
                        Log.e(TAG, "apply: 合并重试次数和异常信息:" + integer);
                        return new Warrp(integer, throwable);
                    }
                }).flatMap(new Function<Warrp, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Warrp warrp) throws Exception {
                        //收到异常信息 且重试次数小于设置的次数
                        if (warrp.throwable instanceof NullPointerException && (warrp.count < count + 1)) {
                            Log.e(TAG, "apply: 重试次数:" + warrp.count);
                            Log.e(TAG, "apply: 收到错误准备重试:" + delay + " ms后开始重试");
                            if (warrp.count == 2) {//模拟 第二次重试 网络请求正常了
                                cacheToken = "1212213";
                            }
                            //重新链接 请求网络
                            return Observable.timer(delay, TimeUnit.MILLISECONDS);
                        }
                        return Observable.error(warrp.throwable);
                    }
                });
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e(TAG, "accept: next - " + o);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error - " + throwable);
                    }
                });

输出结果:

subscribe: 1 - 请求网络
apply: 合并重试次数和异常信息:1
apply: 重试次数:1
apply: 收到错误准备重试:5000 ms后开始重试
subscribe: 1 - 请求网络
apply: 合并重试次数和异常信息:2
apply: 重试次数:2
apply: 收到错误准备重试:5000 ms后开始重试
subscribe: 1 - 请求网络
accept: next - 请求网络

如此简单的写法就可以实现这种复杂的需求,RxJava确实是十分强大,如果我们不用RxJava实现,普通的写法会有一大堆的判断并且难以维护.

  • 常用的过滤操作符 filter/distinct/elementAt/all/contains 这些操作符使用很简单了解即可

filter 过滤小于2的数

Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {//过滤器
            @Override
            public boolean test(Integer integer) throws Exception {// false 当前等事件不被处理  true 当前等事件处理
                return integer > 2;
            }
        })

distince 过滤重复元素

Observable.just(1, 2, 3, 3, 3, 4, 45, 6, 6).distinct()

elementAt 获取指定元素 位置从0开始

 Observable.just(1, 2, 3, 4).elementAt(2)

All 判断事件是否满足条件 如下代码 如果全部大于2则返回true

Observable.just(4, 3, 4, 5).all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 2;//是否全部大于 2
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                System.out.println("aBoolean = [" + aBoolean + "]");
            }
        });

contain 判断事件是否包含 如果包含则返回true

 Observable.just("abc", "a", "er").contains("a").subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                System.out.println("aBoolean = [" + aBoolean + "]");
            }
        });
  • groupBy 将事件类型 转换为 结果类型

groupBy

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                System.out.println("integer = [" + integer + "]");

                //根据参数进行分类

                return integer > 2 ? "A" : "B";
            }
        }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
            @Override
            public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
                //stringIntegerGroupedObservable 是被观察者
                stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {// 通过观察者拿到数据
                    @Override
                    public void accept(Integer integer) throws Exception {
                        String key = stringIntegerGroupedObservable.getKey();
                        System.out.println("key--" + key + " | " + integer);

                    }
                });
            }
        });

输出结果:

integer = [1]
key–B | 1
integer = [2]
key–B | 2
integer = [3]
key–A | 3
integer = [4]
key–A | 4
integer = [5]
key–A | 5

  • buffer 定期收集Observable的数据,放入一个数据包裹中,然后发射这些数据包裹而不是一次发射一个值.

buffer

使用场景
* 100000条数据插入到数据库
* 每一条数据产生都需要时间
* 如果产生一条 插入一条比较浪费时间 全部一次性插入用户等太久
* 采取buffer等形式 将100000 条 分成 一小段执行
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.println("integers = [" + integers + "]");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

输出结果:123 456

注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

背压

背压主要用于并发处理,在Android中用的不多.背压就是:生产速度 与 消费速度的比值

 // 背压策略
        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                for (int i = 0; i < 10000000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribe(new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onComplete");
                s.request(Integer.MAX_VALUE);//最大的处理能力 最大处理数
            }

            @Override
            public void onNext(Object o) {
                System.out.println("o = [" + o + "]");

            }

            @Override
            public void onError(Throwable t) {
                System.out.println("onComplete");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

线程调度

线程调度,主要的作用就是线程切换.在Android中常用的场景就是从在子线程切换到主线程

subscribeOn是指Observable(被观察者)在哪个调度器(线程)上执行,于此搭配的还有unsubscribeOn.多次指定Observable发射事件的线程只有第一次指定有效.

observerOn是指observer(观察者)在哪个调度器(线程)上执行,在Android中多用于在Android主线程中执行.多次指定Observer(观察者)的线程,下游的线程就会切换一次.

subscribeOn

RxJava 中,已经内置了很多线程选项,常用的例如有:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
  • Schedulers.newThread() 代表一个常规的新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程

至此,RxJava 基本常用的操作符已经分析完毕,RxJava不仅可以简化项目中复杂的逻辑,同时提高了代码的维护性,我们更应该学习这种设计思想,分析RxJava的源码.而不止在于使用框架上,知其然,知其所以然.才能融汇贯通成长起来.