在RxJS中,Observable和Observer的关系,就是前者在播放内容,后者
在收听内容。对于多播的可以理解为:一个数据流的内容被多个Observer订阅。
RxJS是?持?个Observable被多次subscribe的,
所以,RxJS?持多播,但是,表?上看到的是多播,实质上还是单播。
// 多播
const tick$ = Observable.interval(1000).pipe(take(3));
tick$.subscribe(value => console.log(‘observer 1: ‘ + value));
setTimeout(() => {
tick$.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 2000);
运行结果:
observer 1: 0
observer 1: 1
observer 2: 0
observer 1: 2
observer 2: 1
observer 2: 2
??? 为啥是这个结果 因为 interval 这个操作符产?的是?个Cold Observable对象
所谓Cold Observable,就是每次被subscribe都产??个全新的数据序列的数据流。比如 interval, range
Hot Observable的数据流都在外部, 或者来自 promise、DOM、 EventEmitter。
真正的多播,必定是?论有多少Observer来subscribe,推给Observer的
都是?样的数据源,满?这种条件的,就是Hot Observable,因为Hot
Observable中的内容创建和订阅者?关。
在函数式编程的世界中,有一个要求,是保持不可变性。(Immutable)。所以, 把Cold Observable变成Hot Observable,不是改变Cold Observable本身, 而是产生一个新的Observable来包装之前的Cold Observable。
这样,新的 Observable就成了下游,想要hot数据的observer要订阅这个新的Observable,就OK了。
那么这个产生一个新的Observable来包装之前的Cold Observable这个功能就有subject这个中间人来实现,所以subject具有以下这些功能:
import {Subject} from ‘rxjs‘;
const subject = new Subject();
subject.subscribe({
next:(v)=>console.log(`A:${v}`)
})
subject.subscribe({
next:(v)=>console.log(`B:${v}`)
})
subject.next(1);
subject.next(2);
subject.complete();
用subject实现多播和使用
const tick$ = Observable.interval(1000).take(3);
const subject = new Subject();
tick$.subscribe(subject);
subject.subscribe(value => console.log(‘observer 1: ‘ + value));
setTimeout(() => {
subject.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 1500);
makeHot操作符
Observable.prototype.makeHot = function () {
const cold$ = this;
const subject = new Subject();
cold$.subscribe(subject);
return Observable.create((observer) => subject.subscribe(observer));
}
const hotTick$ = Observable.interval(1000).take(3).makeHot();
hotTick$.subscribe(value => console.log(‘observer 1: ‘ + value));
setTimeout(() => {
hotTick$.subscribe(value => console.log(‘observer 2: ‘ + value));
}, 1500);
其他使用 Subject注意点:
// multicast操作符:多播的实现。 需要开启 multicasted.connect();
const source = from ([1,2,3]);
const subject1 = new Subject();
const multicasted = source.pipe(multicast(subject1));
multicasted.subscribe({
next:(v)=>console.log(`A:${v}`)
})
multicasted.subscribe({
next:(v)=>console.log(`B:${v}`)
})
multicasted.connect();
function shareSubjectFactory() {
return new Subject();
}
function share() {
return multicast.call(this, shareSubjectFactory).refCount();
}
function publish(selector) {
if (selector) {
return this.multicast(() => new Subject(), selector);
} else {
return this.multicast(new Subject());
}
// publish相当于封装了multicast和创建?个新Subject对象这两个动作,
// 让代码更加简洁,最终返回的是?个ConnectableObservable对象
multicast(new Subject())
原文:https://www.cnblogs.com/coppsing/p/12546326.html