RxJS排程器

2020-09-25 16:18 更新

什么是調(diào)度程序?調(diào)度程序控制何時(shí)開始訂閱以及何時(shí)傳遞通知。它由三個(gè)部分組成。

  • 調(diào)度程序是一種數(shù)據(jù)結(jié)構(gòu)。它知道如何根據(jù)優(yōu)先級(jí)或其他條件存儲(chǔ)和排隊(duì)任務(wù)。
  • 調(diào)度程序是一個(gè)執(zhí)行上下文。它表示執(zhí)行任務(wù)的位置和時(shí)間(例如,立即執(zhí)行或在其他回調(diào)機(jī)制中執(zhí)行,例如 setTimeout 或 process.nextTick 或動(dòng)畫幀)。
  • 調(diào)度程序具有(虛擬)時(shí)鐘。它通過(guò) now()調(diào)度程序上的 getter 方法提供“時(shí)間”的概念。在特定的調(diào)度程序上調(diào)度的任務(wù)將僅遵守該時(shí)鐘指示的時(shí)間。

調(diào)度程序使您可以定義 Observable 在哪個(gè)執(zhí)行上下文中將通知傳遞給其 Observer。

在下面的例子中,我們采取了一貫的簡(jiǎn)約可觀察到發(fā)射值 1,2,3同步,并使用運(yùn)營(yíng)商 observeOn指定的 async調(diào)度用于提供這些值。

在 Stackblitz 上查看

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';


const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(
  observeOn(asyncScheduler)
);


console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
});
console.log('just after subscribe');

使用輸出執(zhí)行:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

請(qǐng)注意,之后的通知 got value...是如何傳遞的 just after subscribe,這與到目前為止我們看到的默認(rèn)行為不同。這是因?yàn)樵谧罱K觀察者 observeOn(asyncScheduler)之間引入了代理 new Observable觀察者。讓我們重命名一些標(biāo)識(shí)符,以使該區(qū)別在示例代碼中顯而易見:

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';


var observable = new Observable((proxyObserver) => {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
}).pipe(
  observeOn(asyncScheduler)
);


var finalObserver = {
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
};


console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver中創(chuàng)建 observeOn(asyncScheduler),其 next(val)功能大致如下:

const proxyObserver = {
  next(val) {
    asyncScheduler.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },


  // ...
}

async 計(jì)劃與經(jīng)營(yíng) setTimeout或者 setInterval,即使給定 delay為零。像往常一樣,在 JavaScript 中,setTimeout(fn, 0)已知 fn最早在下一個(gè)事件循環(huán)迭代時(shí)運(yùn)行該函數(shù)。這解釋了為什么 got value 1交付到finalObserver后來(lái) just after subscribe發(fā)生。

schedule()調(diào)度程序的方法帶有一個(gè) delay參數(shù),該參數(shù)指的是相對(duì)于調(diào)度程序自身內(nèi)部時(shí)鐘的時(shí)間量。調(diào)度程序的時(shí)鐘與實(shí)際的掛鐘時(shí)間沒(méi)有任何關(guān)系。像這樣的時(shí)間運(yùn)算符 delay不是按實(shí)際時(shí)間而是按調(diào)度程序的時(shí)鐘指示的時(shí)間進(jìn)行操作。這在測(cè)試中特別有用,在該測(cè)試中,虛擬時(shí)間調(diào)度程序可用于偽造壁鐘時(shí)間,而實(shí)際上卻是同步執(zhí)行調(diào)度的任務(wù)。

調(diào)度程序類型

async 計(jì)劃是一個(gè)內(nèi)置的 RxJS 提供調(diào)度??梢允褂?Scheduler 對(duì)象的靜態(tài)屬性來(lái)創(chuàng)建和返回每個(gè)對(duì)象。

排程器 目的
null 通過(guò)不傳遞任何調(diào)度程序,可以同步和遞歸地傳遞通知。將此用于恒定時(shí)間操作或尾部遞歸操作。
queueScheduler 在當(dāng)前事件框架中的隊(duì)列上進(jìn)行調(diào)度(蹦床調(diào)度程序)。使用它進(jìn)行迭代操作。
asapScheduler 微型任務(wù)隊(duì)列上的計(jì)劃,這與用于承諾的隊(duì)列相同?;旧显诋?dāng)前工作之后,但是在下一個(gè)工作之前。使用它進(jìn)行異步轉(zhuǎn)換。
asyncScheduler 時(shí)間表與配合使用 setInterval。將此用于基于時(shí)間的操作。
animationFrameScheduler 計(jì)劃將在下一次瀏覽器內(nèi)容重新繪制之前發(fā)生的任務(wù)。可用于創(chuàng)建流暢的瀏覽器動(dòng)畫。

使用調(diào)度程序

您可能已經(jīng)在 RxJS 代碼中使用了調(diào)度程序,而沒(méi)有明確說(shuō)明要使用的調(diào)度程序的類型。這是因?yàn)樗刑幚聿l(fā)的 Observable 運(yùn)算符都有可選的調(diào)度程序。如果不提供調(diào)度程序,則 RxJS 將使用最小并發(fā)原理選擇默認(rèn)調(diào)度程序。這意味著將選擇調(diào)度程序,該調(diào)度程序?qū)⒁胱钌俚牟l(fā)量以滿足操作員的需求。例如,對(duì)于返回有限且少量消息的 Observable 的運(yùn)算符,RxJS 不使用 Scheduler,即 nullor undefined。對(duì)于返回大量或無(wú)限數(shù)量消息的操作員,使用 queueScheduler。對(duì)于使用計(jì)時(shí)器的操作員,async 將使用。

由于 RxJS 使用最少的并發(fā)調(diào)度程序,因此如果您出于性能目的引入并發(fā),則可以選擇其他調(diào)度程序。要指定特定的調(diào)度程序,可以使用采用調(diào)度程序的那些運(yùn)算符方法,例如 from([10, 20, 30], asyncScheduler)

靜態(tài)創(chuàng)建運(yùn)算符通常將 Scheduler 作為參數(shù)。例如,from(array, scheduler)讓您指定在傳遞從轉(zhuǎn)換的每個(gè)通知時(shí)要使用的 Scheduler array。它通常是運(yùn)算符的最后一個(gè)參數(shù)。以下靜態(tài)創(chuàng)建運(yùn)算符采用 Scheduler 參數(shù):

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

使用 subscribeOn預(yù)定計(jì)劃在什么情況下會(huì)在 subscribe()呼叫發(fā)生。默認(rèn)情況下,subscribe()對(duì) Observable 的調(diào)用將立即同步進(jìn)行。但是,您可以使用 instance operator subscribeOn(scheduler)(其中scheduler 提供的參數(shù))來(lái)延遲或調(diào)度實(shí)際訂閱在給定 Scheduler 上發(fā)生。

使用 observeOn預(yù)定計(jì)劃在什么情況下會(huì)通知交付。正如我們?cè)谏厦娴氖纠锌吹降哪菢?,?shí)例運(yùn)算符 observeOn(scheduler)在源 Observable 和目標(biāo) Observer 之間引入了一個(gè)中介者 Observer,其中中介者使用您的給定調(diào)度對(duì)目標(biāo) Observer 的調(diào)用 scheduler

實(shí)例運(yùn)算符可以將Scheduler作為參數(shù)。

時(shí)間相關(guān)的運(yùn)營(yíng)商如 bufferTime,debounceTime,delayauditTime,sampleTime,throttleTime,timeInterval,timeout,timeoutWith,windowTime 都將一個(gè)調(diào)度程序作為最后一個(gè)參數(shù),否則默認(rèn)情況下的操作 asyncScheduler。

其他實(shí)例操作符采取調(diào)度作為參數(shù):cachecombineLatest,concat,expandmerge,publishReplaystartWith。

注意兩個(gè)cachepublishReplay 接受調(diào)度,因?yàn)樗麄兝?ReplaySubject。ReplaySubjects 的構(gòu)造函數(shù)將可選的 Scheduler 作為最后一個(gè)參數(shù),因?yàn)?ReplaySubject 可能會(huì)處理時(shí)間,這僅在 Scheduler 的上下文中才有意義。默認(rèn)情況下,ReplaySubject 使用 queue 調(diào)度程序提供時(shí)鐘。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)