RxJava操作符(一)Creating Observables
发布于 2015-12-11    911 次阅读
RxJava只是ReactiveX(Reactive Extensions)的一种java实现, ReactiveX是一种响应式扩展框架,有很多种实现,如RxAndroid, RxJS, RxSwift, RxRuby等等。RX采用一种类似于观察者的形式来实现各种功能,跟我们一般的写代码思路差别较大。刚开始接触可能会觉得难以理解,但是一旦掌握地话就会体会到其强大之处。其原理就是创建一个Observ...

RxJava只是ReactiveX(Reactive Extensions)的一种java实现, ReactiveX是一种响应式扩展框架,有很多种实现,如RxAndroid, RxJS, RxSwift, RxRuby等等。RX采用一种类似于观察者的形式来实现各种功能,跟我们一般的写代码思路差别较大。刚开始接触可能会觉得难以理解,但是一旦掌握地话就会体会到其强大之处。其原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样把你想要处理的数据一步一步地加工成你想要的成品然后发射(emit)给Subscriber。

RxAndroid是对RxJava在Android上的扩展,如果你是做安卓开发的话,各种主线程和子线程的操作肯定会让你觉得头疼,RxAndroid可以很容易地解决你的这种困扰。为了方便测试和编译,本文的demo程序都是基于RxAdroid来实现的。

RX的强大就在其丰富的操作符,所以要灵活地使用RX的话就必须要掌握这些操作符,让我们首先来看一下如何创建Observable的操作符。

一、Create

Create是最基本的创建Observable的操作符,其原理图如下所示(本文中的原理图都使用了官网的图片)
create

创建一个Observable最重要的就是要和合适的时机调用Subscriber的onNext/onComplete/onError方法。onNext就是发射处理好的数据给Subscriber; onComplete用来告诉Subscriber所有的数据都已发射完毕;onError是在发生错误的时候发射一个Throwable对象给Subscriber。需要注意的一点就是Observable必须调用所有的Subscriber的onComplete方法并且只能调用一次,出错的时候调用onError方法也是一样的,并且一旦调用后就不能调用Subscriber的任何其他方法了。下面是Create操作符的使用:

private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    for (int i = 0; i < 5; i++) {
                        int temp = new Random().nextInt(10);
                        if (temp > 8) {
                            //if value>8, we make an error
                            subscriber.onError(new Throwable("value >8"));
                            break;
                        } else {
                            subscriber.onNext(temp);
                        }
                        // on error,complete the job
                        if (i == 4) {
                            subscriber.onCompleted();
                        }
                    }
                }
            }
        });
    }

在这个方法里,我们创建并返回了个Observable,这个Observable会产生5个小于10的随机数并且依次发射出去,如果随机数大于8,我们就认为是一个Error。下面是我们对这个Observable的使用:

mLButton.setOnClickListener(e -> createObserver().subscribe(new Subscriber<Integer>() {
           @Override
           public void onCompleted() {
               log("onComplete!");
           }

           @Override
           public void onError(Throwable e) {
               log("onError:" + e.getMessage());
           }

           @Override
           public void onNext(Integer integer) {
               log("onNext:" + integer);
           }
       }));
   }

当点击button的时候,我们就会建立一个Subscriber对象并将其注册给创建的Observable对象,然后接收其发射来的数据。测试的时候共点击了两次,第一次顺利发射完了5个数据,第二次在发射了2个数据后产生了错误。运行结果如下:

二、Range

Range操作符根据出入的初始值n和数目m发射一系列大于等于n的m个值

其使用也非常方便,仅仅制定初始值和数目就可以了,不用自己去实现对Subscriber的调用

private Observable<Integer> rangeObserver() {
        return Observable.range(10, 5);
}

对其订阅:

mRButton.setOnClickListener(e -> rangeObserver().subscribe(i -> log(i)));

运行结果输出了10-14的5个数:

三、Defer、Just

Defer操作符只有当有Subscriber来订阅的时候才会创建一个新的Observable对象,也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的,其特点我们将在下面和Just进行对比理解。

Just操作符将某个对象转化为Observable对象,并且将其发射出去,可以使一个数字、一个字符串、数组、Iterate对象等,是一种非常快捷的创建Observable对象的方法,在以后的例子里会大量使用。

下面我们来分别使用defer和just创建一个Observable,来返回当前的毫秒数

private Observable<Long> DeferObserver() {
        return Observable.defer(() -> Observable.just(System.currentTimeMillis()));
    }
    
private Observable<Long> JustObserver() {
        return Observable.just(System.currentTimeMillis());
    }

分别对其订阅:

mLButton.setOnClickListener(e -> deferObservable.subscribe(time -> log("defer:" + time)));
mRButton.setOnClickListener(e -> justObservable.subscribe(time -> log("just:" + time)));

好了,来看一下运行结果吧,可以看到使用Defer操作符创建Observable对象每次调用我们都可以得到最新的的当前时间,而使用just只会返回同一个时间。

四、From

From操作符用来将某个对象转化为Observable对象,并且依次将其内容发射出去。这个类似于just,但是just会将这个对象整个发射出去。比如说一个含有10个数字的数组,使用from就会发射10次,每次发射一个数字,而使用just会发射一次来将整个的数组发射出去。

使用from创建两个Observable对象,来源分别是一个数组和list

private Observable<Integer> FromArray() {
    return Observable.from(arrays);
}

private Observable<Integer> FromIterable() {
    return Observable.from(list);
}

进行订阅

mLButton.setOnClickListener(e -> FromArray().subscribe(i -> log("FromArray:" + i)));
mRButton.setOnClickListener(e -> FromIterable().subscribe(i -> log("FromIterable:" + i)));

运行结果如下,可以看到数组和list中的数据被依次地发射出来。

五、Interval

Interval所创建的Observable对象会从0开始,每隔固定的时间发射一个数字。需要注意的是这个对象是运行在computation Scheduler,所以如果需要在view中显示结果,要在主线程中订阅。

使用interval创建一个Observable对象,其间隔为1秒钟。

private Observable<Long> interval() {
    return Observable.interval(1, TimeUnit.SECONDS)
    //interva operates by default on the computation Scheduler,so observe on main Thread
    observeOn(AndroidSchedulers.mainThread());
}

进行订阅和反订阅,反订阅后将不会再收到Observable发射来的数据。

Observable<Long> observable = interval();
Subscriber<Long> subscriber = new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        log("onCompleted" );
    }

    @Override
    public void onError(Throwable e) {
        log("onError:" + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
        log("interval:" + aLong);
    }

};
mLButton.setText("Interval");
mRButton.setText("UnSubsCribe");
mLButton.setOnClickListener(e -> observable.subscribe(subscriber));
mRButton.setOnClickListener(e -> subscriber.unsubscribe());

运行结果

六、Repeat、Timer

Repeat会将一个Observable对象重复发射,我们可以指定其发射的次数

Timer会在指定时间后发射一个数字0,注意其也是运行在computation Scheduler

分别使用Repeat和Timer创建一个Observable对象:

private Observable<Integer> repeatObserver() {
    return Observable.just(1).repeat(5);
}

private Observable<Long> timerObserver() {
    //timer by default operates on the computation Scheduler
    return Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread());
}

进行订阅:

mLButton.setOnClickListener(e -> repeatObserver().subscribe(i -> log("repeat:" + i)));
mRButton.setOnClickListener(e -> timerObserver().subscribe(i -> log("timer:" + i)));

运行结果如下,可以看到Repeat创建的对象发射了5个1,Timer创建的对象在1秒钟后发射了一个0。

创建操作符还有Nerver/Empty/Throw等,非常简单但是我感觉可能用到的机会不多,就不细说了。

本文的demo程序见github

版权说明 : 本文为转载文章, 版权为原作者所有

原文标题 : RxJava操作符(一)Creating Observables

原文连接 : http://mushuichuan.com/2015/12/11/rxjava-operator-1/