RxJS webSocket

2020-10-09 17:39 更新

瀏覽器提供的與 w3c 兼容的 WebSocket 對(duì)象的包裝。

webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>

參量

urlConfigOrSource WebSocket 端點(diǎn)作為 url 或帶有 配置和其他觀察者。

退貨

WebSocketSubject<T>:允許通過 WebSocket 連接發(fā)送和接收消息的主題。

描述

Subject通過 WebSocket 與服務(wù)器通信的服務(wù)器

webSocket是產(chǎn)生一個(gè)工廠函數(shù) WebSocketSubject, 可以用來與任意端點(diǎn)建立 WebSocket 連接。 webSocket接受帶有 WebSocket 端點(diǎn) url 的字符串作為參數(shù),或者接受 WebSocketSubjectConfig用于提供其他配置的對(duì)象,如 以及用于跟蹤 WebSocket 連接生命周期的觀察者。

當(dāng) WebSocketSubject訂閱,它試圖讓一個(gè) socket 連接, 除非已經(jīng)有一個(gè)。 這意味著許多訂戶將始終收聽 在同一套接字上,從而節(jié)省了資源。 但是,如果兩個(gè)實(shí)例由組成 WebSocketSubject, 即使這兩個(gè)網(wǎng)址具有相同的網(wǎng)址,它們也會(huì)嘗試將其分開 連接。 使用者時(shí) WebSocketSubject取消訂閱的 ,套接字連接會(huì)關(guān)閉, 僅當(dāng)沒有更多訂戶仍在收聽時(shí)。 如果一段時(shí)間后消費(fèi)者開始 再次訂閱,將重新建立連接。

建立連接后,每當(dāng)服務(wù)器收到新消息時(shí), WebSocketSubject都會(huì)發(fā)出該消息。 消息作為流中的值。 默認(rèn)情況下,通過解析來自套接字的消息 JSON.parse。 如果你 想要自定義反序列化的處理方式(如果有的話),您可以提供自定義 resultSelectorWebSocketSubject。 如果連接關(guān)閉,則流將完成,前提是發(fā)生了 任何錯(cuò)誤。 如果在任何時(shí)候(啟動(dòng),維護(hù)或關(guān)閉連接)出現(xiàn)錯(cuò)誤, 無論拋出什么 WebSocket API,流都將出錯(cuò)。

由于是 Subject,因此 WebSocketSubject允許從服務(wù)器接收和發(fā)送消息。 為了 與所連接的端點(diǎn),用于通信 next, errorcomplete方法。 next將值發(fā)送到服務(wù)器,因此請(qǐng)記住 該值將不會(huì)預(yù)先序列化。 因此, JSON.stringify必須手動(dòng)調(diào)用一個(gè)值, 在調(diào)用 之前 next結(jié)果 。 還要注意,如果在下一個(gè)價(jià)值時(shí)刻 沒有套接字連接(例如,沒有人在訂閱),這些值將被緩沖,并在連接時(shí)發(fā)送 終于成立了。 complete方法關(guān)閉套接字連接。 error一樣 并通過狀態(tài)代碼和字符串以及發(fā)生的詳細(xì)信息通知服務(wù)器發(fā)生了問題。 由于 WebSocket API 需要狀態(tài)碼, WebSocketSubject因此不允許使用,例如常規(guī) Subject, 將任意值傳遞給該 error方法。 需要使用具有以下內(nèi)容的對(duì)象進(jìn)行調(diào)用 code 具有狀態(tài)碼編號(hào)的 可選 reason屬性和具有描述細(xì)節(jié)的字符串的 屬性 錯(cuò)誤。

通話 next不會(huì)影響的訂戶 WebSocketSubject-他們沒有 某些信息已發(fā)送到服務(wù)器的信息(當(dāng)然,除非服務(wù)器 以某種方式響應(yīng)消息)。 另一方面,由于調(diào)用 complete觸發(fā)器 嘗試關(guān)閉套接字連接。 如果該連接已關(guān)閉且沒有任何錯(cuò)誤,則流將 完成,從而通知所有訂戶。 由于通話 error關(guān)閉 如果關(guān)閉自身,套接字連接也將帶有服務(wù)器的不同狀態(tài)代碼 沒有錯(cuò)誤,已訂閱的 Observable 不會(huì)像人們期望的那樣出錯(cuò),但會(huì)像往常一樣完成。 在兩種情況下 (調(diào)用 completeerror),如果關(guān)閉套接字連接的過程導(dǎo)致某些錯(cuò)誤, 流 會(huì)出錯(cuò)。

多路復(fù)用

