W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
什么是主題?RxJS 主題是一種特殊的 Observable 類型,它允許將值多播到許多 Observer。普通的 Observable 是單播的(每個訂閱的 Observer 擁有 Observable 的獨立執(zhí)行),而 Subject 是多播的。
主題就像一個可觀察對象,但是可以多播到許多觀察者。主題就像 EventEmitters:它們維護著許多偵聽器的注冊表。
每個主題都是可觀察的。給定一個主題,您可以 subscribe
提供一個觀察者,該觀察者將開始正常接收值。從觀察者的角度來看,它無法確定觀察到的執(zhí)行是來自純單播觀察到的還是主題。
在主題內(nèi)部,subscribe
不調(diào)用傳遞值的新執(zhí)行。它將給定的觀察者簡單地注冊到觀察者列表中,類似于 addListener
其他庫和語言中的正常工作方式。
每個主題都是觀察者。它與方法的對象 next(v)
,error(e)
和 complete()
。要將新值提供給主題,只需調(diào)用 next(theValue)
,它將被多播到注冊以監(jiān)聽主題的觀察者。
在下面的示例中,我們將兩個觀察者附加到一個主題,并將一些值提供給該主題:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
由于主題是觀察者,因此這也意味著您可以將主題作為 subscribe
任何可觀察對象的參數(shù)提供,如以下示例所示:
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
通過上面的方法,我們基本上只是通過 Subject 將單播 Observable 執(zhí)行轉(zhuǎn)換為多播。這說明了主題是如何將任何可觀察的執(zhí)行共享給多個觀察者的唯一方法。
還有的幾個特例 Subject
類型:BehaviorSubject
,ReplaySubject
,和AsyncSubject
。
“多播可觀察”通過可能有許多訂戶的主題傳遞通知,而普通的“單播可觀察”僅將通知發(fā)送給單個觀察者。
多播的 Observable在幕后使用一個 Subject來使多個 Observer看到相同的 Observable 執(zhí)行。
在幕后,這是 multicast
操作員的工作方式:觀察者訂閱了基礎(chǔ)主題,而主題訂閱了源 Observable。以下示例與之前的示例相似 observable.subscribe(subject)
:
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
multicast
返回一個看起來像普通 Observable 的 Observable,但是在訂閱時卻像 Subject 一樣工作。 multicast
返回一個 ConnectableObservable
,它只是該 connect()
方法的 Observable 。
該 connect()
方法對于準確確定共享 Observable 執(zhí)行何時開始非常重要。因為 connect()
確實 source.subscribe(subject)
引擎蓋下,connect()
返回訂閱,這可以從以取消共享可觀測執(zhí)行取消訂閱。
connect()
手動調(diào)用和處理訂閱通常很麻煩。通常,我們要在第一個觀察者到達時自動連接,并在最后一個觀察者取消訂閱時自動取消共享執(zhí)行。
請考慮以下示例,在此示例中,如列表所概述的那樣發(fā)生訂閱:
next
值 0
傳遞給第一位觀察者next
值 1
傳遞給第一位觀察者next
值 1
傳遞給第二個觀察者next
值 2
傳遞給第二個觀察者
為了實現(xiàn)對的顯式調(diào)用 connect()
,我們編寫了以下代碼:
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
如果我們希望避免顯式調(diào)用 connect()
,則可以使用 ConnectableObservable 的 refCount()
方法(引用計數(shù)),該方法返回一個 Observable,該跟蹤可跟蹤其擁有的訂戶數(shù)量。當訂戶數(shù)量從 0
增加到時 1
,它將要求 connect()
我們啟動共享執(zhí)行。只有當訂戶數(shù)量從減少 1
到 0
時,才會完全取消訂閱,從而停止進一步執(zhí)行。
refCount
使多播的 Observable 在第一個訂戶到達時自動開始執(zhí)行,并在最后一個訂戶離開時停止執(zhí)行。
下面是一個示例:
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
該 refCount()
方法僅存在于 ConnectableObservable 上,并且返回 Observable
,而不是另一個ConnectableObservable。
Subject 的變體之一是 BehaviorSubject
,其概念為“當前值”。它存儲了發(fā)給其使用者的最新值,并且每當有新的 Observer 訂閱時,它將立即從接收到“當前值” BehaviorSubject
。
BehaviorSubjects 對于表示“隨時間變化的值”很有用。例如,生日的事件流是主題,而一個人的年齡流將是 BehaviorSubject。
在以下示例中,將使用 0
第一個觀察者訂閱時收到的值初始化 BehaviorSubject 。第二個觀察者2
即使2
在發(fā)送值后訂閱了該值,也可以接收該值。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
A ReplaySubject
與 a 相似 BehaviorSubject
,它可以將舊值發(fā)送給新的訂戶,但是它也可以記錄 Observable 執(zhí)行的一部分。
A ReplaySubject
記錄了來自 Observable 執(zhí)行的多個值,并將它們重放給新的訂戶。
創(chuàng)建時 ReplaySubject
,您可以指定要重播的值:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
除了緩沖區(qū)大小以外,您還可以指定窗口時間(以毫秒為單位),以確定記錄的值可以使用多長時間。在下面的示例中,我們使用較大的緩沖區(qū)大小 100
,但是窗口時間參數(shù)僅為 500
毫秒。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject 是一個變體,其中只有 Observable 執(zhí)行的最后一個值發(fā)送到其觀察者,并且僅在執(zhí)行完成時發(fā)送。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
AsyncSubject 與 last()
運算符類似,因為它等待 complete
通知以便傳遞單個值。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: