RxJS 可觀察對象

2020-09-25 17:17 更新

可觀察值是多個(gè)值的惰性 Push 集合。它們填補(bǔ)了下表中的缺失點(diǎn):

Function Iterator
Promise Observable

例。以下是可觀察到的該按壓值 12,3 立即(同步)時(shí)所訂閱,并且該值 4 在一秒之后自從訂閱呼叫通過,則完成:

import { Observable } from 'rxjs';


const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

要調(diào)用 Observable 并查看這些值,我們需要訂閱它:

import { Observable } from 'rxjs';


const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});


console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

可以在控制臺(tái)上這樣執(zhí)行:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

拉與推

PullPush 是兩種不同的協(xié)議,它們描述了數(shù)據(jù)生產(chǎn)者如何與數(shù)據(jù)使用者通信。

什么是拉力?在拉式系統(tǒng)中,使用者確定何時(shí)從數(shù)據(jù)生產(chǎn)者接收數(shù)據(jù)。生產(chǎn)者本身并不知道何時(shí)將數(shù)據(jù)傳遞給消費(fèi)者。

每個(gè) JavaScript 函數(shù)都是一個(gè) Pull 系統(tǒng)。該函數(shù)是數(shù)據(jù)的生產(chǎn)者,并且調(diào)用該函數(shù)的代碼通過從調(diào)用中“拉出” 單個(gè)返回值來使用它。

ES2015 引入了生成器函數(shù)和迭代器function*),這是另一種Pull系統(tǒng)。調(diào)用的代碼 iterator.next()是使用者,從迭代器(生產(chǎn)者)中“抽出” 多個(gè)值。

制片人 消費(fèi)者
被動(dòng):在需要時(shí)產(chǎn)生數(shù)據(jù)。 有效:決定何時(shí)請求數(shù)據(jù)。
活動(dòng):按自己的節(jié)奏生成數(shù)據(jù)。 被動(dòng):對收到的數(shù)據(jù)做出反應(yīng)。

什么是推?在推送系統(tǒng)中,生產(chǎn)者確定何時(shí)將數(shù)據(jù)發(fā)送給消費(fèi)者。消費(fèi)者不知道何時(shí)接收該數(shù)據(jù)。

承諾是當(dāng)今 JavaScript 中最常見的 Push 系統(tǒng)類型。Promise(生產(chǎn)者)將已解析的值傳遞給已注冊的回調(diào)(消費(fèi)者),但與函數(shù)不同的是,Promise 負(fù)責(zé)精確確定何時(shí)將該值“推送”到回調(diào)中。

RxJS 引入了 Observables,這是一個(gè)用于 JavaScript 的新 Push 系統(tǒng)。一個(gè) Observable 是多個(gè)值的生產(chǎn)者,將它們“推送”到觀察者(消費(fèi)者)。

  • 函數(shù)是一個(gè)懶惰地評估計(jì)算該同步返回上調(diào)用的單個(gè)值。
  • 發(fā)生器是一個(gè)懶惰地評估計(jì)算該同步返回零至(潛在地)無窮大的值上迭代。
  • 一個(gè)承諾是一個(gè)計(jì)算可能(也可能不會(huì))最終返回一個(gè)值。
  • 一個(gè)可觀察是懶洋洋地評估計(jì)算,并可以同步或異步從它開始被調(diào)用時(shí)返回零(潛在的)無限值。

可觀測值作為功能的概括

與流行的說法相反,Observables 不像 EventEmitters,也不像是多個(gè)值的 Promises。在某些情況下,即使用 RxJS 主題對可觀察對象進(jìn)行多播時(shí),可觀察對象的行為可能類似于 EventEmitters,但通常它們并不類似于 EventEmitters。

可觀察值就像帶有零參數(shù)的函數(shù),但是將其概括化以允許多個(gè)值。

考慮以下:

function foo() {
  console.log('Hello');
  return 42;
}


const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);

我們希望將其視為輸出:

"Hello"
42
"Hello"
42

您可以使用 Observables 編寫與上面相同的行為:

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
});


foo.subscribe(x => {
  console.log(x);
});
foo.subscribe(y => {
  console.log(y);
});

和輸出是相同的:

"Hello"
42
"Hello"
42

