W3Cschool
恭喜您成為首批注冊(cè)用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
什么是主題?RxJS 主題是一種特殊的 Observable 類型,它允許將值多播到許多 Observer。普通的 Observable 是單播的(每個(gè)訂閱的 Observer 擁有 Observable 的獨(dú)立執(zhí)行),而 Subject 是多播的。
主題就像一個(gè)可觀察對(duì)象,但是可以多播到許多觀察者。主題就像 EventEmitters:它們維護(hù)著許多偵聽器的注冊(cè)表。
每個(gè)主題都是可觀察的。給定一個(gè)主題,您可以 subscribe
提供一個(gè)觀察者,該觀察者將開始正常接收值。從觀察者的角度來(lái)看,它無(wú)法確定觀察到的執(zhí)行是來(lái)自純單播觀察到的還是主題。
在主題內(nèi)部,subscribe
不調(diào)用傳遞值的新執(zhí)行。它將給定的觀察者簡(jiǎn)單地注冊(cè)到觀察者列表中,類似于 addListener
其他庫(kù)和語(yǔ)言中的正常工作方式。
每個(gè)主題都是觀察者。它與方法的對(duì)象 next(v)
,error(e)
和 complete()
。要將新值提供給主題,只需調(diào)用 next(theValue)
,它將被多播到注冊(cè)以監(jiān)聽主題的觀察者。
在下面的示例中,我們將兩個(gè)觀察者附加到一個(gè)主題,并將一些值提供給該主題:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
由于主題是觀察者,因此這也意味著您可以將主題作為 subscribe
任何可觀察對(duì)象的參數(shù)提供,如以下示例所示:
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
通過(guò)上面的方法,我們基本上只是通過(guò) Subject 將單播 Observable 執(zhí)行轉(zhuǎn)換為多播。這說(shuō)明了主題是如何將任何可觀察的執(zhí)行共享給多個(gè)觀察者的唯一方法。
還有的幾個(gè)特例 Subject
類型:BehaviorSubject
,ReplaySubject
,和AsyncSubject
。
“多播可觀察”通過(guò)可能有許多訂戶的主題傳遞通知,而普通的“單播可觀察”僅將通知發(fā)送給單個(gè)觀察者。
多播的 Observable在幕后使用一個(gè) Subject來(lái)使多個(gè) Observer看到相同的 Observable 執(zhí)行。
在幕后,這是 multicast
操作員的工作方式:觀察者訂閱了基礎(chǔ)主題,而主題訂閱了源 Observable。以下示例與之前的示例相似 observable.subscribe(subject)
:
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
multicast
返回一個(gè)看起來(lái)像普通 Observable 的 Observable,但是在訂閱時(shí)卻像 Subject 一樣工作。 multicast
返回一個(gè) ConnectableObservable
,它只是該 connect()
方法的 Observable 。
該 connect()
方法對(duì)于準(zhǔn)確確定共享 Observable 執(zhí)行何時(shí)開始非常重要。因?yàn)?connect()
確實(shí) source.subscribe(subject)
引擎蓋下,connect()
返回訂閱,這可以從以取消共享可觀測(cè)執(zhí)行取消訂閱。
connect()
手動(dòng)調(diào)用和處理訂閱通常很麻煩。通常,我們要在第一個(gè)觀察者到達(dá)時(shí)自動(dòng)連接,并在最后一個(gè)觀察者取消訂閱時(shí)自動(dòng)取消共享執(zhí)行。
請(qǐng)考慮以下示例,在此示例中,如列表所概述的那樣發(fā)生訂閱:
next
值 0
傳遞給第一位觀察者next
值 1
傳遞給第一位觀察者next
值 1
傳遞給第二個(gè)觀察者next
值 2
傳遞給第二個(gè)觀察者
為了實(shí)現(xiàn)對(duì)的顯式調(diào)用 connect()
,我們編寫了以下代碼:
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
如果我們希望避免顯式調(diào)用 connect()
,則可以使用 ConnectableObservable 的 refCount()
方法(引用計(jì)數(shù)),該方法返回一個(gè) Observable,該跟蹤可跟蹤其擁有的訂戶數(shù)量。當(dāng)訂戶數(shù)量從 0
增加到時(shí) 1
,它將要求 connect()
我們啟動(dòng)共享執(zhí)行。只有當(dāng)訂戶數(shù)量從減少 1
到 0
時(shí),才會(huì)完全取消訂閱,從而停止進(jìn)一步執(zhí)行。
refCount
使多播的 Observable 在第一個(gè)訂戶到達(dá)時(shí)自動(dòng)開始執(zhí)行,并在最后一個(gè)訂戶離開時(shí)停止執(zhí)行。
下面是一個(gè)示例:
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
該 refCount()
方法僅存在于 ConnectableObservable 上,并且返回 Observable
,而不是另一個(gè)ConnectableObservable。
Subject 的變體之一是 BehaviorSubject
,其概念為“當(dāng)前值”。它存儲(chǔ)了發(fā)給其使用者的最新值,并且每當(dāng)有新的 Observer 訂閱時(shí),它將立即從接收到“當(dāng)前值” BehaviorSubject
。
BehaviorSubjects 對(duì)于表示“隨時(shí)間變化的值”很有用。例如,生日的事件流是主題,而一個(gè)人的年齡流將是 BehaviorSubject。
在以下示例中,將使用 0
第一個(gè)觀察者訂閱時(shí)收到的值初始化 BehaviorSubject 。第二個(gè)觀察者2
即使2
在發(fā)送值后訂閱了該值,也可以接收該值。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
A ReplaySubject
與 a 相似 BehaviorSubject
,它可以將舊值發(fā)送給新的訂戶,但是它也可以記錄 Observable 執(zhí)行的一部分。
A ReplaySubject
記錄了來(lái)自 Observable 執(zhí)行的多個(gè)值,并將它們重放給新的訂戶。
創(chuàng)建時(shí) ReplaySubject
,您可以指定要重播的值:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
除了緩沖區(qū)大小以外,您還可以指定窗口時(shí)間(以毫秒為單位),以確定記錄的值可以使用多長(zhǎng)時(shí)間。在下面的示例中,我們使用較大的緩沖區(qū)大小 100
,但是窗口時(shí)間參數(shù)僅為 500
毫秒。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject 是一個(gè)變體,其中只有 Observable 執(zhí)行的最后一個(gè)值發(fā)送到其觀察者,并且僅在執(zhí)行完成時(shí)發(fā)送。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
AsyncSubject 與 last()
運(yùn)算符類似,因?yàn)樗却?complete
通知以便傳遞單個(gè)值。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: