首页 > Web开发 > 详细

RxJS中的多播

时间:2020-03-22 16:21:44      阅读:76      评论:0      收藏:0      [点我收藏+]

目标

  1. 什么是多播
  2. Hot和Cold 数据流差异
  3. 理解RxJS中的Subject
  4. 支持多播的操作符
  5. 了解高级多播功能

1. 什么是多播

在RxJS中,Observable和Observer的关系,就是前者在播放内容,后者
在收听内容。对于多播的可以理解为:一个数据流的内容被多个Observer订阅。

技术分享图片

RxJS是?持?个Observable被多次subscribe的,
所以,RxJS?持多播,但是,表?上看到的是多播,实质上还是单播。

// 多播
const tick$ = Observable.interval(1000).pipe(take(3));
tick$.subscribe(value => console.log(‘observer 1: ‘ + value));
setTimeout(() => {
tick$.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 2000);

运行结果:

observer 1: 0
observer 1: 1
observer 2: 0
observer 1: 2
observer 2: 1
observer 2: 2

??? 为啥是这个结果 因为 interval 这个操作符产?的是?个Cold Observable对象

2. Hot和Cold数据流差异

  1. 所谓Cold Observable,就是每次被subscribe都产??个全新的数据序列的数据流。比如 interval, range

  2. Hot Observable的数据流都在外部, 或者来自 promise、DOM、 EventEmitter。

真正的多播,必定是?论有多少Observer来subscribe,推给Observer的
都是?样的数据源,满?这种条件的,就是Hot Observable,因为Hot
Observable中的内容创建和订阅者?关。

技术分享图片

3. subject

在函数式编程的世界中,有一个要求,是保持不可变性。(Immutable)。所以, 把Cold Observable变成Hot Observable,不是改变Cold Observable本身, 而是产生一个新的Observable来包装之前的Cold Observable。
这样,新的 Observable就成了下游,想要hot数据的observer要订阅这个新的Observable,就OK了。

那么这个产生一个新的Observable来包装之前的Cold Observable这个功能就有subject这个中间人来实现,所以subject具有以下这些功能:

  1. 提供subscribe方法,让其他人能够订阅自己的数据流。 (Observable)
  2. 接受推送的数据,包括 cold Observable 推送的数据 (observer)
 import {Subject} from ‘rxjs‘;

 const subject = new Subject();

 subject.subscribe({
     next:(v)=>console.log(`A:${v}`)
 })

 subject.subscribe({
    next:(v)=>console.log(`B:${v}`)
})

subject.next(1);
subject.next(2);
subject.complete();

用subject实现多播和使用

const tick$ = Observable.interval(1000).take(3);
const subject = new Subject();
tick$.subscribe(subject);

subject.subscribe(value => console.log(‘observer 1: ‘ + value));

setTimeout(() => {
subject.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 1500);

makeHot操作符

Observable.prototype.makeHot = function () {
const cold$ = this;
const subject = new Subject();
cold$.subscribe(subject);
return Observable.create((observer) => subject.subscribe(observer));
}


const hotTick$ = Observable.interval(1000).take(3).makeHot();
hotTick$.subscribe(value => console.log(‘observer 1: ‘ + value));
setTimeout(() => {
hotTick$.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 1500);

其他使用 Subject注意点:

  1. Subject不能重复使?
  2. Subject可以有多个上游
  3. Subject的错误处理

4.支持多播的操作符

  1. multicast : 实例操作符,能够以上游的Observable为数据源产??个新的Hot Observable对象。
// multicast操作符:多播的实现。 需要开启 multicasted.connect();
const source = from ([1,2,3]);
const subject1 = new Subject();

const multicasted = source.pipe(multicast(subject1));

multicasted.subscribe({
    next:(v)=>console.log(`A:${v}`)
})
multicasted.subscribe({
    next:(v)=>console.log(`B:${v}`)
})

multicasted.connect();
  1. share
function shareSubjectFactory() {
    return new Subject();
}
function share() {
    return multicast.call(this, shareSubjectFactory).refCount();
}
  1. publish
function publish(selector) {
    if (selector) {
        return this.multicast(() => new Subject(), selector);
    } else {
        return this.multicast(new Subject());
    }
// publish相当于封装了multicast和创建?个新Subject对象这两个动作,
// 让代码更加简洁,最终返回的是?个ConnectableObservable对象
multicast(new Subject())

5.?级多播功能

  1. publishLast
  2. publishReplay
  3. publishBehavior

RxJS中的多播

原文:https://www.cnblogs.com/coppsing/p/12546326.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!