RxJS 主題

2020-09-24 15:21 更新

什么是主題?RxJS 主題是一種特殊的 Observable 類型,它允許將值多播到許多 Observer。普通的 Observable 是單播的(每個(gè)訂閱的 Observer 擁有 Observable 的獨(dú)立執(zhí)行),而 Subject 是多播的。

主題就像一個(gè)可觀察對(duì)象,但是可以多播到許多觀察者。主題就像 EventEmitters:它們維護(hù)著許多偵聽器的注冊(cè)表。

每個(gè)主題都是可觀察的。給定一個(gè)主題,您可以 subscribe 提供一個(gè)觀察者,該觀察者將開始正常接收值。從觀察者的角度來(lái)看,它無(wú)法確定觀察到的執(zhí)行是來(lái)自純單播觀察到的還是主題。

在主題內(nèi)部,subscribe 不調(diào)用傳遞值的新執(zhí)行。它將給定的觀察者簡(jiǎn)單地注冊(cè)到觀察者列表中,類似于 addListener 其他庫(kù)和語(yǔ)言中的正常工作方式。

每個(gè)主題都是觀察者。它與方法的對(duì)象 next(v),error(e)complete()。要將新值提供給主題,只需調(diào)用 next(theValue),它將被多播到注冊(cè)以監(jiān)聽主題的觀察者。

在下面的示例中,我們將兩個(gè)觀察者附加到一個(gè)主題,并將一些值提供給該主題:

import { Subject } from 'rxjs';


const subject = new Subject<number>();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(1);
subject.next(2);


// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

由于主題是觀察者,因此這也意味著您可以將主題作為 subscribe 任何可觀察對(duì)象的參數(shù)提供,如以下示例所示:

import { Subject, from } from 'rxjs';


const subject = new Subject<number>();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


const observable = from([1, 2, 3]);


observable.subscribe(subject); // You can subscribe providing a Subject


// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

通過(guò)上面的方法,我們基本上只是通過(guò) Subject 將單播 Observable 執(zhí)行轉(zhuǎn)換為多播。這說(shuō)明了主題是如何將任何可觀察的執(zhí)行共享給多個(gè)觀察者的唯一方法。

還有的幾個(gè)特例 Subject 類型:BehaviorSubject,ReplaySubject,和AsyncSubject

組播可觀察物

“多播可觀察”通過(guò)可能有許多訂戶的主題傳遞通知,而普通的“單播可觀察”僅將通知發(fā)送給單個(gè)觀察者。

多播的 Observable在幕后使用一個(gè) Subject來(lái)使多個(gè) Observer看到相同的 Observable 執(zhí)行。

在幕后,這是 multicast 操作員的工作方式:觀察者訂閱了基礎(chǔ)主題,而主題訂閱了源 Observable。以下示例與之前的示例相似 observable.subscribe(subject)

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';


const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));


// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast 返回一個(gè)看起來(lái)像普通 Observable 的 Observable,但是在訂閱時(shí)卻像 Subject 一樣工作。 multicast返回一個(gè) ConnectableObservable,它只是該 connect()方法的 Observable 。

connect()方法對(duì)于準(zhǔn)確確定共享 Observable 執(zhí)行何時(shí)開始非常重要。因?yàn)?connect()確實(shí) source.subscribe(subject)引擎蓋下,connect()返回訂閱,這可以從以取消共享可觀測(cè)執(zhí)行取消訂閱。

參考計(jì)數(shù)

connect() 手動(dòng)調(diào)用和處理訂閱通常很麻煩。通常,我們要在第一個(gè)觀察者到達(dá)時(shí)自動(dòng)連接,并在最后一個(gè)觀察者取消訂閱時(shí)自動(dòng)取消共享執(zhí)行。

請(qǐng)考慮以下示例,在此示例中,如列表所概述的那樣發(fā)生訂閱:

  1. 第一觀察員訂閱多播的 Observable
  2. 多播的 Observable 已連接
  3. next0 傳遞給第一位觀察者
  4. 第二觀察者訂閱多播的 Observable
  5. next1 傳遞給第一位觀察者
  6. next1 傳遞給第二個(gè)觀察者
  7. 第一觀察員從多播的 Observable 退訂
  8. next2 傳遞給第二個(gè)觀察者
  9. 第二觀察者從多播的 Observable 退訂
  10. 與多播 Observable 的連接已取消訂閱

為了實(shí)現(xiàn)對(duì)的顯式調(diào)用 connect(),我們編寫了以下代碼:

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';


const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;


subscription1 = multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();


setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);


setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);


// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我們希望避免顯式調(diào)用 connect(),則可以使用 ConnectableObservable 的 refCount()方法(引用計(jì)數(shù)),該方法返回一個(gè) Observable,該跟蹤可跟蹤其擁有的訂戶數(shù)量。當(dāng)訂戶數(shù)量從 0 增加到時(shí) 1,它將要求 connect()我們啟動(dòng)共享執(zhí)行。只有當(dāng)訂戶數(shù)量從減少 10 時(shí),才會(huì)完全取消訂閱,從而停止進(jìn)一步執(zhí)行。

refCount 使多播的 Observable 在第一個(gè)訂戶到達(dá)時(shí)自動(dòng)開始執(zhí)行,并在最后一個(gè)訂戶離開時(shí)停止執(zhí)行。

下面是一個(gè)示例:

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';


const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;


// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);


setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);


// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);


// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount()方法僅存在于 ConnectableObservable 上,并且返回 Observable,而不是另一個(gè)ConnectableObservable。

行為主體

Subject 的變體之一是 BehaviorSubject,其概念為“當(dāng)前值”。它存儲(chǔ)了發(fā)給其使用者的最新值,并且每當(dāng)有新的 Observer 訂閱時(shí),它將立即從接收到“當(dāng)前值” BehaviorSubject。

BehaviorSubjects 對(duì)于表示“隨時(shí)間變化的值”很有用。例如,生日的事件流是主題,而一個(gè)人的年齡流將是 BehaviorSubject。

在以下示例中,將使用 0第一個(gè)觀察者訂閱時(shí)收到的值初始化 BehaviorSubject 。第二個(gè)觀察者2即使2在發(fā)送值后訂閱了該值,也可以接收該值。

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(3);


// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

重播主題

A ReplaySubject 與 a 相似 BehaviorSubject,它可以將舊值發(fā)送給新的訂戶,但是它也可以記錄 Observable 執(zhí)行的一部分。

A ReplaySubject 記錄了來(lái)自 Observable 執(zhí)行的多個(gè)值,并將它們重放給新的訂戶。

創(chuàng)建時(shí) ReplaySubject,您可以指定要重播的值:

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(5);


// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了緩沖區(qū)大小以外,您還可以指定窗口時(shí)間(以毫秒為單位),以確定記錄的值可以使用多長(zhǎng)時(shí)間。在下面的示例中,我們使用較大的緩沖區(qū)大小 100,但是窗口時(shí)間參數(shù)僅為 500 毫秒。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


let i = 1;
setInterval(() => subject.next(i++), 200);


setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 1000);


// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

異步主題

AsyncSubject 是一個(gè)變體,其中只有 Observable 執(zhí)行的最后一個(gè)值發(fā)送到其觀察者,并且僅在執(zhí)行完成時(shí)發(fā)送。

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(5);
subject.complete();


// Logs:
// observerA: 5
// observerB: 5

AsyncSubject 與 last()運(yùn)算符類似,因?yàn)樗却?complete通知以便傳遞單個(gè)值。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)