RxJava 2.x 使用详解(三) 过滤操作符

2017-06-28  |   Android   android, reactive-streams, rxjava

回到顶部

前序

  上篇说到RxJava创建操作符,这一节来说说过滤操作符,还记得上节中说到interval操作符如果没有其他限制的话就会无限发送onNext事件,永远也不会调用onComplete事件,如果需要限制的话,可以使用一些过滤操作符进行限制。

filter

  先说一个基本的过滤操作符filter,可以自己设定任意的规则来过滤数据,比如过滤出大于等于2的元素:

Flowable.just(1, 2, 3)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                //过滤出>=2的数据
                return integer >= 2;
            }
        })
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

  上面的结果只会输出2和3,因为boolean test(T t)方法返回true则表示元素数据有效,否则则为无效抛弃。

take

  如果需要使用类似request(long)的方法来限制发射数据的数量,可以使用take操作符:

Flowable.interval(1, TimeUnit.SECONDS)
        .take(5)//只发射5个元素
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

  take操作符可以采用时间过滤,例如过滤出5秒之内发送的数据:

Flowable.interval(1, TimeUnit.SECONDS)
        .take(5, TimeUnit.SECONDS)//5秒之内的数据(这里输出0,1,2,3)
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

takeLast

  如果要筛选出最后几个元素的话可以使用takeLast操作符,比如选取最后3个元素:

Flowable.just(1, 2, 3, 4, 5)
        .takeLast(3)
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

  也可以通过时间来筛选,比如筛选出最后三秒的数据:

Flowable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS)
        .takeLast(3, TimeUnit.SECONDS)//最后三秒发送的数据
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

  另外使用takeLast操作符筛选时间,可以增加delayError参数(不传默认为false)takeLast(3, TimeUnit.SECONDS, true)来延迟筛选过程中接收到的error。

  使用takeLast要特别注意,首先元素数量是可数的,由于takeLast使用的是buffer,所以过滤后的数据会一次性发送(而不是按照例如intervalRange设定的方式发送),当然这里可以指定takeLast使用的bufferSize。

firstElement / lastElement

  如果需要选取第一个元素(允许为空),可以使用firstElement操作符:

Flowable.just(1,2,3)
        .firstElement()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  同理,如果要选取最后一个元素(允许为空),可以使用lastElement操作符:

Flowable.just(1, 2, 3)
        .lastElement()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

first / last

  如果要设置一个默认值(当被观察者不发射任何元素的时候)可以使用first操作符:

Flowable.empty()                                             
        .first(2)//这里的2是默认元素,非第二个元素                          
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述的代码将会输入“2”这个元素。

  同理如果要设置lastElement为空时发送的元素默认值,可以使用last操作符:

Flowable.empty()
        .last(3)//这里的3是默认发射元素,并非第三个元素
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述的代码将会输入“3”这个元素。

firstOrError / lastOrError

  上述说到的firstElement操作符,如果为空元素的时候不会发生任何异常,如果你需要在空的时候抛出异常,可以使用firstOrError操作符:

Flowable.empty()
        .firstOrError()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码将会抛出异常。

  上述说到的lastElement操作符也是遇到空元素也是不会发生任何的异常,如果你需要在空的时候抛出异常,可以使用lastOrError操作符:

Flowable.empty()
        .lastOrError()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码将会抛出异常。

elementAt / elementAtOrError

  如果需要指定发射第几个元素(注意这里的参数为索引值),可以使用elementAt操作符(支持越界)

Flowable.just("a","b","c")
        .elementAt(2)//指定索引为2的元素,如果不存在则直接完成
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  如果需要设置越界后发送的默认元素,可以添加额外默认值参数:

Flowable.just("a","b","c")
        .elementAt(4,"d")//指定索引为4的元素,如果不存在则发射“d”
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  如果希望越界后抛出异常,可以使用elementAtOrError操作符:

Flowable.just("a","b","c")
        .elementAtOrError(3)//指定索引值为3的元素,如果不存在则抛出异常
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

ofType

  假设你需要筛选特定类型的数据,可以采用ofType操作符:

Flowable.just("a", 1, 3, "b")
        .ofType(Integer.class)
        .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

  上述代码只会输出1、3,其他元素被抛弃。

skip / skipLast

  如果需要跳过若干个元素,或者跳过一段时间,可以使用skip或者skipLast操作符。下面是跳过若干个元素的例子:

Flowable.just("a","b","c")
        .skip(1)
        .skipLast(1)
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  假设需要跳过一段时间,可以使用重载方法:

Flowable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS)
        .skip(3, TimeUnit.SECONDS)
        .skipLast(3, TimeUnit.SECONDS)
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

ignoreElements

  如果你不关心发送的元素,只关心onComplete和onError事件,可以使用ignoreElements操作符,他会把当前被观察者转换成Completable类型的被观察者:

Flowable.just("a","b","c")
        .ignoreElements()
        .subscribe(() -> Log.i("tag", "complete"));

distinct / distinctUntilChanged

  如果想过滤掉重复的元素,可以使用distinct操作符:

Flowable.just("a", "b", "c", "b", "b", "c")
        .distinct()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码只会输出a、b、c三个元素。

  如果只需要过滤连续重复的元素,则可以使用distinctUntilChanged操作符:

Flowable.just("a", "b", "c", "b", "b", "c")
        .distinctUntilChanged()
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码会输出a、b、c、b、c这几个元素。

timeout

  如果需要过滤超时操作,可以使用timeout操作符:

Flowable.intervalRange(0, 10, 0, 2, TimeUnit.SECONDS)
        .timeout(1, TimeUnit.SECONDS)
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码输出0后超时,抛出异常。

Flowable.intervalRange(0, 10, 0, 2, TimeUnit.SECONDS)
        .timeout(1, TimeUnit.SECONDS,  Flowable.just(-1L))
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码先输出0,然后超时,使用自定义的Flowable输出-1。

throttleFirst

  如果你需要在一段时间内只响应第一次的操作,比如说一段时间内连续点击按钮只执行第一次的点击操作,throttleFirst操作符就可以满足这个需求,使用例子如下:

Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
        .throttleFirst(1, TimeUnit.SECONDS)//每1秒中只处理第一个元素
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述结果为0、2、4、6、8,其中1、3、5、7、9都被过滤了。

throttleLast / sample

  除了throttleFirst,有对应的throttleLast操作符,它的功能和sample操作符相同,都是隔一段时间采集一个元素:

Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
        .throttleLast(2, TimeUnit.SECONDS)//每2秒中采集最后一个元素
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

//等价于
Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
        .sample(2, TimeUnit.SECONDS)//每2秒中采集最后一个元素
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述代码只会输出1、3、5、7,之后会直接触发onComplete事件。

throttleWithTimeout / debounce

  假设有一种即时显示搜索结果需求,要求一段时间用户不输入才响应请求搜索结果,这样的需求可以使用throttleWithTimeout操作符或者debounce操作符,举个例子:

Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
        .throttleWithTimeout(2, TimeUnit.SECONDS)//2秒内有新数据则抛弃旧数据
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));
//等价于
Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
        .debounce(2, TimeUnit.SECONDS)//2秒内有新数据则抛弃旧数据
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));

  上述例子中只会输出9这个元素,因为没当接收到一个元素的时候,会等待2秒,如果有新的元素发送,则抛弃旧的元素,使用新的元素,直到2秒过去或者没有新的数据(比如onComplete)。

尾声

  终于把过滤的操作符基本地说了一遍,其实每个操作符可能还有很多重载的形式,大家感兴趣的可以自己逐个试试,由于本人生活和工作上都越来越繁忙,所以以后的更新速度可能会放慢下来了。

相关文章

  RxJava 2.x 使用详解(一) 快速入门: https://maxwell-nc.github.io/android/rxjava2-1.html

  RxJava 2.x 使用详解(二) 创建操作符: https://maxwell-nc.github.io/android/rxjava2-2.html

  RxJava 2.x 使用详解(三) 过滤操作符: https://maxwell-nc.github.io/android/rxjava2-3.html

  RxJava 2.x 使用详解(四) 合并聚合操作符: https://maxwell-nc.github.io/android/rxjava2-4.html

  RxJava 2.x 使用详解(五) 条件操作符: https://maxwell-nc.github.io/android/rxjava2-5.html

  RxJava 2.x 使用详解(六) 变换操作符: https://maxwell-nc.github.io/android/rxjava2-6.html



原创文章,欢迎转载,请保留出处。有任何错误、疑问或者建议,欢迎指出。
请注明文章出自于:https://maxwell-nc.github.io/android/rxjava2-3.html

上一篇:RxJava 2.x 使用详解(二) 创建操作符
下一篇:RxJava 2.x 使用详解(四) 合并聚合操作符