發(fā)生這種情況是因?yàn)楹瘮?shù)和 Observables 都是惰性計(jì)算。如果不調(diào)用該函數(shù),console.log('Hello')則不會(huì)發(fā)生。同樣在Observables 中,如果您不使用(用 subscribe)“調(diào)用”它,console.log('Hello')則不會(huì)發(fā)生。另外,“調(diào)用”或“訂閱”是一個(gè)獨(dú)立的操作:兩個(gè)函數(shù)調(diào)用觸發(fā)兩個(gè)單獨(dú)的副作用,兩個(gè) Observable 訂閱觸發(fā)兩個(gè)單獨(dú)的副作用。與 EventEmitter 具有共同的副作用并且無論訂閱者的存在如何都渴望執(zhí)行,與之相反,Observables 沒有共享的執(zhí)行并且很懶。

訂閱 Observable 類似于調(diào)用 Function。

有人聲稱 Observable 是異步的。那是不對的。如果在函數(shù)調(diào)用中包含日志,如下所示:

console.log('before');
console.log(foo.call());
console.log('after');

您將看到輸出:

"before"
"Hello"
42
"after"

這與 Observables 相同:

console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

輸出為:

"before"
"Hello"
42
"after"

證明訂閱 foo 就像功能一樣完全同步。

可觀察對象能夠同步或異步傳遞值。

一個(gè) Observable 和一個(gè)函數(shù)有什么區(qū)別?可觀察對象可以隨著時(shí)間的推移“返回”多個(gè)值,而某些功能則不能。您不能這樣做:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

函數(shù)只能返回一個(gè)值。但是,可觀察對象可以做到這一點(diǎn):

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100); // "return" another value
  subscriber.next(200); // "return" yet another
});


console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

帶有同步輸出:

"before"
"Hello"
42
100
200
"after"

但是您也可以異步“返回”值:

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});


console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

輸出:

"before"
"Hello"
42
100
200
"after"
300

結(jié)論:

  • func.call()意思是“ 同步給我一個(gè)價(jià)值
  • observable.subscribe()意思是“ 給我同步或異步提供任意數(shù)量的值

可觀察的解剖

觀測量創(chuàng)建使用 new Observable 或產(chǎn)生算符,被訂閱為與觀察,執(zhí)行交付 next/ error/ complete通知給觀察者,和它們的執(zhí)行可能布置。這四個(gè)方面都在 Observable 實(shí)例中編碼,但是其中一些方面與其他類型(例如 Observer 和 Subscription)有關(guān)。

可觀察的核心問題:

  • 創(chuàng)建可觀察物
  • 訂閱可觀察物
  • 執(zhí)行可觀察的
  • 處置可觀察物

創(chuàng)建可觀察物

Observable構(gòu)造函數(shù)有一個(gè)參數(shù):subscribe 功能。

下面的示例創(chuàng)建一個(gè) Observable,以 'hi' 每秒將字符串發(fā)送給訂閱者。

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi')
  }, 1000);
});

可以使用創(chuàng)建可觀察對象 new Observable。最常見的是,使用創(chuàng)建的功能,如創(chuàng)建觀測of,frominterval,等。

在上面的示例中,subscribe 函數(shù)是描述 Observable 的最重要部分。讓我們看看訂閱的含義。

訂閱可觀察物

observable可以訂閱示例中的 Observable ,如下所示:

observable.subscribe(x => console.log(x));

這不是一個(gè)巧合 observable.subscribe,并 subscribenew Observable(function subscribe(subscriber) {...})具有相同的名稱。在庫中,它們是不同的,但是出于實(shí)際目的,您可以在概念上將它們視為相等。

這顯示了如何 subscribe在同一 Observable 的多個(gè) Observer 之間不共享調(diào)用。當(dāng) observable.subscribe 使用 Observer 進(jìn)行調(diào)用時(shí),subscribein 中的功能 new Observable(function subscribe(subscriber) {...})針對該給定的訂戶運(yùn)行。每次呼叫都會(huì) observable.subscribe 觸發(fā)給定用戶的獨(dú)立設(shè)置。

訂閱 Observable 就像調(diào)用一個(gè)函數(shù),提供將數(shù)據(jù)傳遞到的回調(diào)。

addEventListener/之類的事件處理程序 API 完全不同 removeEventListener。使用 observable.subscribe,給定的 Observer 不會(huì)在 Observable 中注冊為偵聽器。Observable 甚至不維護(hù)附加的 Observers 的列表。

