可觀察對(duì)象對(duì)在應(yīng)用的各個(gè)部分之間傳遞消息提供了支持。 它們?cè)?Angular 中頻繁使用,并且推薦把它們用于事件處理、異步編程以及處理多個(gè)值等場(chǎng)景。
觀察者(Observer
)模式是一個(gè)軟件設(shè)計(jì)模式,它有一個(gè)對(duì)象,稱(chēng)之為主體 Subject
,負(fù)責(zé)維護(hù)一個(gè)依賴(lài)項(xiàng)(稱(chēng)之為觀察者 Observer
)的列表,并且在狀態(tài)變化時(shí)自動(dòng)通知它們。 該模式和發(fā)布/訂閱模式非常相似(但不完全一樣)。
可觀察對(duì)象是聲明式的 —— 也就是說(shuō),雖然你定義了一個(gè)用于發(fā)布值的函數(shù),但是在有消費(fèi)者訂閱它之前,這個(gè)函數(shù)并不會(huì)實(shí)際執(zhí)行。 訂閱之后,當(dāng)這個(gè)函數(shù)執(zhí)行完或取消訂閱時(shí),訂閱者就會(huì)收到通知。
可觀察對(duì)象可以發(fā)送多個(gè)任意類(lèi)型的值 —— 字面量、消息、事件。無(wú)論這些值是同步發(fā)送的還是異步發(fā)送的,接收這些值的 API 都是一樣的。 由于準(zhǔn)備(setup
)和清場(chǎng)(teardown
)的邏輯都是由可觀察對(duì)象自己處理的,因此你的應(yīng)用代碼只管訂閱并消費(fèi)這些值就可以了,做完之后,取消訂閱。無(wú)論這個(gè)流是擊鍵流、HTTP 響應(yīng)流還是定時(shí)器,對(duì)這些值進(jìn)行監(jiān)聽(tīng)和停止監(jiān)聽(tīng)的接口都是一樣的。
由于這些優(yōu)點(diǎn),可觀察對(duì)象在 Angular 中得到廣泛使用,也同樣建議應(yīng)用開(kāi)發(fā)者好好使用它。
作為發(fā)布者,你創(chuàng)建一個(gè) Observable 的實(shí)例,其中定義了一個(gè)訂閱者(subscriber
)函數(shù)。 當(dāng)有消費(fèi)者調(diào)用 subscribe()
方法時(shí),這個(gè)函數(shù)就會(huì)執(zhí)行。 訂閱者函數(shù)用于定義“如何獲取或生成那些要發(fā)布的值或消息”。
要執(zhí)行所創(chuàng)建的可觀察對(duì)象,并開(kāi)始從中接收通知,你就要調(diào)用它的 subscribe()
方法,并傳入一個(gè)觀察者(observer
)。 這是一個(gè) JavaScript 對(duì)象,它定義了你收到的這些消息的處理器(handler
)。 subscribe()
調(diào)用會(huì)返回一個(gè) Subscription
對(duì)象,該對(duì)象具有一個(gè) unsubscribe()
方法。 當(dāng)調(diào)用該方法時(shí),你就會(huì)停止接收通知。
下面這個(gè)例子中示范了這種基本用法,它展示了如何使用可觀察對(duì)象來(lái)對(duì)當(dāng)前地理位置進(jìn)行更新。
//Observe geolocation updates
// Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
let watchId: number;
// Simple geolocation API check provides values to publish
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition((position: Position) => {
observer.next(position);
}, (error: PositionError) => {
observer.error(error);
});
} else {
observer.error('Geolocation not available');
}
// When the consumer unsubscribes, clean up data ready for next subscription.
return {
unsubscribe() {
navigator.geolocation.clearWatch(watchId);
}
};
});
// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
next(position) {
console.log('Current Position: ', position);
},
error(msg) {
console.log('Error Getting Location: ', msg);
}
});
// Stop listening for location after 10 seconds
setTimeout(() => {
locationsSubscription.unsubscribe();
}, 10000);
用于接收可觀察對(duì)象通知的處理器要實(shí)現(xiàn) Observer
接口。這個(gè)對(duì)象定義了一些回調(diào)函數(shù)來(lái)處理可觀察對(duì)象可能會(huì)發(fā)來(lái)的三種通知:
通知類(lèi)型 | 說(shuō)明 |
---|---|
next |
必要。用來(lái)處理每個(gè)送達(dá)值。在開(kāi)始執(zhí)行后可能執(zhí)行零次或多次。 |
error |
可選。用來(lái)處理錯(cuò)誤通知。錯(cuò)誤會(huì)中斷這個(gè)可觀察對(duì)象實(shí)例的執(zhí)行過(guò)程。 |
complete |
可選。用來(lái)處理執(zhí)行完畢(complete)通知。當(dāng)執(zhí)行完畢后,這些值就會(huì)繼續(xù)傳給下一個(gè)處理器。 |
觀察者對(duì)象可以定義這三種處理器的任意組合。如果你不為某種通知類(lèi)型提供處理器,這個(gè)觀察者就會(huì)忽略相應(yīng)類(lèi)型的通知。
只有當(dāng)有人訂閱 Observable
的實(shí)例時(shí),它才會(huì)開(kāi)始發(fā)布值。 訂閱時(shí)要先調(diào)用該實(shí)例的 subscribe()
方法,并把一個(gè)觀察者對(duì)象傳給它,用來(lái)接收通知。
為了展示訂閱的原理,我們需要?jiǎng)?chuàng)建新的可觀察對(duì)象。它有一個(gè)構(gòu)造函數(shù)可以用來(lái)創(chuàng)建新實(shí)例,但是為了更簡(jiǎn)明,也可以使用
Observable
上定義的一些靜態(tài)方法來(lái)創(chuàng)建一些常用的簡(jiǎn)單可觀察對(duì)象:
- `of(...items)` —— 返回一個(gè) `Observable` 實(shí)例,它用同步的方式把參數(shù)中提供的這些值發(fā)送出來(lái)。
- `from(iterable)` —— 把它的參數(shù)轉(zhuǎn)換成一個(gè) `Observable` 實(shí)例。 該方法通常用于把一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)(發(fā)送多個(gè)值的)可觀察對(duì)象。
下面的例子會(huì)創(chuàng)建并訂閱一個(gè)簡(jiǎn)單的可觀察對(duì)象,它的觀察者會(huì)把接收到的消息記錄到控制臺(tái)中:
//Subscribe using observer
// Create simple observable that emits three values
const myObservable = of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification
另外,subscribe()
方法還可以接收定義在同一行中的回調(diào)函數(shù),無(wú)論 next
、error
還是 complete
處理器。比如,下面的 subscribe()
調(diào)用和前面指定預(yù)定義觀察者的例子是等價(jià)的。
//Subscribe with positional arguments
myObservable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
無(wú)論哪種情況,next
處理器都是必要的,而 error
和 complete
處理器是可選的。
注意,next()
函數(shù)可以接受消息字符串、事件對(duì)象、數(shù)字值或各種結(jié)構(gòu),具體類(lèi)型取決于上下文。 為了更通用一點(diǎn),我們把由可觀察對(duì)象發(fā)布出來(lái)的數(shù)據(jù)統(tǒng)稱(chēng)為流。任何類(lèi)型的值都可以表示為可觀察對(duì)象,而這些值會(huì)被發(fā)布為一個(gè)流。
使用 Observable
構(gòu)造函數(shù)可以創(chuàng)建任何類(lèi)型的可觀察流。 當(dāng)執(zhí)行可觀察對(duì)象的 subscribe()
方法時(shí),這個(gè)構(gòu)造函數(shù)就會(huì)把它接收到的參數(shù)作為訂閱函數(shù)來(lái)運(yùn)行。 訂閱函數(shù)會(huì)接收一個(gè) Observer
對(duì)象,并把值發(fā)布給觀察者的 next()
方法。
比如,要?jiǎng)?chuàng)建一個(gè)與前面的 of(1, 2, 3) 等價(jià)的可觀察對(duì)象,你可以這樣做:
//Create observable with constructor
// This function runs when subscribe() is called
function sequenceSubscriber(observer) {
// synchronously deliver 1, 2, and 3, then complete
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
// unsubscribe function doesn't need to do anything in this
// because values are delivered synchronously
return {unsubscribe() {}};
}
// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);
// execute the Observable and print the result of each notification
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// 1
// 2
// 3
// Finished sequence
如果要略微加強(qiáng)這個(gè)例子,我們可以創(chuàng)建一個(gè)用來(lái)發(fā)布事件的可觀察對(duì)象。在這個(gè)例子中,訂閱函數(shù)是用內(nèi)聯(lián)方式定義的。
//Create with custom fromEvent function
function fromEvent(target, eventName) {
return new Observable((observer) => {
const handler = (e) => observer.next(e);
// Add the event handler to the target
target.addEventListener(eventName, handler);
return () => {
// Detach the event handler from the target
target.removeEventListener(eventName, handler);
};
});
}
現(xiàn)在,你就可以使用這個(gè)函數(shù)來(lái)創(chuàng)建可發(fā)布 keydown
事件的可觀察對(duì)象了:
//Use custom fromEvent function
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});
典型的可觀察對(duì)象會(huì)為每一個(gè)觀察者創(chuàng)建一次新的、獨(dú)立的執(zhí)行。 當(dāng)觀察者進(jìn)行訂閱時(shí),該可觀察對(duì)象會(huì)連上一個(gè)事件處理器,并且向那個(gè)觀察者發(fā)送一些值。當(dāng)?shù)诙€(gè)觀察者訂閱時(shí),這個(gè)可觀察對(duì)象就會(huì)連上一個(gè)新的事件處理器,并獨(dú)立執(zhí)行一次,把這些值發(fā)送給第二個(gè)可觀察對(duì)象。
有時(shí)候,不應(yīng)該對(duì)每一個(gè)訂閱者都獨(dú)立執(zhí)行一次,你可能會(huì)希望每次訂閱都得到同一批值 —— 即使是那些你已經(jīng)發(fā)送過(guò)的。這在某些情況下有用,比如用來(lái)發(fā)送 document 上的點(diǎn)擊事件的可觀察對(duì)象。
多播用來(lái)讓可觀察對(duì)象在一次執(zhí)行中同時(shí)廣播給多個(gè)訂閱者。借助支持多播的可觀察對(duì)象,你不必注冊(cè)多個(gè)監(jiān)聽(tīng)器,而是復(fù)用第一個(gè)(next
)監(jiān)聽(tīng)器,并且把值發(fā)送給各個(gè)訂閱者。
當(dāng)創(chuàng)建可觀察對(duì)象時(shí),你要決定你希望別人怎么用這個(gè)對(duì)象以及是否對(duì)它的值進(jìn)行多播。
來(lái)看一個(gè)從 1 到 3 進(jìn)行計(jì)數(shù)的例子,它每發(fā)出一個(gè)數(shù)字就會(huì)等待 1 秒。
//Create a delayed sequence
function sequenceSubscriber(observer) {
const seq = [1, 2, 3];
let timeoutId;
// Will run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(arr, idx) {
timeoutId = setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(arr, ++idx);
}
}, 1000);
}
doSequence(seq, 0);
// Unsubscribe should clear the timeout to stop execution
return {unsubscribe() {
clearTimeout(timeoutId);
}};
}
// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence
注意,如果你訂閱了兩次,就會(huì)有兩個(gè)獨(dú)立的流,每個(gè)流都會(huì)每秒發(fā)出一個(gè)數(shù)字。代碼如下:
//Two subscriptions
// Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1/2 second, subscribe again.
setTimeout(() => {
sequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished
修改這個(gè)可觀察對(duì)象以支持多播,代碼如下:
//Create a multicast subscriber
function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
// Keep track of each observer (one for every active subscription)
const observers = [];
// Still a single timeoutId because there will only ever be one
// set of values being generated, multicasted to each subscriber
let timeoutId;
// Return the subscriber function (runs when subscribe()
// function is invoked)
return (observer) => {
observers.push(observer);
// When this is the first subscription, start the sequence
if (observers.length === 1) {
timeoutId = doSequence({
next(val) {
// Iterate through observers and notify all subscriptions
observers.forEach(obs => obs.next(val));
},
complete() {
// Notify all complete callbacks
observers.slice(0).forEach(obs => obs.complete());
}
}, seq, 0);
}
return {
unsubscribe() {
// Remove from the observers array so it's no longer notified
observers.splice(observers.indexOf(observer), 1);
// If there's no more listeners, do cleanup
if (observers.length === 0) {
clearTimeout(timeoutId);
}
}
};
};
}
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, ++idx);
}
}, 1000);
}
// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber());
// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
multicastSequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 1500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
雖然支持多播的可觀察對(duì)象需要做更多的準(zhǔn)備工作,但對(duì)某些應(yīng)用來(lái)說(shuō),這非常有用。稍后我們會(huì)介紹一些簡(jiǎn)化多播的工具,它們讓你能接收任何可觀察對(duì)象,并把它變成支持多播的。
由于可觀察對(duì)象會(huì)異步生成值,所以用 try/catch
是無(wú)法捕獲錯(cuò)誤的。你應(yīng)該在觀察者中指定一個(gè) error
回調(diào)來(lái)處理錯(cuò)誤。發(fā)生錯(cuò)誤時(shí)還會(huì)導(dǎo)致可觀察對(duì)象清理現(xiàn)有的訂閱,并且停止生成值??捎^察對(duì)象可以生成值(調(diào)用 next
回調(diào)),也可以調(diào)用 complete
或 error
回調(diào)來(lái)主動(dòng)結(jié)束。
myObservable.subscribe({
next(num) { console.log('Next num: ' + num)},
error(err) { console.log('Received an errror: ' + err)}
});
更多建議: