2017-06-06 | Android android, reactive-streams, rxjava
最近打算写一篇完整的RxJava/RxAndroid 2.x使用详解,一方面网上没有找到比较完整的教程,完整的教程又不是面向基础读者,或者还停留在RxJava 1.x的版本中,一方面自己可以当做一个学习笔记,什么时候忘记了也可以快速查一下。
本文就不再讨论Rxjava 1.x和2.x两个版本有什么区别,关于这个可以参考官方wiki,可能现在还看不懂,没关系,可以直接学习2.x,我认为新的迟早要普及,旧版项目也会迁移到新的上。
另外本文尽量不使用Lambda表达式,方便读者理解。
首先知道什么是RxJava,Rx是ReactiveX的缩写,而ReactiveX是Reactive Extensions的缩写。RxJava顾名思义即使Java上的异步和基于事件响应式编程库。
RxJava基于观察者模式,主要有四个部分:观察者、被观察者、订阅、事件。这样说很难说的明白,我们马上举一个例子来说明。
首先根据上面说的,首先需要一个观察者,可以通过创建FlowableSubscriber,由于事件处理的数据类型不一样,FlowableSubscriber需要一个泛型来说明这个数据类型,这里假设我们要处理String类型数据,代码如下:
//创建观察者
FlowableSubscriber<String> subscriber = new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//订阅时候的操作
s.request(Long.MAX_VALUE);//请求多少事件,这里表示不限制
}
@Override
public void onNext(String s) {
//onNext事件处理
Log.i("tag", s);
}
@Override
public void onError(Throwable t) {
//onError事件处理
}
@Override
public void onComplete() {
//onComplete事件处理
}
};
可以看到观察者身上有onNext、onError、onComplete这几个事件,接下来我们需要一个被观察者。RxJava中需要使用create()
方法去创建:
//被观察者
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
//订阅观察者时的操作
e.onNext("test1");
e.onNext("test2");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
这里先忽略BackpressureStrategy是什么东西,后面会说到,被观察者执行各种操作,最后需要通过subscribe()
订阅观察者:
flowable.subscribe(subscriber);
值得注意的是一个被观察者可以订阅多个观察者。然后尝试运行上述代码,发现LogCat顺序输出"test1"和"test2",可以知道onNext是顺序执行的,一个正常的事件队列情况应该如下:
上面可以看到FlowableSubscriber中我们只关心onNext方法,其他方法如果我们我们不需要,那么可以用Consumer(Java 8读者注意导包)来作为观察者:
flowable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
//相当于onNext事件处理
Log.i("Consumer", s);
}
});
当我们查看subscribe()
的重载的时候会发现,如果你关心有多个重载方法,其中有一组用于简化观察者的:
subscribe(onNext) //即上面的例子
subscribe(onNext,onError)
subscribe(onNext,onError,onComplete)
subscribe(onNext,onError,onComplete,onSubscribe)
这里给出一个完整参数的例子,其他重载方法参考即可:
flowable.subscribe(
new Consumer<String>() {//相当于onNext
@Override
public void accept(String s) throws Exception {
}
}, new Consumer<Throwable>() {//相当于onError
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {//相当于onComplete,注意这里是Action
@Override
public void run() throws Exception {
}
}, new Consumer<Subscription>() {//相当于onSubscribe
@Override
public void accept(Subscription subscription) throws Exception {
}
});
上面出现了一个Action,那么这个Action和Consumer是什么关系呢?实际上,这两个都是属于Actions:
可以看到onComplete方法在FlowableSubscriber中属于无参数方法,而其他属于单一参数方法,自然对应使用Action和Consumer。这时候你可能会产生一个疑问,假如有2个参数呢?3个参数呢?...这里就需要使用:
这里暂时没有使用到,后面用到再说明,就不再展开讨论了。
被观察者除了Flowable以外还有Observable,它的使用方法和Flowable大体相似,也是使用create()
创建:
//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//订阅观察者时的操作
e.onNext("test1");
e.onNext("test2");
e.onComplete();
}
});//没有背压设置
由于Observable不支持订阅Subscriber观察者,需要使用Observer作为观察者,其实现方式和Subscriber大体相似:
//观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//订阅时候的操作,无需request
}
@Override
public void onNext(String s) {
//onNext事件处理
Log.i("observer", s);
}
@Override
public void onError(Throwable e) {
//onError事件处理
}
@Override
public void onComplete() {
//onComplete事件处理
}
};
订阅操作也是通过subscribe()
来操作:
observable.subscribe(observer);
也可以通过上文提到的Actions来简化观察者,其写法和Flowable的subscribe方法是完全一样的,这里就不再展开讨论了。
下面我们观察下上面两个例子,发现Observable和Flowable其中前者不需要背压(BackPressure)参数和请求资源(request)操作,其他都是大体相似的,那么他们两个有什么区别呢?分别什么时候用呢?
这两者区别十分明显,Observable不支持背压,而Flowable支持背压(背压是什么?后面再说,先明白区别)。关键是什么时候用呢,下面根据官方的建议:
使用Observable - 不超过1000个元素、随着时间流逝基本不会出现OOM - GUI事件或者1000Hz频率以下的元素 - 平台不支持Java Steam(Java8新特性) - Observable开销比Flowable低
使用Flowable - 超过10k+的元素(可以知道上限) - 读取硬盘操作(可以指定读取多少行) - 通过JDBC读取数据库 - 网络(流)IO操作
了解了Observable和Flowable的区别,我们还不知什么叫做背压,下面我们来简单了解下概念。所谓背压就是生产者(被观察者)的生产速度大于消费者(观察者)消费速度从而导致的问题。
举一个简单点的例子,如果被观察者快速发送消息,但是观察者处理消息的很缓慢,如果没有特定的流(Flow)控制,就会导致大量消息积压占用系统资源,最终导致十分缓慢。
怎么优化和减少这种情况后面再探讨,不过可以注意到,Flowable创建的时候已经设置了BackpressureStrategy,而且Subscriber使用了request来控制最大的流量。
如果你使用一个单一连续事件流,即只有一个onNext事件,接着就触发onComplete或者onError,这样你可以使用Single。
Single只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError,它只发送一次消息(当然就不存在背压问题),其中Single类似于Observable,其代码如下:
//被观察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
e.onSuccess("test2");//错误写法,重复调用也不会处理
}
});
//订阅观察者SingleObserver
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//相当于onNext和onComplete
}
@Override
public void onError(Throwable e) {
}
});
Single也可以使用Actions来简化Observer,还记得BiConsumer这个双参数的Actions吗,正可以用于Single:
//订阅观察者BiConsumer
single.subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) throws Exception {
//onSuccess和onError操作都在这里处理
}
});
当然也有单一Actions的模式:
这里就不在展开了。另外Single也可以直接转换成Flowable或者Observable:
single.toFlowable();
single.toObservable();
其实他还以转成接下来要说的Completable等,也是使用toXxx的方法。转换之后就可以使用后者独有的方法,这里先不说,后面会有单独篇章详细说明。
如果你的观察者连onNext事件都不关心,你可以使用Completable,他只有onComplete和onError两个事件:
Completable.create(new CompletableOnSubscribe() {//被观察者
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();//单一onComplete或者onError
}
}).subscribe(new CompletableObserver() {//观察者
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
同样也可以使用Actions来简化Observer:
要转换成其他类型的被观察者,也是可以使用toFlowable()
、toObservable()
等方法去转换。
如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。
Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):
可以看到onSuccess和onComplete是互斥的存在,例子代码如下:
//被观察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("test");//发送一个数据的情况,或者onError,不需要再调用onComplete(调用了也不会触发onComplete回调方法)
//e.onComplete();//不需要发送数据的情况,或者onError
}
});
//订阅观察者
maybe.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//发送一个数据时,相当于onNext和onComplete,但不会触发另一个方法onComplete
Log.i("tag", s);
}
@Override
public void onComplete() {
//无数据发送时候的onComplete事件
Log.i("tag", "onComplete");
}
@Override
public void onError(Throwable e) {
}
});
同样可以是用Actions来简化Observer:
要转换成其他类型的被观察者,也是可以使用toFlowable()
、toObservable()
等方法去转换。
通过上述文章,我们初步了解了RxJava的观察者和被观察者,利用Actions来简化观察者,各种观察者的区别和转换。
但是我们可以看到现阶段的被观察者即使只发送一个onComplete事件也需要一大段的代码,即使使用lambda表达式也显得有点臃肿,下一篇会介绍RxJava的操作符,帮助大家简化日常常用操作,比如通过一个列表创建一个被观察者,这里的话先留下一个悬念了。
RxJava 2.x 使用详解(一) 快速入门: https://maxwell-nc.github.io/android/rxjava2-1.html
RxJava 2.x 使用详解(二) 创建操作符: https://maxwell-nc.github.io/android/rxjava2-2.html
RxJava 2.x 使用详解(三) 过滤操作符: https://maxwell-nc.github.io/android/rxjava2-3.html
RxJava 2.x 使用详解(四) 合并聚合操作符: https://maxwell-nc.github.io/android/rxjava2-4.html
RxJava 2.x 使用详解(五) 条件操作符: https://maxwell-nc.github.io/android/rxjava2-5.html
RxJava 2.x 使用详解(六) 变换操作符: https://maxwell-nc.github.io/android/rxjava2-6.html
原创文章,欢迎转载,请保留出处。有任何错误、疑问或者建议,欢迎指出。
请注明文章出自于:https://maxwell-nc.github.io/android/rxjava2-1.html