一個(gè) subscribe 呼叫是簡單地啟動(dòng)一個(gè)“可觀察執(zhí)行”,并提供價(jià)值或事件到執(zhí)行的觀測方式。

執(zhí)行可觀察物

里面的代碼 new Observable(function subscribe(subscriber) {...})表示“可觀察的執(zhí)行”,這是一種惰性計(jì)算,僅對每個(gè)訂閱的觀察者發(fā)生。執(zhí)行會(huì)隨時(shí)間同步或異步產(chǎn)生多個(gè)值。

可觀察的執(zhí)行可以提供三種類型的值:

  • “下一個(gè)”通知:發(fā)送一個(gè)值,例如數(shù)字,字符串,對象等。
  • “錯(cuò)誤”通知:發(fā)送 JavaScript 錯(cuò)誤或異常。
  • “完成”通知:不發(fā)送值。

“下一個(gè)”通知是最重要和最常見的類型:它們表示正在傳遞給訂戶的實(shí)際數(shù)據(jù)?!板e(cuò)誤”和“完成”通知在可觀察的執(zhí)行期間只能發(fā)生一次,并且只能有一個(gè)。

這些約束最好以正則表達(dá)式形式寫在所謂的“ 可觀察語法”或“ 契約”中

next*(error|complete)?

在可觀察的執(zhí)行中,可能會(huì)傳遞零到無限的 Next 通知。如果傳遞了錯(cuò)誤或完成通知,則此后將無法傳遞其他任何東西。

以下是 Observable 執(zhí)行的示例,該示例傳遞三個(gè)Next 通知,然后完成:

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 嚴(yán)格遵守 Observable Contract,因此以下代碼不會(huì)傳遞 Next 通知4

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered because it would violate the contract
});

一個(gè)好主意是subscribetry/ catch塊包裝任何代碼,如果捕獲到異常,它們將傳遞錯(cuò)誤通知:

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if it caught one
  }
});

處置可觀察的執(zhí)行

因?yàn)榭捎^察的執(zhí)行可能是無限的,并且觀察者想在有限的時(shí)間內(nèi)中止執(zhí)行是很常見的,所以我們需要一個(gè) API 來取消執(zhí)行。由于每次執(zhí)行僅對一個(gè)觀察者專有,因此一旦觀察者完成接收值,它就必須有一種停止執(zhí)行的方式,以避免浪費(fèi)計(jì)算能力或內(nèi)存資源。

當(dāng) observable.subscribe 被調(diào)用時(shí),觀察員被連接到新創(chuàng)建的可觀察的執(zhí)行。此調(diào)用還返回一個(gè)對象 Subscription

const subscription = observable.subscribe(x => console.log(x));

訂閱代表正在進(jìn)行的執(zhí)行,并且具有最小的 API,可讓您取消該執(zhí)行。在 Subscription此處 閱讀有關(guān)類型的更多信息。隨著 subscription.unsubscribe()您可以取消正在進(jìn)行的執(zhí)行:

import { from } from 'rxjs';


const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

訂閱后,您將獲得一個(gè) Subscription,代表正在進(jìn)行的執(zhí)行。只需調(diào)用 unsubscribe()即可取消執(zhí)行。

當(dāng)我們使用創(chuàng)建 Observable 時(shí),每個(gè) Observable 必須定義如何處置該執(zhí)行的資源 create()。您可以通過 unsubscribe 從中返回自定義函數(shù)來實(shí)現(xiàn) function subscribe()。

例如,這就是我們清除帶有的間隔執(zhí)行集的方式 setInterval

const observable = new Observable(function subscribe(subscriber) {
  // Keep track of the interval resource
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);


  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});

就像相似 observable.subscribe一樣 new Observable(function subscribe() {...}),unsubscribesubscribe概念上我們返回的等于 subscription.unsubscribe。實(shí)際上,如果刪除圍繞這些概念的 ReactiveX 類型,則會(huì)剩下相當(dāng)簡單的 JavaScript。

function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);


  return function unsubscribe() {
    clearInterval(intervalId);
  };
}


const unsubscribe = subscribe({next: (x) => console.log(x)});


// Later:
unsubscribe(); // dispose the resources

我們使用諸如 Observable,Observer 和 Subscription 之類的 Rx 類型的原因是為了獲得安全性(例如 Observable Contract)和與操作員的可組合性。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號