RxJS webSocket

2020-10-09 17:39 更新

瀏覽器提供的與 w3c 兼容的 WebSocket 對象的包裝。

webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>

參量

urlConfigOrSource WebSocket 端點作為 url 或帶有 配置和其他觀察者。

退貨

WebSocketSubject<T>:允許通過 WebSocket 連接發(fā)送和接收消息的主題。

描述

Subject通過 WebSocket 與服務(wù)器通信的服務(wù)器

webSocket是產(chǎn)生一個工廠函數(shù) WebSocketSubject, 可以用來與任意端點建立 WebSocket 連接。 webSocket接受帶有 WebSocket 端點 url 的字符串作為參數(shù),或者接受 WebSocketSubjectConfig用于提供其他配置的對象,如 以及用于跟蹤 WebSocket 連接生命周期的觀察者。

當(dāng) WebSocketSubject訂閱,它試圖讓一個 socket 連接, 除非已經(jīng)有一個。 這意味著許多訂戶將始終收聽 在同一套接字上,從而節(jié)省了資源。 但是,如果兩個實例由組成 WebSocketSubject, 即使這兩個網(wǎng)址具有相同的網(wǎng)址,它們也會嘗試將其分開 連接。 使用者時 WebSocketSubject取消訂閱的 ,套接字連接會關(guān)閉, 僅當(dāng)沒有更多訂戶仍在收聽時。 如果一段時間后消費(fèi)者開始 再次訂閱,將重新建立連接。

建立連接后,每當(dāng)服務(wù)器收到新消息時, WebSocketSubject都會發(fā)出該消息。 消息作為流中的值。 默認(rèn)情況下,通過解析來自套接字的消息 JSON.parse。 如果你 想要自定義反序列化的處理方式(如果有的話),您可以提供自定義 resultSelectorWebSocketSubject。 如果連接關(guān)閉,則流將完成,前提是發(fā)生了 任何錯誤。 如果在任何時候(啟動,維護(hù)或關(guān)閉連接)出現(xiàn)錯誤, 無論拋出什么 WebSocket API,流都將出錯。

由于是 Subject,因此 WebSocketSubject允許從服務(wù)器接收和發(fā)送消息。 為了 與所連接的端點,用于通信 next, errorcomplete方法。 next將值發(fā)送到服務(wù)器,因此請記住 該值將不會預(yù)先序列化。 因此, JSON.stringify必須手動調(diào)用一個值, 在調(diào)用 之前 next結(jié)果 。 還要注意,如果在下一個價值時刻 沒有套接字連接(例如,沒有人在訂閱),這些值將被緩沖,并在連接時發(fā)送 終于成立了。 complete方法關(guān)閉套接字連接。 error一樣 并通過狀態(tài)代碼和字符串以及發(fā)生的詳細(xì)信息通知服務(wù)器發(fā)生了問題。 由于 WebSocket API 需要狀態(tài)碼, WebSocketSubject因此不允許使用,例如常規(guī) Subject, 將任意值傳遞給該 error方法。 需要使用具有以下內(nèi)容的對象進(jìn)行調(diào)用 code 具有狀態(tài)碼編號的 可選 reason屬性和具有描述細(xì)節(jié)的字符串的 屬性 錯誤。

通話 next不會影響的訂戶 WebSocketSubject-他們沒有 某些信息已發(fā)送到服務(wù)器的信息(當(dāng)然,除非服務(wù)器 以某種方式響應(yīng)消息)。 另一方面,由于調(diào)用 complete觸發(fā)器 嘗試關(guān)閉套接字連接。 如果該連接已關(guān)閉且沒有任何錯誤,則流將 完成,從而通知所有訂戶。 由于通話 error關(guān)閉 如果關(guān)閉自身,套接字連接也將帶有服務(wù)器的不同狀態(tài)代碼 沒有錯誤,已訂閱的 Observable 不會像人們期望的那樣出錯,但會像往常一樣完成。 在兩種情況下 (調(diào)用 completeerror),如果關(guān)閉套接字連接的過程導(dǎo)致某些錯誤, 流 會出錯。

多路復(fù)用

