RxJS 運算符

2020-09-25 17:12 更新

盡管 Observable 是基礎(chǔ),但 RxJS 對它的運算符最有用。運算符是使復(fù)雜的異步代碼易于以聲明的方式編寫的基本組成部分。

什么是運算符?

運算符就是功能。運算符有兩種:

可管道運算符是可以使用語法通過管道傳遞給 Observables 的類型 observableInstance.pipe(operator())。這些包括filter(...)mergeMap(...)。調(diào)用時,它們不會更改現(xiàn)有的 Observable 實例。相反,它們返回一個新的 Observable,其訂閱邏輯基于第一個 Observable。

管道運算符是一個將 Observable 作為其輸入并返回另一個 Observable 的函數(shù)。這是一個純粹的操作:以前的 Observable 保持不變。

管道運算符本質(zhì)上是一個純函數(shù),它將一個 Observable 用作輸入并生成另一個 Observable 作為輸出。訂閱輸出 Observable 也將訂閱輸入 Observable。

創(chuàng)建運算符是另一種運算符,可以稱為獨立函數(shù)來創(chuàng)建新的 Observable。例如:of(1, 2, 3)創(chuàng)建一個可觀察物體,該物體將依次發(fā)射 1、2 和 3。創(chuàng)建運算符將在后面的部分中詳細討論。

例如,被調(diào)用的運算符 map 類似于同名的 Array 方法。就像 [1, 2, 3].map(x => x * x)yield一樣[1, 4, 9],Observable 創(chuàng)建如下:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';


map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));


// Logs:
// value: 1 
// value: 4
// value: 9

會散發(fā)出1,4,9。另一個有用的運算符是 first

import { of } from 'rxjs';
import { first } from 'rxjs/operators';


first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));


// Logs:
// value: 1

請注意,map邏輯上必須動態(tài)構(gòu)建,因為必須為其提供映射功能。相比之下,它 first 可能是一個常數(shù),但是仍然是動態(tài)構(gòu)建的。通常,無論是否需要參數(shù),都構(gòu)造所有運算符。

管道

Pipeable 運營商的功能,所以他們可以像使用普通的功能:op()(obs)-但在實踐中,往往是很多人一起卷積,并迅速成為不可讀:op4()(op3()(op2()(op1()(obs))))。因此,Observables 具有一種稱為的方法.pipe(),該方法可以完成相同的操作,但更易于閱讀:

obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

從風(fēng)格上講 op()(obs),即使只有一個運算符,也不要使用。obs.pipe(op())是普遍首選的。

創(chuàng)建運算符

什么是創(chuàng)作運算符?與管道運算符不同,創(chuàng)建運算符是可用于創(chuàng)建具有某些常見預(yù)定義行為或通過加入其他 Observable 的 Observable 的函數(shù)。

創(chuàng)建運算符的典型示例是 interval 函數(shù)。它以數(shù)字(不是 Observable)作為輸入?yún)?shù),并產(chǎn)生 Observable 作為輸出:

import { interval } from 'rxjs';


const observable = interval(1000 /* number of milliseconds */);

在這里查看所有靜態(tài)創(chuàng)建運算符的列表。

高階可觀察物

可觀察對象最通常發(fā)出諸如字符串和數(shù)字之類的普通值,但令人驚訝的是,經(jīng)常需要處理可觀察對象可觀察對象,即所謂的高階可觀察對象。例如,假設(shè)您有一個Observable發(fā)射字符串,這些字符串是您想要查看的文件的URL。代碼可能看起來像這樣:

const fileObservable = urlObservable.pipe(
   map(url => http.get(url)),
);

http.get()為每個單獨的 URL 返回一個 Observable(可能是字符串或字符串?dāng)?shù)組)?,F(xiàn)在您有了一個 Observables Observables,一個更高階的 Observable。

但是如何處理高階 Observable?通常,通過展:通過(以某種方式)將高階 Observable 轉(zhuǎn)換為普通 Observable。例如:

const fileObservable = urlObservable.pipe(
   map(url => http.get(url)),
);

concatAll()操作者訂閱了各“內(nèi)部”可觀察所散發(fā)出來的“外”觀察的,和復(fù)制所有所發(fā)射的值,直到該可觀察完成,并繼續(xù)到下一個。所有值都以這種方式連接在一起。其他有用的扁平化運算符(稱為連接運算符)是

  • mergeAll() —訂閱每個內(nèi)部 Observable 的到達,然后在到達時發(fā)出每個值
  • switchAll() —在第一個內(nèi)部 Observable 到達時訂閱它,并在到達時發(fā)出每個值,但是在下一個內(nèi)部Observable到達時,取消訂閱前一個,并訂閱新值。
  • exhaust() —在第一個內(nèi)部 Observable 到達時訂閱它,并在到達時發(fā)出每個值,并丟棄所有新到達的內(nèi)部Observable,直到第一個完成時,然后等待下一個內(nèi)部Observable。

正如許多陣列庫結(jié)合 map()flat()(或 flatten())成一個單一的 flatMap(),也有全部的 RxJS 映射當(dāng)量壓扁運營商 concatMap()``mergeMap()``switchMap(),和 exhaustMap()。

