RxJava 2.x 使用详解(一) 快速入门

2017-06-06  |   Android   android, reactive-streams, rxjava

回到顶部

前序

  最近打算写一篇完整的RxJava/RxAndroid 2.x使用详解,一方面网上没有找到比较完整的教程,完整的教程又不是面向基础读者,或者还停留在RxJava 1.x的版本中,一方面自己可以当做一个学习笔记,什么时候忘记了也可以快速查一下。

  本文就不再讨论Rxjava 1.x和2.x两个版本有什么区别,关于这个可以参考官方wiki,可能现在还看不懂,没关系,可以直接学习2.x,我认为新的迟早要普及,旧版项目也会迁移到新的上。

  另外本文尽量不使用Lambda表达式,方便读者理解。

概念

  首先知道什么是RxJava,Rx是ReactiveX的缩写,而ReactiveX是Reactive Extensions的缩写。RxJava顾名思义即使Java上的异步和基于事件响应式编程库。

  RxJava基于观察者模式,主要有四个部分:观察者、被观察者、订阅、事件。这样说很难说的明白,我们马上举一个例子来说明。

Hello RxJava

  首先根据上面说的,首先需要一个观察者,可以通过创建FlowableSubscriber,由于事件处理的数据类型不一样,FlowableSubscriber需要一个泛型来说明这个数据类型,这里假设我们要处理String类型数据,代码如下:

//创建观察者
FlowableSubscriber<String> subscriber = new FlowableSubscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        //订阅时候的操作
        s.request(Long.MAX_VALUE);//请求多少事件,这里表示不限制
    }
    @Override
    public void onNext(String s) {
        //onNext事件处理
        Log.i("tag", s);
    }
    @Override
    public void onError(Throwable t) {
        //onError事件处理
    }
    @Override
    public void onComplete() {
        //onComplete事件处理
    }
};

  可以看到观察者身上有onNext、onError、onComplete这几个事件,接下来我们需要一个被观察者。RxJava中需要使用create()方法去创建:

//被观察者
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
        //订阅观察者时的操作
        e.onNext("test1");
        e.onNext("test2");
        e.onComplete();
    }
}, BackpressureStrategy.BUFFER);

  这里先忽略BackpressureStrategy是什么东西,后面会说到,被观察者执行各种操作,最后需要通过subscribe()订阅观察者:

flowable.subscribe(subscriber);

  值得注意的是一个被观察者可以订阅多个观察者。然后尝试运行上述代码,发现LogCat顺序输出"test1"和"test2",可以知道onNext是顺序执行的,一个正常的事件队列情况应该如下:

Actions

  上面可以看到FlowableSubscriber中我们只关心onNext方法,其他方法如果我们我们不需要,那么可以用Consumer(Java 8读者注意导包)来作为观察者:

flowable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) {
        //相当于onNext事件处理
        Log.i("Consumer", s);
    }
});

  当我们查看subscribe()的重载的时候会发现,如果你关心有多个重载方法,其中有一组用于简化观察者的:

subscribe(onNext)   //即上面的例子
subscribe(onNext,onError)
subscribe(onNext,onError,onComplete)
subscribe(onNext,onError,onComplete,onSubscribe)

  这里给出一个完整参数的例子,其他重载方法参考即可:

flowable.subscribe(
        new Consumer<String>() {//相当于onNext
            @Override
            public void accept(String s) throws Exception {
            }
        }, new Consumer<Throwable>() {//相当于onError
            @Override
            public void accept(Throwable throwable) throws Exception {
            }
        }, new Action() {//相当于onComplete,注意这里是Action
            @Override
            public void run() throws Exception {
            }
        }, new Consumer<Subscription>() {//相当于onSubscribe
            @Override
            public void accept(Subscription subscription) throws Exception {
            }
        });

  上面出现了一个Action,那么这个Action和Consumer是什么关系呢?实际上,这两个都是属于Actions

  可以看到onComplete方法在FlowableSubscriber中属于无参数方法,而其他属于单一参数方法,自然对应使用Action和Consumer。这时候你可能会产生一个疑问,假如有2个参数呢?3个参数呢?...这里就需要使用:

  这里暂时没有使用到,后面用到再说明,就不再展开讨论了。

Observable和Observer

  被观察者除了Flowable以外还有Observable,它的使用方法和Flowable大体相似,也是使用create()创建:

//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        //订阅观察者时的操作
        e.onNext("test1");
        e.onNext("test2");
        e.onComplete();
    }
});//没有背压设置

  由于Observable不支持订阅Subscriber观察者,需要使用Observer作为观察者,其实现方式和Subscriber大体相似:

//观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //订阅时候的操作,无需request
    }
    @Override
    public void onNext(String s) {
        //onNext事件处理
        Log.i("observer", s);
    }
    @Override
    public void onError(Throwable e) {
        //onError事件处理
    }
    @Override
    public void onComplete() {
        //onComplete事件处理
    }
};

  订阅操作也是通过subscribe()来操作:

observable.subscribe(observer);

  也可以通过上文提到的Actions来简化观察者,其写法和Flowable的subscribe方法是完全一样的,这里就不再展开讨论了。

Observable和Flowable

  下面我们观察下上面两个例子,发现Observable和Flowable其中前者不需要背压(BackPressure)参数和请求资源(request)操作,其他都是大体相似的,那么他们两个有什么区别呢?分别什么时候用呢?

  这两者区别十分明显,Observable不支持背压,而Flowable支持背压(背压是什么?后面再说,先明白区别)。关键是什么时候用呢,下面根据官方的建议:

使用Observable - 不超过1000个元素、随着时间流逝基本不会出现OOM - GUI事件或者1000Hz频率以下的元素 - 平台不支持Java Steam(Java8新特性) - Observable开销比Flowable低

使用Flowable - 超过10k+的元素(可以知道上限) - 读取硬盘操作(可以指定读取多少行) - 通过JDBC读取数据库 - 网络(流)IO操作

BackPressure(背压)

  了解了Observable和Flowable的区别,我们还不知什么叫做背压,下面我们来简单了解下概念。所谓背压就是生产者(被观察者)的生产速度大于消费者(观察者)消费速度从而导致的问题。

  举一个简单点的例子,如果被观察者快速发送消息,但是观察者处理消息的很缓慢,如果没有特定的流(Flow)控制,就会导致大量消息积压占用系统资源,最终导致十分缓慢。

  怎么优化和减少这种情况后面再探讨,不过可以注意到,Flowable创建的时候已经设置了BackpressureStrategy,而且Subscriber使用了request来控制最大的流量。

Single和SingleObserver

  如果你使用一个单一连续事件流,即只有一个onNext事件,接着就触发onComplete或者onError,这样你可以使用Single。

  Single只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError,它只发送一次消息(当然就不存在背压问题),其中Single类似于Observable,其代码如下:

//被观察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> e) throws Exception {
        e.onSuccess("test");
        e.onSuccess("test2");//错误写法,重复调用也不会处理
    }
});

//订阅观察者SingleObserver
single.subscribe(new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {
        //相当于onNext和onComplete
    }

    @Override
    public void onError(Throwable e) {

    }
});

  Single也可以使用Actions来简化Observer,还记得BiConsumer这个双参数的Actions吗,正可以用于Single:

//订阅观察者BiConsumer
single.subscribe(new BiConsumer<String, Throwable>() {
    @Override
    public void accept(String s, Throwable throwable) throws Exception {
        //onSuccess和onError操作都在这里处理
    }
});

  当然也有单一Actions的模式:

  这里就不在展开了。另外Single也可以直接转换成Flowable或者Observable:

single.toFlowable();
single.toObservable();

  其实他还以转成接下来要说的Completable等,也是使用toXxx的方法。转换之后就可以使用后者独有的方法,这里先不说,后面会有单独篇章详细说明。

Completable和CompletableObserver

  如果你的观察者连onNext事件都不关心,你可以使用Completable,他只有onComplete和onError两个事件:

Completable.create(new CompletableOnSubscribe() {//被观察者

    @Override
    public void subscribe(CompletableEmitter e) throws Exception {
        e.onComplete();//单一onComplete或者onError
    }

}).subscribe(new CompletableObserver() {//观察者
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onComplete() {

    }

    @Override
    public void onError(Throwable e) {

    }
});

  同样也可以使用Actions来简化Observer:

  要转换成其他类型的被观察者,也是可以使用toFlowable()toObservable()等方法去转换。

Maybe和MaybeObserver

  如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。

  Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):

  可以看到onSuccess和onComplete是互斥的存在,例子代码如下:

//被观察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> e) throws Exception {
        e.onSuccess("test");//发送一个数据的情况,或者onError,不需要再调用onComplete(调用了也不会触发onComplete回调方法)
        //e.onComplete();//不需要发送数据的情况,或者onError
    }
});

//订阅观察者
maybe.subscribe(new MaybeObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {
        //发送一个数据时,相当于onNext和onComplete,但不会触发另一个方法onComplete
        Log.i("tag", s);
    }

    @Override
    public void onComplete() {
        //无数据发送时候的onComplete事件
        Log.i("tag", "onComplete");
    }

    @Override
    public void onError(Throwable e) {

    }

});

  同样可以是用Actions来简化Observer:

  要转换成其他类型的被观察者,也是可以使用toFlowable()toObservable()等方法去转换。

尾声

  通过上述文章,我们初步了解了RxJava的观察者和被观察者,利用Actions来简化观察者,各种观察者的区别和转换。

  但是我们可以看到现阶段的被观察者即使只发送一个onComplete事件也需要一大段的代码,即使使用lambda表达式也显得有点臃肿,下一篇会介绍RxJava的操作符,帮助大家简化日常常用操作,比如通过一个列表创建一个被观察者,这里的话先留下一个悬念了。

相关文章

  RxJava 2.x 使用详解(一) 快速入门: https://maxwell-nc.github.io/android/rxjava2-1.html

  RxJava 2.x 使用详解(二) 创建操作符: https://maxwell-nc.github.io/android/rxjava2-2.html

  RxJava 2.x 使用详解(三) 过滤操作符: https://maxwell-nc.github.io/android/rxjava2-3.html

  RxJava 2.x 使用详解(四) 合并聚合操作符: https://maxwell-nc.github.io/android/rxjava2-4.html

  RxJava 2.x 使用详解(五) 条件操作符: https://maxwell-nc.github.io/android/rxjava2-5.html

  RxJava 2.x 使用详解(六) 变换操作符: https://maxwell-nc.github.io/android/rxjava2-6.html



原创文章,欢迎转载,请保留出处。有任何错误、疑问或者建议,欢迎指出。
请注明文章出自于:https://maxwell-nc.github.io/android/rxjava2-1.html

上一篇:Android中的Lambda表达式详解
下一篇:RxJava 2.x 使用详解(二) 创建操作符