RxJava2.x 庖丁解牛 - 操作符使用详解与场景分析

概述

为什么要学习RxJava? 这里就不用在说了吧…. RxJava 可以提供工作效率、RxJava可以优雅的解决复杂的业务场景、RxJava已经非常流行了,大厂的App 必定有RxJava,Java后台也使用了RxJava、RxJava 是Android高级工程师必须会的、学习RxJava的设计原理…… 等等这些理由够了吧(PS:学习优秀的开源库还需要理由吗? - 不需要)

RxJava出现的历史与ReactiveX(简称:Rx)有着千丝万缕的联系,关于Rx的历史可以看Rx的历史 详细介绍了Rx的历史和应用以及函数式编程

同时,还会分析RxJava的使用场景,比如:数据转换、并发请求网络、实现网络重连、网络请求依赖不必嵌套Callback结构、缓存数据等.

现在已经有好多学习RxJava的相关文档,并且质量非常高,此篇文章主要是对自己学习的总结,毕竟学而时习之,下面推荐几个学习RxJava 的学习资料,非常全面,废话不多说拿起键盘撸之.

1 Rxjava项目地址: https://github.com/ReactiveX/Rxjava

2 Rxjava中文文档: https://mcxiaoke.gitbooks.io/rxdocs/content/

3 Rxjava经典资料:https://github.com/lzyzsd/Awesome-RxJava

4 RxJava2.x 教程:https://www.jianshu.com/p/0cd258eecf60

什么是响应式编程(reactive pattern)

在RxJava中存在一个观察者(Observer)和一个被观察者(Observable).观察者(Observer)对被观察者(Observable)发射的数据或数据序列作出响应.这种模式极大的简化了并发操作,相当于创建了一个待命状态的哨兵(Observer),在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据.可以称作为响应式编程.

RxJava 基本操作符

这里只讲在Android项目中常用的操作符,查看其他操作符可以翻阅官方文档

  • RxJava 的基本写法 - RxJava三部曲

    三部曲

// 1 建立被观察者角色 产生需求
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) {
//产生一个事件
emitter.onNext("123");
}
});

// 2 创建观察者 观察是否有需求
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe = [" + d + "]");
d.dispose();//中断
}

@Override
public void onNext(String o) {
System.out.println("onNext:" + o);
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onComplete() {
//??什么时候调用完成呢
System.out.println("onComplete");
}
};

//3 订阅
observable.subscribe(observer);

创建操作符

  • create( ) — 使用一个函数从头创建一个Observable

create操作符和基本写法是一样的,这里并没有什么难度,几乎都能看懂

观察者被调用的顺序:onSubscribe(在此方法中可以解除订阅) -> onNext(可被调用多次) -> onComplete/onError(调用其中任何一个都会终止)

Observable.create(new ObservableOnSubscribe<Integer>() {//被观察者
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//检查观察者的isUnsubscribe 观察者observer是否订阅了本观察者Observable 在RxJava 2.x 已经没有了,直接是ObservableEmitter
try {
for (int i = 0; i < 5; i++) {
//被观察者发射数据
emitter.onNext(i);
}
//调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}).subscribe(new Observer<Integer>() {//观察者
@Override
public void onSubscribe(Disposable d) {
if (d.isDisposed()) {//判断是否解除订阅
System.out.println("d = [" + d + "]");
}
d.dispose();//解除订阅
}

@Override
public void onNext(Integer integer) {
//观察者 观察到被观察者发射了数据
System.out.println("integer = [" + integer + "]");
}

@Override
public void onError(Throwable e) {
//错误
System.out.println("e = [" + e + "]");
}

@Override
public void onComplete() {
//完成
System.out.println("onComplete");
}
});
  • just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable

just 可以发射多个对象,最多传递10个,just将单个数据转换为发射那个数据的Observable

just

just使用看如下代码

Observable.just("1","2").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}

@Override
public void onNext(String s) {
System.out.println("onNext:"+s);
}