大理石圖

為了解釋操作員的工作方式,文字描述通常是不夠的。許多操作員都與時間有關(guān),例如,他們可能以不同的方式延遲,采樣,節(jié)流或消除反跳值。圖通常是一個更好的工具。大理石圖是操作員如何工作的直觀表示,包括輸入的 Observable,操作員及其參數(shù)以及輸出的 Observable。

在大理石圖中,時間向右流動,并且該圖描述了如何在 Observable 執(zhí)行中發(fā)出值(“大理石”)。

您可以在下面看到大理石圖的解剖圖。

img

在整個文檔站點中,我們廣泛使用大理石圖來說明操作員的工作方式。它們在其他環(huán)境中也可能確實有用,例如在白板上,甚至在我們的單元測試中(如ASCII圖)。

運營商類別

存在用于不同目的的運算符,它們可以歸類為:創(chuàng)建,轉(zhuǎn)換,過濾,聯(lián)接,多播,錯誤處理,實用程序等。在下面的列表中,您將找到按類別組織的所有運算符。

有關(guān)完整概述,請參見參考頁。

創(chuàng)建運算符

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

加入創(chuàng)作運營商

這些是 Observable 創(chuàng)建運算符,它們也具有聯(lián)接功能-發(fā)出多個源 Observable 的值。

  • combineLatest
  • concat
  • forkJoin
  • merge
  • partition
  • race
  • zip

轉(zhuǎn)型運營商

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen
  • concatMap
  • concatMapTo
  • exhaust
  • exhaustMap
  • expand
  • groupBy
  • map
  • mapTo
  • mergeMap
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • pluck
  • scan
  • switchMap
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

篩選運算符

  • audit
  • auditTime
  • debounce
  • debounceTime
  • distinct
  • distinctKey
  • distinctUntilChanged
  • distinctUntilKeyChanged
  • elementAt
  • filter
  • first
  • ignoreElements
  • last
  • sample
  • sampleTime
  • single
  • skip
  • skipLast
  • skipUntil
  • skipWhile
  • take
  • takeLast
  • takeUntil
  • takeWhile
  • throttle
  • throttleTime

加盟運營商

另請參見上面的“ 加入創(chuàng)建運算符”部分。

  • combineAll
  • concatAll
  • exhaust
  • mergeAll
  • startWith
  • withLatestFrom

組播運營商

  • multicast
  • publish
  • publishBehavior
  • publishLast
  • publishReplay
  • share

錯誤處理運算符

  • catchError
  • retry
  • retryWhen

公用事業(yè)運營商

  • tap
  • delay
  • delayWhen
  • dematerialize
  • materialize
  • observeOn
  • subscribeOn
  • timeInterval
  • timestamp
  • timeout
  • timeoutWith
  • toArray

條件運算符和布爾運算符

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

數(shù)學(xué)運算符和聚合運算符

  • count
  • max
  • min
  • reduce

創(chuàng)建自定義運算符

使用該 pipe()函數(shù)創(chuàng)建新運算符

如果您的代碼中有一個常用的運算符序列,請使用該 pipe()函數(shù)將該序列提取到新的運算符中。即使序列不是那么常見,將其分解為單個運算符也可以提高可讀性。

例如,您可以創(chuàng)建一個將奇數(shù)值丟棄并且將偶數(shù)值加倍的函數(shù),如下所示:

import { pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';


function discardOddDoubleEven() {
  return pipe(
    filter(v => ! (v % 2)),
    map(v => v + v),
  );
}

(該 pipe()功能與.pipe()Observable上的方法類似,但不相同。)

從頭開始創(chuàng)建新的運算符

它更復(fù)雜,但是如果您必須編寫不能由現(xiàn)有運算符的組合構(gòu)成的運算符(這種情況很少發(fā)生),則可以使用 Observable 構(gòu)造函數(shù)從頭開始編寫運算符,如下所示:

import { Observable } from 'rxjs';


function delay(delayInMillis) {
  return (observable) => new Observable(observer => {
    // this function will called each time this
    // Observable is subscribed to.
    const allTimerIDs = new Set();
    const subscription = observable.subscribe({
      next(value) {
        const timerID = setTimeout(() => {
          observer.next(value);
          allTimerIDs.delete(timerID);
        }, delayInMillis);
        allTimerIDs.add(timerID);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      }
    });
    // the return value is the teardown function,
    // which will be invoked when the new
    // Observable is unsubscribed from.
    return () => {
      subscription.unsubscribe();
      allTimerIDs.forEach(timerID => {
        clearTimeout(timerID);
      });
    }
  });
}

請注意,您必須

  1. 實現(xiàn)所有三個觀察功能 next(),error()以及 complete()訂閱輸入可觀察的時候。
  2. 實現(xiàn)“刪除”功能,該功能在 Observable 完成時清除(在這種情況下,通過取消訂閱并清除所有待處理的超時)。
  3. 從傳遞給 Observable 構(gòu)造函數(shù)的函數(shù)中返回該拆解函數(shù)。

當(dāng)然,這僅是示例。該delay()運營商已經(jīng)存在。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號