WebSocketSubject還有其他操作符,在其他主題中找不到。 這就是所謂的 multiplex,它是 用于模擬打開多個(gè)套接字連接,而實(shí)際上僅維護(hù)一個(gè)。 例如,一個(gè)應(yīng)用程序同時(shí)具有聊天面板和有關(guān)體育新聞的實(shí)時(shí)通知。 由于這是兩個(gè)不同的功能, 每個(gè)有兩個(gè)獨(dú)立的連接是有意義的。 也許 WebSocket 可能有兩個(gè)單獨(dú)的服務(wù) 端點(diǎn),在單獨(dú)的計(jì)算機(jī)上運(yùn)行,只有 GUI 將它們組合在一起。 具有套接字連接 因?yàn)槊總€(gè)功能都可能變得資源過于昂貴。 單身是一種常見的模式 WebSocket 端點(diǎn),充當(dāng)其他服務(wù)(在本例中為聊天和體育新聞服務(wù))的網(wǎng)關(guān)。 即使客戶端應(yīng)用程序中只有一個(gè)連接,也可以像處理流一樣操作流 需要兩個(gè)單獨(dú)的插座。 這消除了手動(dòng)在網(wǎng)關(guān)中注冊(cè)和注銷 提供服務(wù)并過濾出感興趣的消息。 這正是該 multiplex方法的目的。

方法接受三個(gè)參數(shù)。 前兩個(gè)是返回訂閱和取消訂閱消息的函數(shù) 分別。 這些消息將在產(chǎn)生 Observable 的使用者使用時(shí)發(fā)送到服務(wù)器 訂閱和取消訂閱。 服務(wù)器可以使用它們來驗(yàn)證某種消息應(yīng)該啟動(dòng)還是停止 被轉(zhuǎn)發(fā)給客戶。 對(duì)于上述示例應(yīng)用程序,在獲得帶有正確標(biāo)識(shí)符的訂閱消息后, 網(wǎng)關(guān)服務(wù)器可以決定應(yīng)連接到真實(shí)的體育新聞服務(wù)并開始從中轉(zhuǎn)發(fā)消息。 請(qǐng)注意,這兩個(gè)消息都將按函數(shù)返回的方式發(fā)送,默認(rèn)情況下,它們使用 JSON.stringify 進(jìn)行序列化,只是 作為通過推送的消息 next。 還請(qǐng)記住,這些消息將在 發(fā)送, 每次 訂閱時(shí) 并且 取消訂閱。 這是潛在的危險(xiǎn),因?yàn)?Observable 的一個(gè)使用者可能會(huì)取消訂閱,并且服務(wù)器 由于收到取消訂閱的消息,可能會(huì)停止發(fā)送消息。 這需要處理 在服務(wù)器上或 使用 publish在從“多路復(fù)用”返回的 Observable 上 。

的最后一個(gè)參數(shù) multiplex是 的 messageFilter應(yīng)返回布爾值 函數(shù)。 用于過濾郵件 由服務(wù)器發(fā)送給僅屬于模擬 WebSocket 流的服務(wù)器。 例如,服務(wù)器可能會(huì)標(biāo)記這些 消息對(duì)象上帶有某種字符串標(biāo)識(shí)符的消息,并且 messageFilter將返回 true 如果套接字發(fā)出的對(duì)象上有這樣的標(biāo)識(shí)符。 消息返回 falsemessageFilter簡(jiǎn)單地跳過, 并且不會(huì)順流而下。

返回值 multiplex是 Observable,其中包含來自模擬套接字連接的消息。 請(qǐng)注意 不是 WebSocketSubject,因此調(diào)用 nextmultiplex再次失敗。 用于將值推向 服務(wù)器,使用 root WebSocketSubject。

例子

偵聽來自服務(wù)器的消息

import { webSocket } from "rxjs/webSocket";
const subject = webSocket("ws://localhost:8081");


subject.subscribe(
   msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
   err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
   () => console.log('complete') // Called when connection is closed (for whatever reason).
 );

將消息推送到服務(wù)器

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


subject.subscribe();
// Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
// since no connection was established!


subject.next({message: 'some message'});
// This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!


subject.complete(); // Closes the connection.


subject.error({code: 4000, reason: 'I think our app just broke!'});
// Also closes the connection, but let's the server know that this closing is caused by some error.

多路 WebSocket

import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');


const observableA = subject.multiplex(
  () => ({subscribe: 'A'}), // When server gets this message, it will start sending messages for 'A'...
  () => ({unsubscribe: 'A'}), // ...and when gets this one, it will stop.
  message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
);


const observableB = subject.multiplex( // And the same goes for 'B'.
  () => ({subscribe: 'B'}),
  () => ({unsubscribe: 'B'}),
  message => message.type === 'B'
);


const subA = observableA.subscribe(messageForA => console.log(messageForA));
// At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
// which we log here.


const subB = observableB.subscribe(messageForB => console.log(messageForB));
// Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
// which we log here.


subB.unsubscribe();
// Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.


subA.unsubscribe();
// Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
// socket connection closes.
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)