@Override
public void onError(Throwable e) {
System.out.println("onError:"+e);
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

just 类似于fromArray操作符,just内部其实就是fromArray的封装

从源码中可以看出

注意:在RxJava 1.x 中可以传递null值,它会返回一个发射null值的Observable,而在RxJava 2.x 的代码中做了null判断,不能传递null值.如果要传递null值用empty操作符.

public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");

return fromArray(item1, item2);
}
  • fromArray 产生一个发射数组对象/一组统一的操作符的Observable

fromArray

fromArray 的使用方法,它会发射数据的每一项数据

Integer[] a = new Integer[]{1, 2, 3, 4, 5};
Observable.fromArray(a).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});
  • empty/never/throw

empty 创建一个不发射任何数据但正常终止的Observable

empty 只调用观察者Observer 的onComplete方法,适用于重新渲染UI,重新请求网络,也适用于结合其他的Observable

empty操作符

empty 使用代码如下

Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

never 创建一个不发射数据也不终止的Observable,这个操作符感觉有点没啥用了,了解即可

Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d = [" + d + "]");
}

@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");
}

@Override
public void onError(Throwable e) {
System.out.println("e = [" + e + "]");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

throw 创建一个不发射数据以一个错误终止的Observable,这个操作符在RxJava2.x 去掉了,不建议使用了

  • interval 创建一个固定时间间隔发送整数序列的Observable

interval 操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列

interval操作符

interval 操作符使用代码如下:

Observable.interval(1, TimeUnit.SECONDS)
.take(5)//限制 拦截产生事件的数量
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d = [" + d + "]");
}

@Override
public void onNext(Long aLong) {
System.out.println("aLong = [" + aLong + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

Interval还可以指定调度器,在某个线程上执行,代码如下,将interval 放到一个新的线程上执行

Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
.take(5)//限制 拦截产生事件的数量
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d = [" + d + "]");
}

@Override
public void onNext(Long aLong) {
System.out.println("aLong = [" + aLong + "]" + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

日志打印如下:

aLong = [0]RxNewThreadScheduler-1
aLong = [1]RxNewThreadScheduler-1
aLong = [2]RxNewThreadScheduler-1
aLong = [3]RxNewThreadScheduler-1
aLong = [4]RxNewThreadScheduler-1

interval 在RxJava2.x添加了许多重载方法,可自行查看文档翻阅

  • Range 创建一个发射特定整数序列的Observable

Range

range 使用代码如下

Observable.range(1, 10)
.scan(new BiFunction<Integer, Integer, Integer>() {//处理数据 改变发射的数据
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer integer) {
System.out.println("integer = [" + integer + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

scan 处理上一个结果 作为 下一个参数,打印的结果:

integer = [1]
integer = [3]
integer = [6]
integer = [10]
integer = [15]
integer = [21]
integer = [28]
integer = [36]
integer = [45]
integer = [55]

变换操作符

变换操作符可用于Observable 发射的数据执行变换操作的各种操作符

  • map 对Observable发射的每一项数据应用一个函数,执行变换操作

map

Map 操作符使用代码如下:

Observable.just("head", "bit").map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {//对每一项数据进行 数据变换
// 进行网络请求
System.out.println("s = [" + s + "]");
return s+".png";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String bitmap) {
System.out.println("bitmap = [" + bitmap + "]");

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

上述代码是 将head bit 添加一个.png对后缀,打印结果如下,可以看到是将每一项数据进行变化操作

s = [head]
bitmap = [head.png]
s = [bit]
bitmap = [bit.png]

map场景更多的是应用在网络请求中,RxJava 与 OkHttp3、RxJava与Retrofit 搭配map操作符使用主要是转换json数据,在onNext中直接使用转换的json数据.

下面来写一个RxJava 与 OkHttp3搭配的一个网络请求

首先先分析步骤:

  1. 通过Observable.create() 方法,调用OkHttp - 子线程中执行
  2. 通过map操作符转换json数据为bean类 - 子线程中执行
  3. 通过doOnNext()方法,将bean中的数据保存到数据库等操作 - 子线程中执行
  4. 调度线程,主线程中更新UI
  5. 通过subscribe(),成功或者失败来更新UI

代码如下:

Disposable disposable = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
//请求网络
Request.Builder builder = new Request.Builder().url("https://music.aityp.com/banner?type=1").get();
Request request = builder.build();
Call newCall = new OkHttpClient().newCall(request);
Response response = newCall.execute();
emitter.onNext(response);
}
}).map(new Function<Response, MyClass>() {
@Override
public MyClass apply(Response response) throws Exception {
//转换数据
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
return new Gson().fromJson(responseBody.string(), MyClass.class);
}
}
return null;
}
}).doOnNext(new Consumer<MyClass>() {
@Override
public void accept(MyClass myClass) throws Exception {
System.out.println("将数据保存到数据库,保存成功 myClass = [" + Thread.currentThread().getName() + ":" + myClass.toString() + "]");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MyClass>() {
@Override
public void accept(MyClass myClass) throws Exception {
System.out.println("获取数据成功 myClass = [" + Thread.currentThread().getName() + ":" + myClass.toString() + "]");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("throwable = [" + throwable + "]");
}
});

注意:如果想让doOnNext 在主线程中执行则在doOnNext()方法前面添加.observeOn(AndroidSchedulers.mainThread()),doOnNext 就会在主线程中执行.

通过RxJava 可以将网络请求的逻辑分离的更加清晰

  • concat 聚合操作符,不交错的发射两个或多个Observable的发射物

concat

concat 操作符链接多个Observable的输出,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,也就是说只有前面的调用了onComplete才会开始后面的Observable,直到前面的Observable终止,concat才会订阅额外的一个Observable.

注意:如果去链接一个”热”Observable(这种Observable在创建后立即开始发射数据,即使没有观察者),concat将不会看到也不会发射它之前发射的任何数据

concat 使用的经典场景

使用concat操作符先读取缓存数据再通过网络请求获取数据,在实际的项目中,为了更好的用户体验,将数据更快的呈现给用户,一般都会将网络数据缓存下来,有缓存数据直接使用缓存数据,如果没有缓存数据再通过网络请求获取

下面我们模拟一下这个场景:

创建读取缓存的observable,如果有缓存解析缓存的数据,更新UI,

//查询缓存的observable
Observable<String> cacheObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String cache = CacheManage.getInstance().get("cache");
if (!TextUtils.isEmpty(cache)) {
Log.e(TAG, "subscribe: 读取缓存数据 -》" + cache);
emitter.onNext(cache);
//这里可以更具设置的缓存时长 超过了缓存的时长 请求网络 更新缓存数据 更新UI
emitter.onComplete();
} else {
Log.e(TAG, "subscribe: 没有缓存请求网络");
emitter.onComplete();
}
}
});

创建请求网络的observable,需要注意的是两个Observable的泛型必须保持一致

//请求网络的observable
Observable<String> webObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "subscribe 网络请求线程: " + Thread.currentThread().getName());
Log.e(TAG, "subscribe: 请求网络数据");
//请求网络
Request.Builder builder = new Request.Builder().url("https://music.aityp.com/banner?type=1").get();
Request request = builder.build();
Call newCall = new OkHttpClient().newCall(request);
Response response = newCall.execute();
if (response.isSuccessful()) {
if (response.body() != null) {
String string = response.body().string();
CacheManage.getInstance().put("cache", string);
emitter.onNext(string);
}
}
}
});

最后将得到的json字符串,进行解析

private class ParaseFunction implements Function<String, MyClass> {

@Override
public MyClass apply(String response) throws Exception {
Log.e(TAG, "subscribe 解析json线程: " + Thread.currentThread().getName());
Log.e(TAG, "apply: 解析json字符串");
return new Gson().fromJson(response, MyClass.class);
}
}

通过concat 合并

Observable.concat(cacheObservable, webObservable)
.map(new ParaseFunction())//解析json 字符串
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MyClass>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}

@Override
public void onNext(MyClass myClass) {
Log.e(TAG, "subscribe UI更新线程: " + Thread.currentThread().getName());
Log.e(TAG, "onNext: " + myClass.toString());
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e);
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
  • flatMap 将一个发射数据的Observable变成多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

flatMap

利用flatMap的特性,可以实现多个网络请求依次请求,比如用户没有注册,用户注册成功后自动登录的场景,比如用户没有登录进行收藏,调用登录接口,登录成功后调用自动收藏接口

代码如下:

