webSocket

2020-10-13 11:26 更新

瀏覽器提供的與 w3c 兼容的 WebSocket 對(duì)象的包裝。

webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>

參量

urlConfigOrSource WebSocket 端點(diǎn)作為 url 或帶有 配置和其他觀察者。

returns

WebSocketSubject<T>:允許通過(guò) WebSocket 連接發(fā)送和接收消息的主題。

描述

Subject通過(guò) WebSocket 與服務(wù)器通信的服務(wù)器

webSocket是產(chǎn)生一個(gè)的工廠函數(shù),WebSocketSubject可用于建立與任意端點(diǎn)的 WebSocket 連接。 webSocket接受帶有 WebSocket 終結(jié)點(diǎn)的url 的字符串或 WebSocketSubjectConfig 提供附加配置的 對(duì)象以及用于跟蹤 WebSocket 連接生命周期的觀察器作為參數(shù)。

當(dāng) WebSocketSubject訂閱,它試圖讓一個(gè) socket 連接,除非已經(jīng)有一個(gè)。 這意味著許多訂閱用戶將始終收聽(tīng) 在同一套接字上,從而節(jié)省了資源。 但是,如果兩個(gè)實(shí)例由組成 WebSocketSubject, 即使這兩個(gè)網(wǎng)址具有相同的網(wǎng)址,它們也會(huì)嘗試將其分開(kāi) 連接。 使用者時(shí) WebSocketSubject取消訂閱的 ,套接字連接會(huì)關(guān)閉, 僅當(dāng)沒(méi)有更多訂閱用戶仍在收聽(tīng)時(shí)。 如果一段時(shí)間后消費(fèi)者開(kāi)始 再次訂閱,將重新建立連接。

建立連接后,每當(dāng)服務(wù)器收到新消息時(shí), WebSocketSubject都會(huì)發(fā)出該消息。 消息作為流中的值。 默認(rèn)情況下,通過(guò)解析來(lái)自套接字的消息 JSON.parse。 如果你 想要自定義反序列化的處理方式(如果有的話),您可以提供自定義 resultSelectorWebSocketSubject。 如果連接關(guān)閉,則流將完成,前提是發(fā)生了 任何錯(cuò)誤。 如果在任何時(shí)候(啟動(dòng),維護(hù)或關(guān)閉連接)出現(xiàn)錯(cuò)誤, 無(wú)論拋出什么 WebSocket API,流都將出錯(cuò)。

由于是 Subject,因此 WebSocketSubject允許從服務(wù)器接收和發(fā)送消息。 為了 與所連接的端點(diǎn),用于通信 nexterrorcomplete方法。 next將值發(fā)送到服務(wù)器,因此請(qǐng)記住 該值將不會(huì)預(yù)先序列化。 因此, JSON.stringify必須手動(dòng)調(diào)用一個(gè)值, 在調(diào)用 之前 next結(jié)果 。 還要注意,如果在下一個(gè)價(jià)值時(shí)刻 沒(méi)有套接字連接(例如,沒(méi)有人在訂閱),這些值將被緩沖,并在連接時(shí)發(fā)送 終于成立了。 complete方法關(guān)閉套接字連接。 error一樣 并通過(guò)狀態(tài)代碼和字符串以及發(fā)生的詳細(xì)信息通知服務(wù)器發(fā)生了問(wèn)題。 由于 WebSocket API 需要狀態(tài)碼, WebSocketSubject因此不允許使用,例如常規(guī) Subject, 將任意值傳遞給該 error方法。 需要使用具有以下內(nèi)容的對(duì)象進(jìn)行調(diào)用 code 具有狀態(tài)碼編號(hào)的 可選 reason屬性和具有描述細(xì)節(jié)的字符串的 屬性 錯(cuò)誤。

通話 next不會(huì)影響的訂閱用戶 WebSocketSubject-他們沒(méi)有 某些信息已發(fā)送到服務(wù)器的信息(當(dāng)然,除非服務(wù)器 以某種方式響應(yīng)消息)。 另一方面,由于調(diào)用 complete觸發(fā)器 嘗試關(guān)閉套接字連接。 如果該連接已關(guān)閉且沒(méi)有任何錯(cuò)誤,則流將 完成,從而通知所有訂閱用戶。 由于通話 error關(guān)閉 如果關(guān)閉自身,套接字連接也將帶有服務(wù)器的不同狀態(tài)代碼 沒(méi)有錯(cuò)誤,已訂閱的 Observable 不會(huì)像人們期望的那樣出錯(cuò),但會(huì)像往常一樣完成。 在兩種情況下 (調(diào)用 completeerror),如果關(guān)閉套接字連接的過(guò)程導(dǎo)致某些錯(cuò)誤, 流 會(huì)出錯(cuò)。

多路復(fù)用

WebSocketSubject還有其他操作符,在其他主題中找不到。 這就是所謂的 multiplex,它是 用于模擬打開(kāi)多個(gè)套接字連接,而實(shí)際上僅維護(hù)一個(gè)。 例如,一個(gè)應(yīng)用程序同時(shí)具有聊天面板和有關(guān)體育新聞的實(shí)時(shí)通知。 由于這是兩個(gè)不同的功能, 每個(gè)有兩個(gè)獨(dú)立的連接是有意義的。 也許 WebSocket 可能有兩個(gè)單獨(dú)的服務(wù) 端點(diǎn),在單獨(dú)的計(jì)算機(jī)上運(yùn)行,只有 GUI 將它們組合在一起。 具有套接字連接 因?yàn)槊總€(gè)功能都可能變得資源過(guò)于昂貴。 單身是一種常見(jiàn)的模式 WebSocket 端點(diǎn),充當(dāng)其他服務(wù)(在本例中為聊天和體育新聞服務(wù))的網(wǎng)關(guān)。 即使客戶端應(yīng)用程序中只有一個(gè)連接,也可以像處理流一樣操作流 需要兩個(gè)單獨(dú)的插座。 這消除了手動(dòng)在網(wǎng)關(guān)中注冊(cè)和注銷(xiāo) 提供服務(wù)并過(guò)濾出感興趣的消息。 這正是該 multiplex方法的目的。

方法接受三個(gè)參數(shù)。 前兩個(gè)是返回訂閱和取消訂閱消息的函數(shù) 分別。 這些消息將在產(chǎn)生 Observable 的使用者使用時(shí)發(fā)送到服務(wù)器 訂閱和取消訂閱。 服務(wù)器可以使用它們來(lái)驗(yàn)證某種消息應(yīng)該啟動(dòng)還是停止 被轉(zhuǎn)發(fā)給客戶。 對(duì)于上述示例應(yīng)用程序,在獲得帶有正確標(biāo)識(shí)符的訂閱消息后, 網(wǎng)關(guān)服務(wù)器可以決定應(yīng)連接到真實(shí)的體育新聞服務(wù)并開(kāi)始從中轉(zhuǎn)發(fā)消息。 請(qǐng)注意,這兩個(gè)消息都將按函數(shù)返回的方式發(fā)送,默認(rèn)情況下,它們使用 JSON.stringify 進(jìn)行序列化,只是 作為通過(guò)推送的消息 next。 還請(qǐng)記住,這些消息將在 發(fā)送, 每次 訂閱時(shí) 并且 取消訂閱。 這是潛在的危險(xiǎn),因?yàn)?Observable 的一個(gè)使用者可能會(huì)取消訂閱,并且服務(wù)器 由于收到取消訂閱的消息,可能會(huì)停止發(fā)送消息。 這需要處理 在服務(wù)器上或 使用 publish在從“多路復(fù)用”返回的 Observable 上 。

的最后一個(gè)參數(shù) multiplex是 的 messageFilter應(yīng)返回布爾值 函數(shù)。 用于過(guò)濾郵件 由服務(wù)器發(fā)送給僅屬于模擬 WebSocket 流的服務(wù)器。 例如,服務(wù)器可能會(huì)標(biāo)記這些 消息對(duì)象上帶有某種字符串標(biāo)識(shí)符的消息,并且 messageFilter將返回 true 如果套接字發(fā)出的對(duì)象上有這樣的標(biāo)識(shí)符。 消息返回 falsemessageFilter簡(jiǎn)單地跳過(guò), 并且不會(huì)順流而下。

返回值 multiplex是 Observable,其中包含來(lái)自模擬套接字連接的消息。 請(qǐng)注意 不是 WebSocketSubject,因此調(diào)用 nextmultiplex再次失敗。 用于將值推向 服務(wù)器,使用 root WebSocketSubject

例子

偵聽(tīng)來(lái)自服務(wù)器的消息

import { webSocket } from "rxjs/webSocket";
const subject = webSocket("ws://localhost:8081");


subject.subscribe(
   msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
   err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
   () => console.log('complete') // Called when connection is closed (for whatever reason).
 );

將消息推送到服務(wù)器

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


subject.subscribe();
// Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
// since no connection was established!


subject.next({message: 'some message'});
// This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!


subject.complete(); // Closes the connection.


subject.error({code: 4000, reason: 'I think our app just broke!'});
// Also closes the connection, but let's the server know that this closing is caused by some error.

多路 WebSocket

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


const observableA = subject.multiplex(
  () => ({subscribe: 'A'}), // When server gets this message, it will start sending messages for 'A'...
  () => ({unsubscribe: 'A'}), // ...and when gets this one, it will stop.
  message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
);


const observableB = subject.multiplex( // And the same goes for 'B'.
  () => ({subscribe: 'B'}),
  () => ({unsubscribe: 'B'}),
  message => message.type === 'B'
);


const subA = observableA.subscribe(messageForA => console.log(messageForA));
// At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
// which we log here.


const subB = observableB.subscribe(messageForB => console.log(messageForB));
// Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
// which we log here.


subB.unsubscribe();
// Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.


subA.unsubscribe();
// Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
// socket connection closes.
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)