一、Rx介绍
1. 可以把Observable当做Iterable的推送方式的等价物。
2. Observable类型给GOF的观察者模式添加了两种缺少的语义,这样就和Iterable类型中可用的操作一致了:
有了这两种功能,Rx就能使Observable与Iterable保持一致了,唯一的不同是数据流的方向。任何对Iterable的操作,你都可以对Observable使用。
1. 在异步模型中流程更像这样的:
2. 取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个Observable来说,即使没有观察者了,它也可以在一个while循环中继续生成并尝试发射数据项。
3. Observable什么时候开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
三、Single
1. Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。
因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:
Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
四、Subject
1. Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。
由于一个Observable订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。
1. Schedulers.io(?):
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io(?)默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
2. Schedulers.trampoline(?):当其它排队的任务完成后,在当前线程排队开始执行。
3. 可以用Scheduler.Worker调度你自己的任务:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {...});
4. Worker同时是Subscription,因此你可以(通常也应该)调用它的unsubscribe方法通知可以挂起任务和释放资源了:
worker.unsubscribe();
5. 延时和周期调度器:
你可以使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务;
schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务。
1. FlatMap:可以认为是一个将嵌套的数据结构展开的过程。
2. Map:实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项。
3. Scan:
Scan
操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator(累加器)
。
4. Do:
doOnEach,doOnNext,doOnSubscribe,doOnUnsubscribe,doOnCompleted,doOnError,doOnTerminate,finallyDo;
5. ObserveOn:指定观察者观察Observable的调度程序(工作线程);
6. SubscribeOn:指定Observable应该在哪个调度程序上执行;
7. Subscribe:收到Observable发射的数据和通知后执行的操作;
8. Connect:指示一个可连接的Observable开始发射数据给订阅者;
9. Publish:将一个普通的Observable转换为可连接的;
10. RefCount:使一个可连接的Observable表现得像一个普通的Observable;
11. Replay:确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅;
12. To:将Observable或者Observable发射的数据序列转换为另一个对象或数据结构;
13. Blocking:
BlockingObservable
的方法不是将一个Observable变换为另一个,也不是过滤Observables,它们会打断Observable的调用链,会阻塞等待直到Observable发射了想要的数据,然后返回这个数据(而不是一个Observable);
forEach( ),
first( ),
last( ),
getIterator( )
14. Unsubscribe:这个方法很重要,因为在subscribe()
之后, Observable
会持有 Subscriber
的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause()
onStop()
等方法中)调用 unsubscribe()
来解除引用关系,以避免内存泄露的发生。
原文:http://www.cnblogs.com/veins/p/4917534.html