WebSocketSubject還有其他操作符,在其他主題中找不到。 這就是所謂的 multiplex,它是 用于模擬打開多個套接字連接,而實際上僅維護(hù)一個。 例如,一個應(yīng)用程序同時具有聊天面板和有關(guān)體育新聞的實時通知。 由于這是兩個不同的功能, 每個有兩個獨立的連接是有意義的。 也許 WebSocket 可能有兩個單獨的服務(wù) 端點,在單獨的計算機(jī)上運(yùn)行,只有 GUI 將它們組合在一起。 具有套接字連接 因為每個功能都可能變得資源過于昂貴。 單身是一種常見的模式 WebSocket 端點,充當(dāng)其他服務(wù)(在本例中為聊天和體育新聞服務(wù))的網(wǎng)關(guān)。 即使客戶端應(yīng)用程序中只有一個連接,也可以像處理流一樣操作流 需要兩個單獨的插座。 這消除了手動在網(wǎng)關(guān)中注冊和注銷 提供服務(wù)并過濾出感興趣的消息。 這正是該 multiplex方法的目的。

方法接受三個參數(shù)。 前兩個是返回訂閱和取消訂閱消息的函數(shù) 分別。 這些消息將在產(chǎn)生 Observable 的使用者使用時發(fā)送到服務(wù)器 訂閱和取消訂閱。 服務(wù)器可以使用它們來驗證某種消息應(yīng)該啟動還是停止 被轉(zhuǎn)發(fā)給客戶。 對于上述示例應(yīng)用程序,在獲得帶有正確標(biāo)識符的訂閱消息后, 網(wǎng)關(guān)服務(wù)器可以決定應(yīng)連接到真實的體育新聞服務(wù)并開始從中轉(zhuǎn)發(fā)消息。 請注意,這兩個消息都將按函數(shù)返回的方式發(fā)送,默認(rèn)情況下,它們使用 JSON.stringify 進(jìn)行序列化,只是 作為通過推送的消息 next。 還請記住,這些消息將在 發(fā)送, 每次 訂閱時 并且 取消訂閱。 這是潛在的危險,因為 Observable 的一個使用者可能會取消訂閱,并且服務(wù)器 由于收到取消訂閱的消息,可能會停止發(fā)送消息。 這需要處理 在服務(wù)器上或 使用 publish在從“多路復(fù)用”返回的 Observable 上 。

的最后一個參數(shù) multiplex是 的 messageFilter應(yīng)返回布爾值 函數(shù)。 用于過濾郵件 由服務(wù)器發(fā)送給僅屬于模擬 WebSocket 流的服務(wù)器。 例如,服務(wù)器可能會標(biāo)記這些 消息對象上帶有某種字符串標(biāo)識符的消息,并且 messageFilter將返回 true 如果套接字發(fā)出的對象上有這樣的標(biāo)識符。 消息返回 falsemessageFilter簡單地跳過, 并且不會順流而下。

返回值 multiplex是 Observable,其中包含來自模擬套接字連接的消息。 請注意 不是 WebSocketSubject,因此調(diào)用 nextmultiplex再次失敗。 用于將值推向 服務(wù)器,使用 root WebSocketSubject

例子

偵聽來自服務(wù)器的消息

import { webSocket } from "rxjs/webSocket";
const subject = webSocket("ws://localhost:8081");


subject.subscribe(
   msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
   err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
   () => console.log('complete') // Called when connection is closed (for whatever reason).
 );

將消息推送到服務(wù)器

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


subject.subscribe();
// Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
// since no connection was established!


subject.next({message: 'some message'});
// This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!


subject.complete(); // Closes the connection.


subject.error({code: 4000, reason: 'I think our app just broke!'});
// Also closes the connection, but let's the server know that this closing is caused by some error.

多路 WebSocket

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


const observableA = subject.multiplex(
  () => ({subscribe: 'A'}), // When server gets this message, it will start sending messages for 'A'...
  () => ({unsubscribe: 'A'}), // ...and when gets this one, it will stop.
  message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
);


const observableB = subject.multiplex( // And the same goes for 'B'.
  () => ({subscribe: 'B'}),
  () => ({unsubscribe: 'B'}),
  message => message.type === 'B'
);


const subA = observableA.subscribe(messageForA => console.log(messageForA));
// At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
// which we log here.


const subB = observableB.subscribe(messageForB => console.log(messageForB));
// Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
// which we log here.


subB.unsubscribe();
// Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.


subA.unsubscribe();
// Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
// socket connection closes.
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號