RxJS 主題

2020-09-24 15:21 更新

什么是主題?RxJS 主題是一種特殊的 Observable 類型,它允許將值多播到許多 Observer。普通的 Observable 是單播的(每個訂閱的 Observer 擁有 Observable 的獨立執(zhí)行),而 Subject 是多播的。

主題就像一個可觀察對象,但是可以多播到許多觀察者。主題就像 EventEmitters:它們維護著許多偵聽器的注冊表。

每個主題都是可觀察的。給定一個主題,您可以 subscribe 提供一個觀察者,該觀察者將開始正常接收值。從觀察者的角度來看,它無法確定觀察到的執(zhí)行是來自純單播觀察到的還是主題。

在主題內(nèi)部,subscribe 不調(diào)用傳遞值的新執(zhí)行。它將給定的觀察者簡單地注冊到觀察者列表中,類似于 addListener 其他庫和語言中的正常工作方式。

每個主題都是觀察者。它與方法的對象 next(v),error(e)complete()。要將新值提供給主題,只需調(diào)用 next(theValue),它將被多播到注冊以監(jiān)聽主題的觀察者。

在下面的示例中,我們將兩個觀察者附加到一個主題,并將一些值提供給該主題:

import { Subject } from 'rxjs';


const subject = new Subject<number>();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(1);
subject.next(2);


// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

由于主題是觀察者,因此這也意味著您可以將主題作為 subscribe 任何可觀察對象的參數(shù)提供,如以下示例所示:

import { Subject, from } from 'rxjs';


const subject = new Subject<number>();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


const observable = from([1, 2, 3]);


observable.subscribe(subject); // You can subscribe providing a Subject


// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

通過上面的方法,我們基本上只是通過 Subject 將單播 Observable 執(zhí)行轉(zhuǎn)換為多播。這說明了主題是如何將任何可觀察的執(zhí)行共享給多個觀察者的唯一方法。

還有的幾個特例 Subject 類型:BehaviorSubject,ReplaySubject,和AsyncSubject。

組播可觀察物

“多播可觀察”通過可能有許多訂戶的主題傳遞通知,而普通的“單播可觀察”僅將通知發(fā)送給單個觀察者。

多播的 Observable在幕后使用一個 Subject來使多個 Observer看到相同的 Observable 執(zhí)行。

在幕后,這是 multicast 操作員的工作方式:觀察者訂閱了基礎(chǔ)主題,而主題訂閱了源 Observable。以下示例與之前的示例相似 observable.subscribe(subject)

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';


const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));


// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast 返回一個看起來像普通 Observable 的 Observable,但是在訂閱時卻像 Subject 一樣工作。 multicast返回一個 ConnectableObservable,它只是該 connect()方法的 Observable 。

connect()方法對于準確確定共享 Observable 執(zhí)行何時開始非常重要。因為 connect()確實 source.subscribe(subject)引擎蓋下,connect()返回訂閱,這可以從以取消共享可觀測執(zhí)行取消訂閱。

參考計數(shù)

connect() 手動調(diào)用和處理訂閱通常很麻煩。通常,我們要在第一個觀察者到達時自動連接,并在最后一個觀察者取消訂閱時自動取消共享執(zhí)行。

請考慮以下示例,在此示例中,如列表所概述的那樣發(fā)生訂閱:

  1. 第一觀察員訂閱多播的 Observable
  2. 多播的 Observable 已連接
  3. next0 傳遞給第一位觀察者
  4. 第二觀察者訂閱多播的 Observable
  5. next1 傳遞給第一位觀察者
  6. next1 傳遞給第二個觀察者
  7. 第一觀察員從多播的 Observable 退訂
  8. next2 傳遞給第二個觀察者
  9. 第二觀察者從多播的 Observable 退訂
  10. 與多播 Observable 的連接已取消訂閱

為了實現(xiàn)對的顯式調(diào)用 connect(),我們編寫了以下代碼:

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';


const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;


subscription1 = multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();


setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);


setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);


// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我們希望避免顯式調(diào)用 connect(),則可以使用 ConnectableObservable 的 refCount()方法(引用計數(shù)),該方法返回一個 Observable,該跟蹤可跟蹤其擁有的訂戶數(shù)量。當訂戶數(shù)量從 0 增加到時 1,它將要求 connect()我們啟動共享執(zhí)行。只有當訂戶數(shù)量從減少 10 時,才會完全取消訂閱,從而停止進一步執(zhí)行。

refCount 使多播的 Observable 在第一個訂戶到達時自動開始執(zhí)行,并在最后一個訂戶離開時停止執(zhí)行。

下面是一個示例:

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';


const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;


// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);


setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);


// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);


// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount()方法僅存在于 ConnectableObservable 上,并且返回 Observable,而不是另一個ConnectableObservable。

行為主體

Subject 的變體之一是 BehaviorSubject,其概念為“當前值”。它存儲了發(fā)給其使用者的最新值,并且每當有新的 Observer 訂閱時,它將立即從接收到“當前值” BehaviorSubject

BehaviorSubjects 對于表示“隨時間變化的值”很有用。例如,生日的事件流是主題,而一個人的年齡流將是 BehaviorSubject。

在以下示例中,將使用 0第一個觀察者訂閱時收到的值初始化 BehaviorSubject 。第二個觀察者2即使2在發(fā)送值后訂閱了該值,也可以接收該值。

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(3);


// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

重播主題

A ReplaySubject 與 a 相似 BehaviorSubject,它可以將舊值發(fā)送給新的訂戶,但是它也可以記錄 Observable 執(zhí)行的一部分。

A ReplaySubject 記錄了來自 Observable 執(zhí)行的多個值,并將它們重放給新的訂戶。

創(chuàng)建時 ReplaySubject,您可以指定要重播的值:

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(5);


// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了緩沖區(qū)大小以外,您還可以指定窗口時間(以毫秒為單位),以確定記錄的值可以使用多長時間。在下面的示例中,我們使用較大的緩沖區(qū)大小 100,但是窗口時間參數(shù)僅為 500 毫秒。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


let i = 1;
setInterval(() => subject.next(i++), 200);


setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 1000);


// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

異步主題

AsyncSubject 是一個變體,其中只有 Observable 執(zhí)行的最后一個值發(fā)送到其觀察者,并且僅在執(zhí)行完成時發(fā)送。

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();


subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});


subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);


subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});


subject.next(5);
subject.complete();


// Logs:
// observerA: 5
// observerB: 5

AsyncSubject 與 last()運算符類似,因為它等待 complete通知以便傳遞單個值。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號