RxJS 運(yùn)算符

2020-09-25 17:12 更新

盡管 Observable 是基礎(chǔ),但 RxJS 對(duì)它的運(yùn)算符最有用。運(yùn)算符是使復(fù)雜的異步代碼易于以聲明的方式編寫(xiě)的基本組成部分。

什么是運(yùn)算符?

運(yùn)算符就是功能。運(yùn)算符有兩種:

可管道運(yùn)算符是可以使用語(yǔ)法通過(guò)管道傳遞給 Observables 的類(lèi)型 observableInstance.pipe(operator())。這些包括filter(...)mergeMap(...)。調(diào)用時(shí),它們不會(huì)更改現(xiàn)有的 Observable 實(shí)例。相反,它們返回一個(gè)新的 Observable,其訂閱邏輯基于第一個(gè) Observable。

管道運(yùn)算符是一個(gè)將 Observable 作為其輸入并返回另一個(gè) Observable 的函數(shù)。這是一個(gè)純粹的操作:以前的 Observable 保持不變。

管道運(yùn)算符本質(zhì)上是一個(gè)純函數(shù),它將一個(gè) Observable 用作輸入并生成另一個(gè) Observable 作為輸出。訂閱輸出 Observable 也將訂閱輸入 Observable。

創(chuàng)建運(yùn)算符是另一種運(yùn)算符,可以稱為獨(dú)立函數(shù)來(lái)創(chuàng)建新的 Observable。例如:of(1, 2, 3)創(chuàng)建一個(gè)可觀察物體,該物體將依次發(fā)射 1、2 和 3。創(chuàng)建運(yùn)算符將在后面的部分中詳細(xì)討論。

例如,被調(diào)用的運(yùn)算符 map 類(lèi)似于同名的 Array 方法。就像 [1, 2, 3].map(x => x * x)yield一樣[1, 4, 9],Observable 創(chuàng)建如下:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';


map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));


// Logs:
// value: 1 
// value: 4
// value: 9

會(huì)散發(fā)出14,9。另一個(gè)有用的運(yùn)算符是 first

import { of } from 'rxjs';
import { first } from 'rxjs/operators';


first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));


// Logs:
// value: 1

請(qǐng)注意,map邏輯上必須動(dòng)態(tài)構(gòu)建,因?yàn)楸仨殲槠涮峁┯成涔δ?。相比之下,?first 可能是一個(gè)常數(shù),但是仍然是動(dòng)態(tài)構(gòu)建的。通常,無(wú)論是否需要參數(shù),都構(gòu)造所有運(yùn)算符。

管道

Pipeable 運(yùn)營(yíng)商的功能,所以他們可以像使用普通的功能:op()(obs)-但在實(shí)踐中,往往是很多人一起卷積,并迅速成為不可讀:op4()(op3()(op2()(op1()(obs))))。因此,Observables 具有一種稱為的方法.pipe(),該方法可以完成相同的操作,但更易于閱讀:

obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

從風(fēng)格上講 op()(obs),即使只有一個(gè)運(yùn)算符,也不要使用。obs.pipe(op())是普遍首選的。

創(chuàng)建運(yùn)算符

什么是創(chuàng)作運(yùn)算符?與管道運(yùn)算符不同,創(chuàng)建運(yùn)算符是可用于創(chuàng)建具有某些常見(jiàn)預(yù)定義行為或通過(guò)加入其他 Observable 的 Observable 的函數(shù)。

創(chuàng)建運(yùn)算符的典型示例是 interval 函數(shù)。它以數(shù)字(不是 Observable)作為輸入?yún)?shù),并產(chǎn)生 Observable 作為輸出:

import { interval } from 'rxjs';


const observable = interval(1000 /* number of milliseconds */);

在這里查看所有靜態(tài)創(chuàng)建運(yùn)算符的列表。

高階可觀察物

可觀察對(duì)象最通常發(fā)出諸如字符串和數(shù)字之類(lèi)的普通值,但令人驚訝的是,經(jīng)常需要處理可觀察對(duì)象可觀察對(duì)象,即所謂的高階可觀察對(duì)象。例如,假設(shè)您有一個(gè)Observable發(fā)射字符串,這些字符串是您想要查看的文件的URL。代碼可能看起來(lái)像這樣:

const fileObservable = urlObservable.pipe(
   map(url => http.get(url)),
);

http.get()為每個(gè)單獨(dú)的 URL 返回一個(gè) Observable(可能是字符串或字符串?dāng)?shù)組)?,F(xiàn)在您有了一個(gè) Observables Observables,一個(gè)更高階的 Observable。

但是如何處理高階 Observable?通常,通過(guò)展:通過(guò)(以某種方式)將高階 Observable 轉(zhuǎn)換為普通 Observable。例如:

const fileObservable = urlObservable.pipe(
   map(url => http.get(url)),
);

concatAll()操作者訂閱了各“內(nèi)部”可觀察所散發(fā)出來(lái)的“外”觀察的,和復(fù)制所有所發(fā)射的值,直到該可觀察完成,并繼續(xù)到下一個(gè)。所有值都以這種方式連接在一起。其他有用的扁平化運(yùn)算符(稱為連接運(yùn)算符)是

  • mergeAll() —訂閱每個(gè)內(nèi)部 Observable 的到達(dá),然后在到達(dá)時(shí)發(fā)出每個(gè)值
  • switchAll() —在第一個(gè)內(nèi)部 Observable 到達(dá)時(shí)訂閱它,并在到達(dá)時(shí)發(fā)出每個(gè)值,但是在下一個(gè)內(nèi)部Observable到達(dá)時(shí),取消訂閱前一個(gè),并訂閱新值。
  • exhaust() —在第一個(gè)內(nèi)部 Observable 到達(dá)時(shí)訂閱它,并在到達(dá)時(shí)發(fā)出每個(gè)值,并丟棄所有新到達(dá)的內(nèi)部Observable,直到第一個(gè)完成時(shí),然后等待下一個(gè)內(nèi)部Observable。

正如許多陣列庫(kù)結(jié)合 map()flat()(或 flatten())成一個(gè)單一的 flatMap(),也有全部的 RxJS 映射當(dāng)量壓扁運(yùn)營(yíng)商 concatMap()``mergeMap()``switchMap(),和 exhaustMap()

大理石圖

為了解釋操作員的工作方式,文字描述通常是不夠的。許多操作員都與時(shí)間有關(guān),例如,他們可能以不同的方式延遲,采樣,節(jié)流或消除反跳值。圖通常是一個(gè)更好的工具。大理石圖是操作員如何工作的直觀表示,包括輸入的 Observable,操作員及其參數(shù)以及輸出的 Observable。

在大理石圖中,時(shí)間向右流動(dòng),并且該圖描述了如何在 Observable 執(zhí)行中發(fā)出值(“大理石”)。

您可以在下面看到大理石圖的解剖圖。

img

在整個(gè)文檔站點(diǎn)中,我們廣泛使用大理石圖來(lái)說(shuō)明操作員的工作方式。它們?cè)谄渌h(huán)境中也可能確實(shí)有用,例如在白板上,甚至在我們的單元測(cè)試中(如ASCII圖)。

運(yùn)營(yíng)商類(lèi)別

存在用于不同目的的運(yùn)算符,它們可以歸類(lèi)為:創(chuàng)建,轉(zhuǎn)換,過(guò)濾,聯(lián)接,多播,錯(cuò)誤處理,實(shí)用程序等。在下面的列表中,您將找到按類(lèi)別組織的所有運(yùn)算符。

有關(guān)完整概述,請(qǐng)參見(jiàn)參考頁(yè)。

創(chuàng)建運(yùn)算符

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

加入創(chuàng)作運(yùn)營(yíng)商

這些是 Observable 創(chuàng)建運(yùn)算符,它們也具有聯(lián)接功能-發(fā)出多個(gè)源 Observable 的值。

  • combineLatest
  • concat
  • forkJoin
  • merge
  • partition
  • race
  • zip

轉(zhuǎn)型運(yùn)營(yíng)商

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen
  • concatMap
  • concatMapTo
  • exhaust
  • exhaustMap
  • expand
  • groupBy
  • map
  • mapTo
  • mergeMap
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • pluck
  • scan
  • switchMap
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

篩選運(yùn)算符

  • audit
  • auditTime
  • debounce
  • debounceTime
  • distinct
  • distinctKey
  • distinctUntilChanged
  • distinctUntilKeyChanged
  • elementAt
  • filter
  • first
  • ignoreElements
  • last
  • sample
  • sampleTime
  • single
  • skip
  • skipLast
  • skipUntil
  • skipWhile
  • take
  • takeLast
  • takeUntil
  • takeWhile
  • throttle
  • throttleTime

加盟運(yùn)營(yíng)商

另請(qǐng)參見(jiàn)上面的“ 加入創(chuàng)建運(yùn)算符”部分。

  • combineAll
  • concatAll
  • exhaust
  • mergeAll
  • startWith
  • withLatestFrom

組播運(yùn)營(yíng)商

  • multicast
  • publish
  • publishBehavior
  • publishLast
  • publishReplay
  • share

錯(cuò)誤處理運(yùn)算符

  • catchError
  • retry
  • retryWhen

公用事業(yè)運(yùn)營(yíng)商

  • tap
  • delay
  • delayWhen
  • dematerialize
  • materialize
  • observeOn
  • subscribeOn
  • timeInterval
  • timestamp
  • timeout
  • timeoutWith
  • toArray

條件運(yùn)算符和布爾運(yùn)算符

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

數(shù)學(xué)運(yùn)算符和聚合運(yùn)算符

  • count
  • max
  • min
  • reduce

創(chuàng)建自定義運(yùn)算符

使用該 pipe()函數(shù)創(chuàng)建新運(yùn)算符

如果您的代碼中有一個(gè)常用的運(yùn)算符序列,請(qǐng)使用該 pipe()函數(shù)將該序列提取到新的運(yùn)算符中。即使序列不是那么常見(jiàn),將其分解為單個(gè)運(yùn)算符也可以提高可讀性。

例如,您可以創(chuàng)建一個(gè)將奇數(shù)值丟棄并且將偶數(shù)值加倍的函數(shù),如下所示:

import { pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';


function discardOddDoubleEven() {
  return pipe(
    filter(v => ! (v % 2)),
    map(v => v + v),
  );
}

(該 pipe()功能與.pipe()Observable上的方法類(lèi)似,但不相同。)

從頭開(kāi)始創(chuàng)建新的運(yùn)算符

它更復(fù)雜,但是如果您必須編寫(xiě)不能由現(xiàn)有運(yùn)算符的組合構(gòu)成的運(yùn)算符(這種情況很少發(fā)生),則可以使用 Observable 構(gòu)造函數(shù)從頭開(kāi)始編寫(xiě)運(yùn)算符,如下所示:

import { Observable } from 'rxjs';


function delay(delayInMillis) {
  return (observable) => new Observable(observer => {
    // this function will called each time this
    // Observable is subscribed to.
    const allTimerIDs = new Set();
    const subscription = observable.subscribe({
      next(value) {
        const timerID = setTimeout(() => {
          observer.next(value);
          allTimerIDs.delete(timerID);
        }, delayInMillis);
        allTimerIDs.add(timerID);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      }
    });
    // the return value is the teardown function,
    // which will be invoked when the new
    // Observable is unsubscribed from.
    return () => {
      subscription.unsubscribe();
      allTimerIDs.forEach(timerID => {
        clearTimeout(timerID);
      });
    }
  });
}

請(qǐng)注意,您必須

  1. 實(shí)現(xiàn)所有三個(gè)觀察功能 next(),error()以及 complete()訂閱輸入可觀察的時(shí)候。
  2. 實(shí)現(xiàn)“刪除”功能,該功能在 Observable 完成時(shí)清除(在這種情況下,通過(guò)取消訂閱并清除所有待處理的超時(shí))。
  3. 從傳遞給 Observable 構(gòu)造函數(shù)的函數(shù)中返回該拆解函數(shù)。

當(dāng)然,這僅是示例。該delay()運(yùn)營(yíng)商已經(jīng)存在。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)