final boolean[] isRegister = {false};
//先去注册 再去登录
Observable.just("register", "login").flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Exception {
if (!isRegister[0]) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//调用注册接口
isRegister[0] = true;
emitter.onNext("注册成功");
}
});
} else {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//调用登录接口
emitter.onNext("登录成功");
}
});
}
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

当我在注册的时候调用onComplete时订阅并没有解除,而且没有调用Observer中的onComplete方法,而是继续调用下一个Observable,当我在注册时调用onError`时就会直接结束,并没有继续调用下一个Observable.

flatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observables发射的数据,最后将合并后的结果当作它自己的数据序列发射.

注意:flatMap 对这些Observable发射的数据做的是合并操作,它们可能是交错的.

注意:如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。

还可以通过flatMap实现一次性token,先请求token在访问接口,将token请求和实际数据的请求连贯串起来.

  • zip 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

zip

zip理解起来很简单,就是将多个Observable发射的数据结合然后发射这个函数返回的结果.

zip使用的场景:实现多个接口并行请求将获取到的数据糅合在一起,共同更新UI

如下代码,只是模拟,多个请求网络的Observable,通过map将json数据转换成bean类

Observable<String> zip1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
emitter.onNext("test");
}
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
//解析json数据 返回bean
return "json:" + s;
}
});

Observable<Integer> zip2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
emitter.onNext(111);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer o) throws Exception {
System.out.println("emitter = [" + Thread.currentThread().getName() + "]");
//解析json数据 返回bean
return 111 + o;
}
});

将多个Observables并行执行,结果进行糅合

Observable.zip(zip1, zip2, new BiFunction<String, Integer, Object>() {
@Override
public Object apply(String s, Integer integer) throws Exception {
return "合并后的数据为:" + s + integer;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

通过zip操作符,可以很简单的实现并行请求网络,获取结果:

o = [合并后的数据为:json:test222]

  • zipWith 操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

zipWith

zipWith 会返回一个指定的新的类型

observable.zipWith(Observable.range(1, 1 + count), new BiFunction<Throwable, Integer, Warrp>() {
@Override
public Warrp apply(Throwable throwable, Integer integer) throws Exception {
Log.e(TAG, "apply: 合并重试次数和异常信息:" + integer);
return new Warrp(integer, throwable);
}
})
  • retryWhen 如果原始Observable遇到错误,可以重新订阅它期望它能正常终止 允许重新订阅已结束的Observable

retryWhen

retryWhen 将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable.如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止.

注意: retryWhen 默认在trampoline调度器上执行,你可以通过参数指定其它的调度器(调度器在文章最后有讲解)

retryWhen 的场景非常适合,网络请求或者其他错误重试,以及设置重试的次数.

我们通过一个例子来实现,网络请求重试的逻辑

首先设置重试的次数和等待时长

final int count = 3;//重试次数为3次
final int delay = 5000;//等待时长 重新链接的时间间隔
String cacheToken = null;

模拟网络请求,模拟第二次重试时网络请求成功

Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.e(TAG, "subscribe: 1 - 请求网络");
//模拟网络请求
if (cacheToken == null) {//模拟网络请求失败
emitter.onError(new NullPointerException());
} else {
//模拟网络请求成功
emitter.onNext("网络请求成功");//当调用next 时不会走retryWhen
}
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, 1 + count), new BiFunction<Throwable, Integer, Warrp>() {
@Override
public Warrp apply(Throwable throwable, Integer integer) throws Exception {
Log.e(TAG, "apply: 合并重试次数和异常信息:" + integer);
return new Warrp(integer, throwable);
}
}).flatMap(new Function<Warrp, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Warrp warrp) throws Exception {
//收到异常信息 且重试次数小于设置的次数
if (warrp.throwable instanceof NullPointerException && (warrp.count < count + 1)) {
Log.e(TAG, "apply: 重试次数:" + warrp.count);
Log.e(TAG, "apply: 收到错误准备重试:" + delay + " ms后开始重试");
if (warrp.count == 2) {//模拟 第二次重试 网络请求正常了
cacheToken = "1212213";
}
//重新链接 请求网络
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
return Observable.error(warrp.throwable);
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "accept: next - " + o);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "accept: error - " + throwable);
}
});

输出结果:

subscribe: 1 - 请求网络
apply: 合并重试次数和异常信息:1
apply: 重试次数:1
apply: 收到错误准备重试:5000 ms后开始重试
subscribe: 1 - 请求网络
apply: 合并重试次数和异常信息:2
apply: 重试次数:2
apply: 收到错误准备重试:5000 ms后开始重试
subscribe: 1 - 请求网络
accept: next - 请求网络

如此简单的写法就可以实现这种复杂的需求,RxJava确实是十分强大,如果我们不用RxJava实现,普通的写法会有一大堆的判断并且难以维护.

  • 常用的过滤操作符 filter/distinct/elementAt/all/contains 这些操作符使用很简单了解即可

filter 过滤小于2的数

Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {//过滤器
@Override
public boolean test(Integer integer) throws Exception {// false 当前等事件不被处理 true 当前等事件处理
return integer > 2;
}
})

distince 过滤重复元素

Observable.just(1, 2, 3, 3, 3, 4, 45, 6, 6).distinct()

elementAt 获取指定元素 位置从0开始

Observable.just(1, 2, 3, 4).elementAt(2)

All 判断事件是否满足条件 如下代码 如果全部大于2则返回true

Observable.just(4, 3, 4, 5).all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;//是否全部大于 2
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("aBoolean = [" + aBoolean + "]");
}
});

contain 判断事件是否包含 如果包含则返回true

Observable.just("abc", "a", "er").contains("a").subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("aBoolean = [" + aBoolean + "]");
}
});
  • groupBy 将事件类型 转换为 结果类型

groupBy

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
System.out.println("integer = [" + integer + "]");

//根据参数进行分类

return integer > 2 ? "A" : "B";
}
}).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
//stringIntegerGroupedObservable 是被观察者
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {// 通过观察者拿到数据
@Override
public void accept(Integer integer) throws Exception {
String key = stringIntegerGroupedObservable.getKey();
System.out.println("key--" + key + " | " + integer);

}
});
}
});

输出结果:

integer = [1]
key–B | 1
integer = [2]
key–B | 2
integer = [3]
key–A | 3
integer = [4]
key–A | 4
integer = [5]
key–A | 5

  • buffer 定期收集Observable的数据,放入一个数据包裹中,然后发射这些数据包裹而不是一次发射一个值.

buffer

使用场景
* 100000条数据插入到数据库
* 每一条数据产生都需要时间
* 如果产生一条 插入一条比较浪费时间 全部一次性插入用户等太久
* 采取buffer等形式 将100000 条 分成 一小段执行
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(List<Integer> integers) {
System.out.println("integers = [" + integers + "]");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

输出结果:123 456

注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

背压

背压主要用于并发处理,在Android中用的不多.背压就是:生产速度 与 消费速度的比值

// 背压策略
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < 10000000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onComplete");
s.request(Integer.MAX_VALUE);//最大的处理能力 最大处理数
}

@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");

}

@Override
public void onError(Throwable t) {
System.out.println("onComplete");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

线程调度

线程调度,主要的作用就是线程切换.在Android中常用的场景就是从在子线程切换到主线程

subscribeOn是指Observable(被观察者)在哪个调度器(线程)上执行,于此搭配的还有unsubscribeOn.多次指定Observable发射事件的线程只有第一次指定有效.

observerOn是指observer(观察者)在哪个调度器(线程)上执行,在Android中多用于在Android主线程中执行.多次指定Observer(观察者)的线程,下游的线程就会切换一次.

subscribeOn

RxJava 中,已经内置了很多线程选项,常用的例如有:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
  • Schedulers.newThread() 代表一个常规的新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程

至此,RxJava 基本常用的操作符已经分析完毕,RxJava不仅可以简化项目中复杂的逻辑,同时提高了代码的维护性,我们更应该学习这种设计思想,分析RxJava的源码.而不止在于使用框架上,知其然,知其所以然.才能融汇贯通成长起来.

文章作者: JakePrim
文章链接: https://jakeprim.cn/2019/05/09/rxjava-1/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 JakePrim技术研究院
打赏