Angular9 Observable 概覽

2020-07-02 14:49 更新

使用可觀察對(duì)象(Observable)來(lái)傳遞值

可觀察對(duì)象對(duì)在應(yīng)用的各個(gè)部分之間傳遞消息提供了支持。 它們?cè)?Angular 中頻繁使用,并且推薦把它們用于事件處理、異步編程以及處理多個(gè)值等場(chǎng)景。

觀察者(Observer)模式是一個(gè)軟件設(shè)計(jì)模式,它有一個(gè)對(duì)象,稱(chēng)之為主體 Subject,負(fù)責(zé)維護(hù)一個(gè)依賴(lài)項(xiàng)(稱(chēng)之為觀察者 Observer)的列表,并且在狀態(tài)變化時(shí)自動(dòng)通知它們。 該模式和發(fā)布/訂閱模式非常相似(但不完全一樣)。

可觀察對(duì)象是聲明式的 —— 也就是說(shuō),雖然你定義了一個(gè)用于發(fā)布值的函數(shù),但是在有消費(fèi)者訂閱它之前,這個(gè)函數(shù)并不會(huì)實(shí)際執(zhí)行。 訂閱之后,當(dāng)這個(gè)函數(shù)執(zhí)行完或取消訂閱時(shí),訂閱者就會(huì)收到通知。

可觀察對(duì)象可以發(fā)送多個(gè)任意類(lèi)型的值 —— 字面量、消息、事件。無(wú)論這些值是同步發(fā)送的還是異步發(fā)送的,接收這些值的 API 都是一樣的。 由于準(zhǔn)備(setup)和清場(chǎng)(teardown)的邏輯都是由可觀察對(duì)象自己處理的,因此你的應(yīng)用代碼只管訂閱并消費(fèi)這些值就可以了,做完之后,取消訂閱。無(wú)論這個(gè)流是擊鍵流、HTTP 響應(yīng)流還是定時(shí)器,對(duì)這些值進(jìn)行監(jiān)聽(tīng)和停止監(jiān)聽(tīng)的接口都是一樣的。

由于這些優(yōu)點(diǎn),可觀察對(duì)象在 Angular 中得到廣泛使用,也同樣建議應(yīng)用開(kāi)發(fā)者好好使用它。

基本用法和詞匯

作為發(fā)布者,你創(chuàng)建一個(gè) Observable 的實(shí)例,其中定義了一個(gè)訂閱者(subscriber)函數(shù)。 當(dāng)有消費(fèi)者調(diào)用 subscribe() 方法時(shí),這個(gè)函數(shù)就會(huì)執(zhí)行。 訂閱者函數(shù)用于定義“如何獲取或生成那些要發(fā)布的值或消息”。

要執(zhí)行所創(chuàng)建的可觀察對(duì)象,并開(kāi)始從中接收通知,你就要調(diào)用它的 subscribe() 方法,并傳入一個(gè)觀察者(observer)。 這是一個(gè) JavaScript 對(duì)象,它定義了你收到的這些消息的處理器(handler)。 subscribe() 調(diào)用會(huì)返回一個(gè) Subscription 對(duì)象,該對(duì)象具有一個(gè) unsubscribe() 方法。 當(dāng)調(diào)用該方法時(shí),你就會(huì)停止接收通知。

下面這個(gè)例子中示范了這種基本用法,它展示了如何使用可觀察對(duì)象來(lái)對(duì)當(dāng)前地理位置進(jìn)行更新。

//Observe geolocation updates


// Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
  let watchId: number;


  // Simple geolocation API check provides values to publish
  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition((position: Position) => {
      observer.next(position);
    }, (error: PositionError) => {
      observer.error(error);
    });
  } else {
    observer.error('Geolocation not available');
  }


  // When the consumer unsubscribes, clean up data ready for next subscription.
  return {
    unsubscribe() {
      navigator.geolocation.clearWatch(watchId);
    }
  };
});


// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
  next(position) {
    console.log('Current Position: ', position);
  },
  error(msg) {
    console.log('Error Getting Location: ', msg);
  }
});


// Stop listening for location after 10 seconds
setTimeout(() => {
  locationsSubscription.unsubscribe();
}, 10000);

定義觀察者

用于接收可觀察對(duì)象通知的處理器要實(shí)現(xiàn) Observer 接口。這個(gè)對(duì)象定義了一些回調(diào)函數(shù)來(lái)處理可觀察對(duì)象可能會(huì)發(fā)來(lái)的三種通知:

通知類(lèi)型 說(shuō)明
next 必要。用來(lái)處理每個(gè)送達(dá)值。在開(kāi)始執(zhí)行后可能執(zhí)行零次或多次。
error 可選。用來(lái)處理錯(cuò)誤通知。錯(cuò)誤會(huì)中斷這個(gè)可觀察對(duì)象實(shí)例的執(zhí)行過(guò)程。
complete 可選。用來(lái)處理執(zhí)行完畢(complete)通知。當(dāng)執(zhí)行完畢后,這些值就會(huì)繼續(xù)傳給下一個(gè)處理器。

觀察者對(duì)象可以定義這三種處理器的任意組合。如果你不為某種通知類(lèi)型提供處理器,這個(gè)觀察者就會(huì)忽略相應(yīng)類(lèi)型的通知。

訂閱

只有當(dāng)有人訂閱 Observable 的實(shí)例時(shí),它才會(huì)開(kāi)始發(fā)布值。 訂閱時(shí)要先調(diào)用該實(shí)例的 subscribe() 方法,并把一個(gè)觀察者對(duì)象傳給它,用來(lái)接收通知。

為了展示訂閱的原理,我們需要?jiǎng)?chuàng)建新的可觀察對(duì)象。它有一個(gè)構(gòu)造函數(shù)可以用來(lái)創(chuàng)建新實(shí)例,但是為了更簡(jiǎn)明,也可以使用 Observable 上定義的一些靜態(tài)方法來(lái)創(chuàng)建一些常用的簡(jiǎn)單可觀察對(duì)象:

- `of(...items)` —— 返回一個(gè) `Observable` 實(shí)例,它用同步的方式把參數(shù)中提供的這些值發(fā)送出來(lái)。

- `from(iterable)` —— 把它的參數(shù)轉(zhuǎn)換成一個(gè) `Observable` 實(shí)例。 該方法通常用于把一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)(發(fā)送多個(gè)值的)可觀察對(duì)象。

下面的例子會(huì)創(chuàng)建并訂閱一個(gè)簡(jiǎn)單的可觀察對(duì)象,它的觀察者會(huì)把接收到的消息記錄到控制臺(tái)中:

//Subscribe using observer


// Create simple observable that emits three values
const myObservable = of(1, 2, 3);


// Create observer object
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};


// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

另外,subscribe() 方法還可以接收定義在同一行中的回調(diào)函數(shù),無(wú)論 next、error 還是 complete 處理器。比如,下面的 subscribe() 調(diào)用和前面指定預(yù)定義觀察者的例子是等價(jià)的。

//Subscribe with positional arguments


myObservable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

無(wú)論哪種情況,next 處理器都是必要的,而 errorcomplete 處理器是可選的。

注意,next() 函數(shù)可以接受消息字符串、事件對(duì)象、數(shù)字值或各種結(jié)構(gòu),具體類(lèi)型取決于上下文。 為了更通用一點(diǎn),我們把由可觀察對(duì)象發(fā)布出來(lái)的數(shù)據(jù)統(tǒng)稱(chēng)為流。任何類(lèi)型的值都可以表示為可觀察對(duì)象,而這些值會(huì)被發(fā)布為一個(gè)流。

創(chuàng)建可觀察對(duì)象

使用 Observable 構(gòu)造函數(shù)可以創(chuàng)建任何類(lèi)型的可觀察流。 當(dāng)執(zhí)行可觀察對(duì)象的 subscribe() 方法時(shí),這個(gè)構(gòu)造函數(shù)就會(huì)把它接收到的參數(shù)作為訂閱函數(shù)來(lái)運(yùn)行。 訂閱函數(shù)會(huì)接收一個(gè) Observer 對(duì)象,并把值發(fā)布給觀察者的 next() 方法。

比如,要?jiǎng)?chuàng)建一個(gè)與前面的 of(1, 2, 3) 等價(jià)的可觀察對(duì)象,你可以這樣做:

//Create observable with constructor


// This function runs when subscribe() is called
function sequenceSubscriber(observer) {
  // synchronously deliver 1, 2, and 3, then complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();


  // unsubscribe function doesn't need to do anything in this
  // because values are delivered synchronously
  return {unsubscribe() {}};
}


// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);


// execute the Observable and print the result of each notification
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});


// Logs:
// 1
// 2
// 3
// Finished sequence

如果要略微加強(qiáng)這個(gè)例子,我們可以創(chuàng)建一個(gè)用來(lái)發(fā)布事件的可觀察對(duì)象。在這個(gè)例子中,訂閱函數(shù)是用內(nèi)聯(lián)方式定義的。

//Create with custom fromEvent function


function fromEvent(target, eventName) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);


    // Add the event handler to the target
    target.addEventListener(eventName, handler);


    return () => {
      // Detach the event handler from the target
      target.removeEventListener(eventName, handler);
    };
  });
}

現(xiàn)在,你就可以使用這個(gè)函數(shù)來(lái)創(chuàng)建可發(fā)布 keydown 事件的可觀察對(duì)象了:

//Use custom fromEvent function


const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;


const subscription = fromEvent(nameInput, 'keydown')
  .subscribe((e: KeyboardEvent) => {
    if (e.keyCode === ESC_KEY) {
      nameInput.value = '';
    }
  });

多播

典型的可觀察對(duì)象會(huì)為每一個(gè)觀察者創(chuàng)建一次新的、獨(dú)立的執(zhí)行。 當(dāng)觀察者進(jìn)行訂閱時(shí),該可觀察對(duì)象會(huì)連上一個(gè)事件處理器,并且向那個(gè)觀察者發(fā)送一些值。當(dāng)?shù)诙€(gè)觀察者訂閱時(shí),這個(gè)可觀察對(duì)象就會(huì)連上一個(gè)新的事件處理器,并獨(dú)立執(zhí)行一次,把這些值發(fā)送給第二個(gè)可觀察對(duì)象。

有時(shí)候,不應(yīng)該對(duì)每一個(gè)訂閱者都獨(dú)立執(zhí)行一次,你可能會(huì)希望每次訂閱都得到同一批值 —— 即使是那些你已經(jīng)發(fā)送過(guò)的。這在某些情況下有用,比如用來(lái)發(fā)送 document 上的點(diǎn)擊事件的可觀察對(duì)象。

多播用來(lái)讓可觀察對(duì)象在一次執(zhí)行中同時(shí)廣播給多個(gè)訂閱者。借助支持多播的可觀察對(duì)象,你不必注冊(cè)多個(gè)監(jiān)聽(tīng)器,而是復(fù)用第一個(gè)(next)監(jiān)聽(tīng)器,并且把值發(fā)送給各個(gè)訂閱者。

當(dāng)創(chuàng)建可觀察對(duì)象時(shí),你要決定你希望別人怎么用這個(gè)對(duì)象以及是否對(duì)它的值進(jìn)行多播。

來(lái)看一個(gè)從 1 到 3 進(jìn)行計(jì)數(shù)的例子,它每發(fā)出一個(gè)數(shù)字就會(huì)等待 1 秒。

//Create a delayed sequence


function sequenceSubscriber(observer) {
  const seq = [1, 2, 3];
  let timeoutId;


  // Will run through an array of numbers, emitting one value
  // per second until it gets to the end of the array.
  function doSequence(arr, idx) {
    timeoutId = setTimeout(() => {
      observer.next(arr[idx]);
      if (idx === arr.length - 1) {
        observer.complete();
      } else {
        doSequence(arr, ++idx);
      }
    }, 1000);
  }


  doSequence(seq, 0);


  // Unsubscribe should clear the timeout to stop execution
  return {unsubscribe() {
    clearTimeout(timeoutId);
  }};
}


// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);


sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});


// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence

注意,如果你訂閱了兩次,就會(huì)有兩個(gè)獨(dú)立的流,每個(gè)流都會(huì)每秒發(fā)出一個(gè)數(shù)字。代碼如下:

//Two subscriptions


// Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});


// After 1/2 second, subscribe again.
setTimeout(() => {
  sequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 500);


// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished

修改這個(gè)可觀察對(duì)象以支持多播,代碼如下:

//Create a multicast subscriber


function multicastSequenceSubscriber() {
  const seq = [1, 2, 3];
  // Keep track of each observer (one for every active subscription)
  const observers = [];
  // Still a single timeoutId because there will only ever be one
  // set of values being generated, multicasted to each subscriber
  let timeoutId;


  // Return the subscriber function (runs when subscribe()
  // function is invoked)
  return (observer) => {
    observers.push(observer);
    // When this is the first subscription, start the sequence
    if (observers.length === 1) {
      timeoutId = doSequence({
        next(val) {
          // Iterate through observers and notify all subscriptions
          observers.forEach(obs => obs.next(val));
        },
        complete() {
          // Notify all complete callbacks
          observers.slice(0).forEach(obs => obs.complete());
        }
      }, seq, 0);
    }


    return {
      unsubscribe() {
        // Remove from the observers array so it's no longer notified
        observers.splice(observers.indexOf(observer), 1);
        // If there's no more listeners, do cleanup
        if (observers.length === 0) {
          clearTimeout(timeoutId);
        }
      }
    };
  };
}


// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
  return setTimeout(() => {
    observer.next(arr[idx]);
    if (idx === arr.length - 1) {
      observer.complete();
    } else {
      doSequence(observer, arr, ++idx);
    }
  }, 1000);
}


// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber());


// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});


// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
  multicastSequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 1500);


// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished

雖然支持多播的可觀察對(duì)象需要做更多的準(zhǔn)備工作,但對(duì)某些應(yīng)用來(lái)說(shuō),這非常有用。稍后我們會(huì)介紹一些簡(jiǎn)化多播的工具,它們讓你能接收任何可觀察對(duì)象,并把它變成支持多播的。

錯(cuò)誤處理

由于可觀察對(duì)象會(huì)異步生成值,所以用 try/catch 是無(wú)法捕獲錯(cuò)誤的。你應(yīng)該在觀察者中指定一個(gè) error 回調(diào)來(lái)處理錯(cuò)誤。發(fā)生錯(cuò)誤時(shí)還會(huì)導(dǎo)致可觀察對(duì)象清理現(xiàn)有的訂閱,并且停止生成值??捎^察對(duì)象可以生成值(調(diào)用 next 回調(diào)),也可以調(diào)用 completeerror 回調(diào)來(lái)主動(dòng)結(jié)束。

myObservable.subscribe({
  next(num) { console.log('Next num: ' + num)},
  error(err) { console.log('Received an errror: ' + err)}
});
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)