ReactiveX
一个使用可观察数据流进行异步编程的编程接口
Rx = Observables + LINQ + Schedulers
观察者模式 + 迭代器模式 + 函数式编程
operation
线程 scheduler
subject
Rx
模型
- 流程
1、定义一个方法,它完成某些任务,然后从异步调用中返回一个值,这个方法是观察者的一部分 2、将这个异步调用本身定义为一个Observable 3、观察者通过订阅(Subscribe)操作关联到那个Observable 4、继续你的业务逻辑,等方法返回时,Observable会发射结果,观察者的方法会开始处理结果或结果集 // defines, but does not invoke, the Subscriber's onNext handler // (in this example, the observer is very simple and has only an onNext handler) def myOnNext = { it -> do something useful with it }; // defines, but does not invoke, the Observable def myObservable = someObservable(itsParameters); // subscribes the Subscriber to the Observable, and invokes the Observable myObservable.subscribe(myOnNext); // go on about my business - 订阅 Subscribe()
将观察者连接到Observable
- 回调观察者方法
- onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
- onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
- onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
- onNext(T item)
- 取消订阅 Unsubscribe())
- 发射数据序列的时机
一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)
一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在一些ReactiveX实现里,还存在一种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。