IO.js Stream

2018-11-28 22:35 更新

穩(wěn)定度: 2 - 穩(wěn)定

流是一個被io.js內(nèi)部的許多對象所實(shí)現(xiàn)的抽象接口。例如一個發(fā)往HTTP服務(wù)器的請求是一個留,stdout也是一個流。流可以是可讀的,可寫的或雙向的。所有的流都是EventEmitter實(shí)例。

你可以通過require('stream')來取貨Stream的基類。其中包括了Readable流,Writable流,Duplex流和Transform流的基類。

此文檔分為三個章節(jié)。第一章節(jié)解釋了在你的編程中使用流時(shí)需要的API。如果你不需要實(shí)現(xiàn)你自己的流式API,你可以在這里停止。

第二章節(jié)解釋了你在構(gòu)建你自己的流時(shí)需要的API,這些API是為了方便你這么做而設(shè)計(jì)的。

第三章節(jié)深入講述了流的工作機(jī)制,包括一些內(nèi)部的機(jī)制和函數(shù),你不應(yīng)該去改動它們除非你知道你在做什么。

面向流消費(fèi)者的API

流可以是可讀的,可寫的,或雙工的。

所有的流都是EventEmitters。但是它們也各自有一些獨(dú)特的方法和屬性,這取決于它們是可讀流,可寫流或雙工流。

如果一個流同時(shí)是可讀的和可寫的,那么表示它實(shí)現(xiàn)了以下所有的方法和事件。所以,這些API同時(shí)也涵蓋DuplexTransform流,即使它們的實(shí)現(xiàn)可能有些不同。

在你程序中,為了消費(fèi)流而去實(shí)現(xiàn)流接口不是必須的。如果你確實(shí)正在你的程序中實(shí)現(xiàn)流接口,請參考下一章節(jié)面向流實(shí)現(xiàn)者的API。

幾乎所有io.js程序,不論多簡單,都使用了流。下面是一個在io.js是使用流的例子:

var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', function (chunk) {
    body += chunk;
  });

  // the end event tells you that you have entire body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

Class: stream.Readable

可讀流接口是一個你可以從之讀取數(shù)據(jù)的數(shù)據(jù)源的抽象。換句話說,數(shù)據(jù)從可讀流而來。

除非你指示已經(jīng)準(zhǔn)備好接受數(shù)據(jù),否則可讀流不會開始發(fā)生數(shù)據(jù)。

可讀流有兩個“模式”:流動模式和暫停模式。當(dāng)在流動模式時(shí),數(shù)據(jù)由底層系統(tǒng)讀出,并且會盡快地提供給你的程序。當(dāng)在暫停模式時(shí),你必須調(diào)用stream.read()方法來獲取數(shù)據(jù)塊。流默認(rèn)是暫停模式。

注意:如果data事件沒有被綁定監(jiān)聽器,并且沒有導(dǎo)流(pipe)目標(biāo),并且流被切換到了流動模式,那么數(shù)據(jù)將會被丟失。

你可以通過下面任意一個做法切換到流動模式:

  • 添加一個data事件的監(jiān)聽器來監(jiān)聽數(shù)據(jù)。

  • 調(diào)用resume()方法來明確開啟流動模式。

  • 調(diào)用pipe()方法將數(shù)據(jù)導(dǎo)入一個可寫流。

你可以同意下面任意一種方法切換回暫停模式:

  • 如果沒有導(dǎo)流(pipe)目標(biāo),調(diào)用pause()方法。

  • 如果有導(dǎo)流(pipe)目標(biāo),移除所有的data事件監(jiān)聽器,并且通過unpipe()方法移除所有導(dǎo)流目標(biāo)。

注意,由于為了向后兼任的原因,移除data事件的監(jiān)聽器將不會自動暫停流。同樣的,如果有導(dǎo)流目標(biāo),調(diào)用pause()方法將不會保證目標(biāo)流排空并請求更多數(shù)據(jù)時(shí)保持暫停。

一些內(nèi)置的可讀流例子:

  • 客戶端的HTTP請求
  • 服務(wù)端的HTTP響應(yīng)
  • 文件系統(tǒng)讀取流
  • zlib
  • crypto
  • tcp sockets
  • 子進(jìn)程的stdout和stderr
  • process.stdin

Event: 'readable'

當(dāng)一個數(shù)據(jù)塊能可以從流中被讀出時(shí),會觸發(fā)一個readable事件。

某些情況下,監(jiān)聽一個readable事件會導(dǎo)致一些將要被讀出的數(shù)據(jù)從底層系統(tǒng)進(jìn)入內(nèi)部緩沖,如果它沒有準(zhǔn)備好。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // there is some data to read now
});

當(dāng)內(nèi)部緩沖被排空時(shí),一旦有更多數(shù)據(jù),readable事件會再次觸發(fā)。

Event: 'data'

  • chunk Buffer | String 數(shù)據(jù)塊

為一個沒有被暫停的流添加一個data事件的監(jiān)聽器會使其切換到流動模式。之后數(shù)據(jù)會被盡快得傳遞給用戶。

如果你只是想盡快得從流中取得所有數(shù)據(jù),這是最好的方式。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});

Event: 'end'

當(dāng)沒有更多可讀的數(shù)據(jù)時(shí)這個事件會被觸發(fā)。

注意,除非數(shù)據(jù)被完全消費(fèi),end事件才會觸發(fā)。這可以通過切換到流動模式,或重復(fù)調(diào)用read()方法。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
  console.log('there will be no more data.');
});

Event: 'close'

當(dāng)?shù)讓淤Y源(如源頭的文件描述符)被關(guān)閉時(shí)觸發(fā)。不是所有的流都會觸發(fā)這個事件。

Event: 'error'

  • Error Object

當(dāng)接受數(shù)據(jù)時(shí)有錯誤發(fā)生,會觸發(fā)此事件。

readable.read([size])

  • size Number 可選,指定讀取數(shù)據(jù)的數(shù)量
  • Return String | Buffer | null

read()方法從內(nèi)部緩沖中取出數(shù)據(jù)并返回它。如果沒有可用數(shù)據(jù),那么將返回null。

如果你傳遞了一個size參數(shù),那么它將返回指定字節(jié)的數(shù)據(jù)。如果size參數(shù)的字節(jié)數(shù)不可用,那么將返回null。

如果你不指定size參數(shù),那么將會返回內(nèi)部緩沖中的所有數(shù)據(jù)。

這個方法只能在暫定模式中被調(diào)用。在流動模式下,這個方法會被自動地重復(fù)調(diào)用,知道內(nèi)部緩沖被排空。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});

如果這個方法返回一個數(shù)據(jù)塊,那么它也會觸發(fā)data事件。

readable.setEncoding(encoding)

  • encoding String 使用的編碼
  • Return: this

調(diào)用這個函數(shù)會導(dǎo)致流返回指定編碼的字符串而不是Buffer對象。例如,如果你調(diào)用readable.setEncoding('utf8'),那么輸出的數(shù)據(jù)將被解釋為UTF-8數(shù)據(jù),并且作為字符串返回。如果你調(diào)用了readable.setEncoding('hex'),那么數(shù)據(jù)將被使用十六進(jìn)制字符串的格式編碼。

該方法可以正確地處理多字節(jié)字符。如果你只是簡單地直接取出緩沖并且對它們調(diào)用buf.toString(encoding),將會導(dǎo)致錯位。如果你想使用字符串讀取數(shù)據(jù),請使用這個方法。

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

readable.resume()

  • Return: this

這個方法將會讓可讀流繼續(xù)觸發(fā)data事件。

這個方法將會使流切換至流動模式。如果你不想消費(fèi)流中的數(shù)據(jù),但你想監(jiān)聽它的end事件,你可以通過調(diào)用readable.resume()來打開數(shù)據(jù)流。

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function() {
  console.log('got to the end, but did not read anything');
});

readable.pause()

  • Return: this

這個方法會使一個處于流動模式的流停止觸發(fā)data事件,并切換至?xí)和DJ?。所有可用的?shù)據(jù)將仍然存在于內(nèi)部緩沖中。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});

readable.isPaused()

  • Return: Boolean

這個方法會返回流是否被客戶端代碼所暫停(調(diào)用readable.pause(),并且沒有在之后調(diào)用readable.resume())。

var readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

readable.pipe(destination[, options])

  • destination Writable Stream 寫入數(shù)據(jù)的目標(biāo)
  • options Object

  • end Boolean 當(dāng)讀取者結(jié)束時(shí)結(jié)束寫入者。默認(rèn)為true

這個方法會取出可讀流中所有的數(shù)據(jù),并且將之寫入指定的目標(biāo)。這個方法會自動調(diào)節(jié)流量,所以當(dāng)快速讀取可讀流時(shí)目標(biāo)不會溢出。

可以將數(shù)據(jù)安全地導(dǎo)流至多個目標(biāo)。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);

這個函數(shù)返回目標(biāo)流,所以你可以鏈?zhǔn)秸{(diào)用pipe()

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

例子,模仿UNIX的cat命令:

process.stdin.pipe(process.stdout);

默認(rèn)情況下,當(dāng)源流觸發(fā)end事件時(shí),目標(biāo)流會被調(diào)用end()方法,然后目標(biāo)就不再是可寫的了。將傳遞{ end: false }作為options參數(shù),將保持目標(biāo)流開啟。

例子,保持被寫入的流開啟,所以“Goodbye”可以在末端被寫入:

reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});

注意,不論指定任何options參數(shù),process.stderrprocess.stdout在程序退出前永遠(yuǎn)不會被關(guān)閉。

readable.unpipe([destination])

  • destination Writable Stream 可選,指定解除導(dǎo)流的流

這方法會移除之前調(diào)用pipe()方法所設(shè)置的鉤子。

如果沒有指定目標(biāo),那么所有的導(dǎo)流都會被移除。

如果指定了目標(biāo),但是并沒有為目標(biāo)設(shè)置導(dǎo)流,那么什么都不會發(fā)生。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);

readable.unshift(chunk)

  • chunk Buffer | String 要插回讀取隊(duì)列開頭的數(shù)據(jù)塊。

該方法在許多場景中都很有用,比如一個流正在被一個解析器消費(fèi),解析器可能需要將某些剛拉取出的數(shù)據(jù)“逆消費(fèi)”回來源,以便流能將它傳遞給其它消費(fèi)者。

如果你發(fā)現(xiàn)你必須經(jīng)常在你的程序中調(diào)用stream.unshift(chunk),你應(yīng)該考慮實(shí)現(xiàn)一個Transform流(參閱下文的面向流實(shí)現(xiàn)者的API)。

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

readable.wrap(stream)

  • stream Stream 一個“舊式”可讀流

Node.js v0.10 以及之前版本的流沒有完全包含如今的所有的流API(更多的信息請參閱下文的“兼容性”)。

如果你正在使用一個老舊的io.js庫,它觸發(fā)data時(shí)間并且有一個僅作查詢用途的pause()方法,那么你可以調(diào)用wrap()方法來創(chuàng)建一個使用“舊式”流作為數(shù)據(jù)源的可讀流。

你幾乎不會用到這個函數(shù),它的存在僅是為了老舊的io.js程序和庫交互。

例子:

var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.
});

Class: stream.Writable

可寫流接口是一個你可以向其寫入數(shù)據(jù)的目標(biāo)的抽象。

一些內(nèi)部的可寫流例子:

  • 客戶端的http請求
  • 服務(wù)端的http響應(yīng)
  • 文件系統(tǒng)寫入流
  • zlib
  • crypto
  • tcp socket
  • 子進(jìn)程stdin
  • process.stdout,process.stderr

writable.write(chunk[, encoding][, callback])

  • chunk String | Buffer 要寫入的數(shù)據(jù)
  • encoding String 編碼,如果數(shù)據(jù)塊是字符串
  • callback Function 當(dāng)數(shù)據(jù)塊寫入完畢后調(diào)用的回調(diào)函數(shù)
  • Returns: Boolean 如果被全部處理則返回true

該方法向底層系統(tǒng)寫入數(shù)據(jù),并且當(dāng)數(shù)據(jù)被全部處理后調(diào)用指定的回調(diào)函數(shù)。

返回值指示了你是否可以立刻寫入數(shù)據(jù)。如果數(shù)據(jù)需要被內(nèi)部緩沖,會返回false。否則返回true

返回值經(jīng)供參考。即使返回false,你仍可以繼續(xù)寫入數(shù)據(jù)。但是,寫入的數(shù)據(jù)將會被緩沖在內(nèi)存里,所以最好不要這樣做。應(yīng)該在寫入更多數(shù)據(jù)前等待drain事件。

Event: 'drain'

如果一個writable.write(chunk)調(diào)用返回了false,那么drain事件會指示出可以繼續(xù)向流寫入數(shù)據(jù)的時(shí)機(jī)。

// Write the data to the supplied writable stream 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

writable.cork()

強(qiáng)制滯留所有寫入。

滯留的數(shù)據(jù)會在調(diào)用.uncork().end()方法后被寫入。

writable.uncork()

寫入在調(diào)用.cork()方法所有被滯留的數(shù)據(jù)。

writable.setDefaultEncoding(encoding)

  • encoding String 新的默認(rèn)編碼

設(shè)置一個可寫流的默認(rèn)編碼。

writable.end([chunk][, encoding][, callback])

  • chunk String | Buffer 可選,寫入的數(shù)據(jù)
  • encoding String 編碼,如果數(shù)據(jù)塊是字符串
  • callback Function 可選,回調(diào)函數(shù)

當(dāng)沒有更多可寫的數(shù)據(jù)時(shí),調(diào)用這個方法。如果指定了回調(diào)函數(shù),那么會被添加為finish事件的監(jiān)聽器。

在調(diào)用了end()后調(diào)用write()會導(dǎo)致一個錯誤。

// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!

Event: 'finish'

當(dāng)調(diào)用了end()方法,并且所有的數(shù)據(jù)都被寫入了底層系統(tǒng),這個事件會被觸發(fā)。

var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
  console.error('all writes are now complete.');
});

Event: 'pipe'

  • src Readable Stream 對這個可寫流進(jìn)行導(dǎo)流的源可讀流

這個事件將會在可讀流被一個可寫流使用pipe()方法進(jìn)行導(dǎo)流時(shí)觸發(fā)。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

Event: 'unpipe'

  • src Readable Stream 對這個可寫流停止導(dǎo)流的源可讀流

當(dāng)可讀流對其調(diào)用unpipe()方法,在源可讀流的目標(biāo)集合中刪除這個可寫流,這個事件將會觸發(fā)。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

Event: 'error'

  • Error object

在寫入數(shù)據(jù)或?qū)Я靼l(fā)生錯誤時(shí)觸發(fā)。

Class: stream.Duplex

雙工是同時(shí)實(shí)現(xiàn)了可讀流與可寫流的借口。它的用處請參閱下文。

內(nèi)部雙工流的例子:

  • tcp socket
  • zlib
  • crypto

Class: stream.Transform

轉(zhuǎn)換流是一種輸出由輸入計(jì)算所得的栓共流。它們同時(shí)集成了可讀流與可寫流的借口。它們的用處請參閱下文。

內(nèi)部轉(zhuǎn)換流的例子:

  • zlib
  • crypto

面向流實(shí)現(xiàn)者的API

實(shí)現(xiàn)所有種類的流的模式都是一樣的:

  1. 為你的子類繼承合適的父類(util.inherits非常合適于做這個)。
  2. 為了保證內(nèi)部機(jī)制被正確初始化,在你的構(gòu)造函數(shù)中調(diào)用合適的父類構(gòu)造函數(shù)。
  3. 實(shí)現(xiàn)一個或多個特定的方法,參閱下文。

被擴(kuò)展的類和要實(shí)現(xiàn)的方法取決于你要編寫的流類的類型:

用途需要實(shí)現(xiàn)的方法
只讀Readable_read
只寫Writable_write, _writev
可讀以及可寫Duplex_read, _write, _writev
操作被寫入數(shù)據(jù),然后讀出結(jié)果Transform_transform, _flush

在你的實(shí)現(xiàn)代碼中,非常重要的一點(diǎn)是永遠(yuǎn)不要調(diào)用上文的面向流消費(fèi)者的API。否則,你在程序中消費(fèi)你的流接口時(shí)可能有潛在的副作用。

Class: stream.Readable

stream.Readable是一個被設(shè)計(jì)為需要實(shí)現(xiàn)底層的_read(size)方法的抽象類。

請參閱上文的面向流消費(fèi)者的API來了解如何在程序中消費(fèi)流。以下解釋了如果在你的程序中實(shí)現(xiàn)可讀流。

例子:一個計(jì)數(shù)流

這是一個可讀流的基礎(chǔ)例子。它從1到1,000,000遞增數(shù)字,然后結(jié)束。

var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = new Buffer(str, 'ascii');
    this.push(buf);
  }
};

例子:簡單協(xié)議 v1 (次優(yōu))

這類似于上文中提到的parseHeader函數(shù),但是使用一個自定義流實(shí)現(xiàn)。另外,注意這個實(shí)現(xiàn)不將流入的數(shù)據(jù)轉(zhuǎn)換為字符串。

更好地實(shí)現(xiàn)是作為一個轉(zhuǎn)換流實(shí)現(xiàn),請參閱下文更好地實(shí)現(xiàn)。

// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal.  See the
// alternative example below under the Transform section.

var Readable = require('stream').Readable;
var util = require('util');

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

new stream.Readable([options])

  • options Object

  • highWaterMark Number 在停止從底層資源讀取之前,在內(nèi)部緩沖中存儲的最大字節(jié)數(shù)。默認(rèn)為16kb,對于objectMode則是16
  • encoding String 如果被指定,那么緩沖將被利用指定編碼解碼為字符串,默認(rèn)為null
  • objectMode Boolean 是否該流應(yīng)該表現(xiàn)如一個對象的流。意思是說stream.read(n)返回一個單獨(dú)的對象而不是一個大小為nBuffer,默認(rèn)為false

在實(shí)現(xiàn)了Readable類的類中,請確保調(diào)用了Readable構(gòu)造函數(shù),這樣緩沖設(shè)置才能被正確的初始化。

readable._read(size)

  • size Number 異步讀取數(shù)據(jù)的字節(jié)數(shù)

注意:實(shí)現(xiàn)這個函數(shù),而不要直接調(diào)用這個函數(shù)。

這個函數(shù)不應(yīng)該被直接調(diào)用。它應(yīng)該被子類實(shí)現(xiàn),并且僅被Readable類的內(nèi)部方法調(diào)用。

所有的可讀流都必須實(shí)現(xiàn)這個方法用來從底層資源中獲取數(shù)據(jù)。

這個函數(shù)有一個下劃線前綴,因?yàn)樗鼘τ陬愂莾?nèi)部的,并應(yīng)該直接被用戶的程序調(diào)用。你應(yīng)在你的拓展類里覆蓋這個方法。

當(dāng)數(shù)據(jù)可用時(shí),調(diào)用readable.push(chunk)方法將之推入讀取隊(duì)列。如果方法返回false,那么你應(yīng)當(dāng)停止讀取。當(dāng)_read方法再次被調(diào)用,你應(yīng)當(dāng)推入更多數(shù)據(jù)。

參數(shù)size僅作查詢?!皉ead”調(diào)用返回?cái)?shù)據(jù)的實(shí)現(xiàn)可以通過這個參數(shù)來知道應(yīng)當(dāng)抓取多少數(shù)據(jù);其余與之無關(guān)的實(shí)現(xiàn),比如TCP或TLS,則可忽略這個參數(shù),并在可用時(shí)返回?cái)?shù)據(jù)。例如,沒有必要“等到”size個字節(jié)可用時(shí)才調(diào)用stream.push(chunk)。

readable.push(chunk[, encoding])

  • chunk Buffer | null | String 被推入讀取隊(duì)列的數(shù)據(jù)塊
  • encoding String 字符串?dāng)?shù)據(jù)塊的編碼。必須是一個合法的Buffer編碼,如'utf8'或'ascii'
  • return Boolean 是否應(yīng)該繼續(xù)推入

注意:這個函數(shù)應(yīng)該被Readable流的實(shí)現(xiàn)者調(diào)用,而不是消費(fèi)者。

_read()函數(shù)在至少調(diào)用一次push(chunk)方法前,不會被再次調(diào)用。

Readable類通過在readable事件觸發(fā)時(shí),調(diào)用read()方法將數(shù)據(jù)推入 之后用于讀出數(shù)據(jù)的讀取隊(duì)列 來工作。

push()方法需要明確地向讀取隊(duì)列中插入數(shù)據(jù)。如果它的參數(shù)為null,那么它將發(fā)送一個數(shù)據(jù)結(jié)束信號(EOF)。

這個API被設(shè)計(jì)為盡可能的靈活。例如,你可能正在包裝一個有pause/resume機(jī)制和一個數(shù)據(jù)回調(diào)函數(shù)的低級別源。那那些情況下,你可以通過以下方式包裝這些低級別源:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();
  var self = this;

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = function(chunk) {
    // if push() returns false, then we need to stop reading from source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // When the source ends, we push the EOF-signaling `null` chunk
  this._source.onend = function() {
    self.push(null);
  };
}

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};

Class: stream.Writable

stream.Writable是一個被設(shè)計(jì)為需要實(shí)現(xiàn)底層的_write(chunk, encoding, callback)方法的抽象類。

請參閱上文的面向流消費(fèi)者的API來了解如何在程序中消費(fèi)流。以下解釋了如果在你的程序中實(shí)現(xiàn)可寫流。

new stream.Writable([options])

  • options Object

  • highWaterMark Number write()方法開始返回false的緩沖級別。默認(rèn)為16kb,對于objectMode流則是16
  • decodeStrings Boolean 是否在傳遞給write()方法前將字符串解碼成Buffer。默認(rèn)為true
  • objectMode Boolean 是否write(anyObj)為一個合法操作。如果設(shè)置為true你可以寫入任意數(shù)據(jù)而不僅是Buffer或字符串?dāng)?shù)據(jù)。默認(rèn)為false

在實(shí)現(xiàn)了Writable類的類中,請確保調(diào)用了Writable構(gòu)造函數(shù),這樣緩沖設(shè)置才能被正確的初始化。

writable._write(chunk, encoding, callback)

  • chunk Buffer | String 將要被寫入的數(shù)據(jù)塊。除非decodeStrings配置被設(shè)置為false,否則將一直是一個buffer
  • encoding String 如果數(shù)據(jù)塊是一個字符串,那么這就是編碼的類型。如果是一個buffer,那么則會忽略它
  • callback Function 當(dāng)你處理完給定的數(shù)據(jù)塊后調(diào)用這個函數(shù)

所有的Writable流的實(shí)現(xiàn)都必須提供一個_write()方法來給底層資源傳輸數(shù)據(jù)。

這個函數(shù)不應(yīng)該被直接調(diào)用。它應(yīng)該被子類實(shí)現(xiàn),并且僅被Writable類的內(nèi)部方法調(diào)用。

回調(diào)函數(shù)使用標(biāo)準(zhǔn)的callback(error)模式來表示這個寫操作成功或發(fā)生了錯誤。

如果構(gòu)造函數(shù)選項(xiàng)中設(shè)置了decodeStrings標(biāo)志,那么數(shù)據(jù)塊將是一個字符串而不是一個Buffer,編碼將會決定字符串的類型。這個是為了幫助處理編碼字符串的實(shí)現(xiàn)。如果你沒有明確地將decodeStrings選項(xiàng)設(shè)為false,那么你會安全地忽略encoding參數(shù),并且數(shù)據(jù)塊是Buffer形式。

這個函數(shù)有一個下劃線前綴,因?yàn)樗鼘τ陬愂莾?nèi)部的,并應(yīng)該直接被用戶的程序調(diào)用。你應(yīng)在你的拓展類里覆蓋這個方法。

writable._writev(chunks, callback)

  • chunks Array 將被寫入的數(shù)據(jù)塊數(shù)組。其中每一個數(shù)據(jù)都有如下格式:{ chunk: ..., encoding: ... }
  • callback Function 當(dāng)你處理完給定的數(shù)據(jù)塊后調(diào)用這個函數(shù)

注意:這個函數(shù)不應(yīng)該被直接調(diào)用。它應(yīng)該被子類實(shí)現(xiàn),并且僅被Writable類的內(nèi)部方法調(diào)用。

這個函數(shù)對于你的實(shí)現(xiàn)是完全可選的。大多數(shù)情況下它是不必的。如果實(shí)現(xiàn),它會被以所有滯留在寫入隊(duì)列中的數(shù)據(jù)塊調(diào)用。

Class: stream.Duplex

一個“雙工”流既是可讀的,又是可寫的。如TCPsocket連接。

注意,和你實(shí)現(xiàn)ReadableWritable流時(shí)一樣,stream.Duplex是一個被設(shè)計(jì)為需要實(shí)現(xiàn)底層的_read(size)_write(chunk, encoding, callback)方法的抽象類。

由于JavaScript并不具備多繼承能力,這個類是繼承于Readable類,并寄生于Writable類。所以為了實(shí)現(xiàn)這個類,用戶需要同時(shí)實(shí)現(xiàn)低級別的_read(n)方法和低級別的_write(chunk, encoding, callback)方法。

new stream.Duplex(options)

  • options Object 同時(shí)傳遞給WritableReadable構(gòu)造函數(shù)。并且包含以下屬性:

  • allowHalfOpen Boolean 默認(rèn)為true。如果設(shè)置為false,那么流的可讀的一端結(jié)束時(shí)可寫的一端也會自動結(jié)束,反之亦然。
  • readableObjectMode Boolean 默認(rèn)為false,為流的可讀的一端設(shè)置objectMode。當(dāng)objectModetrue時(shí)沒有效果。
  • writableObjectMode Boolean 默認(rèn)為false,為流的可寫的一端設(shè)置objectMode。當(dāng)objectModetrue時(shí)沒有效果。

在實(shí)現(xiàn)了Duplex類的類中,請確保調(diào)用了Duplex構(gòu)造函數(shù),這樣緩沖設(shè)置才能被正確的初始化。

Class: stream.Transform

“轉(zhuǎn)換”流是一個輸出于輸入存在對應(yīng)關(guān)系的雙工流,如一個zilib流或一個crypto流。

輸出和輸出并不需要有相同的大小,相同的數(shù)據(jù)塊數(shù)或同時(shí)到達(dá)。例如,一個哈希流只有一個單獨(dú)數(shù)據(jù)塊的輸出當(dāng)輸入結(jié)束時(shí)。一個zlib流的輸出比其輸入小得多或大得多。

除了實(shí)現(xiàn)_read()方法和_write()方法,轉(zhuǎn)換流還必須實(shí)現(xiàn)_transform()方法,并且可選地實(shí)現(xiàn)_flush()方法(參閱下文)。

new stream.Transform([options])

  • options Object 同時(shí)傳遞給WritableReadable構(gòu)造函數(shù)。

在實(shí)現(xiàn)了Transform類的類中,請確保調(diào)用了Transform構(gòu)造函數(shù),這樣緩沖設(shè)置才能被正確的初始化。

transform._transform(chunk, encoding, callback)

  • chunk Buffer | String 將要被寫入的數(shù)據(jù)塊。除非decodeStrings配置被設(shè)置為false,否則將一直是一個buffer
  • encoding String 如果數(shù)據(jù)塊是一個字符串,那么這就是編碼的類型。如果是一個buffer,那么則會忽略它
  • callback Function 當(dāng)你處理完給定的數(shù)據(jù)塊后調(diào)用這個函數(shù)

這個函數(shù)不應(yīng)該被直接調(diào)用。它應(yīng)該被子類實(shí)現(xiàn),并且僅被Transform類的內(nèi)部方法調(diào)用。

所有Transform流的實(shí)現(xiàn)都必須提供一個_transform方法來接受輸入和產(chǎn)生輸出。

Transform類中,_transform可以做需要做的任何事,如處理需要寫入的字節(jié),將它們傳遞給可寫端,異步I/O,等等。

調(diào)用transform.push(outputChunk)0次或多次來從輸入的數(shù)據(jù)塊產(chǎn)生輸出,取決于你想從這個數(shù)據(jù)塊中輸出多少數(shù)據(jù)作為結(jié)果。

僅當(dāng)目前的數(shù)據(jù)塊被完全消費(fèi)后,才會調(diào)用回調(diào)函數(shù)。注意,對于某些特殊的輸入可能會沒有輸出。如果你將數(shù)據(jù)作為第二個參數(shù)傳入回調(diào)函數(shù),那么數(shù)據(jù)將被傳遞給push方法。換句話說,下面的兩個例子是相等的:

transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
}

這個函數(shù)有一個下劃線前綴,因?yàn)樗鼘τ陬愂莾?nèi)部的,并應(yīng)該直接被用戶的程序調(diào)用。你應(yīng)在你的拓展類里覆蓋這個方法。

transform._flush(callback)

  • callback Function 當(dāng)你排空了所有剩余數(shù)據(jù)后,這個回調(diào)函數(shù)會被調(diào)用

注意:這個函數(shù)不應(yīng)該被直接調(diào)用。它應(yīng)該被子類實(shí)現(xiàn),并且僅被Transform類的內(nèi)部方法調(diào)用。

在一些情景中,你的轉(zhuǎn)換操作需要在流的末尾多發(fā)生一點(diǎn)點(diǎn)數(shù)據(jù)。例如,一個Zlib壓縮流會存儲一些內(nèi)部狀態(tài)以便它能優(yōu)化壓縮輸出。但是在最后,它需要盡可能好得處理這些留下的東西來使數(shù)據(jù)完整。

在這種情況中,您可以實(shí)現(xiàn)一個_flush方法,它會在最后被調(diào)用,在所有寫入數(shù)據(jù)被消費(fèi)、但在觸發(fā)end表示可讀端到達(dá)末尾之前。和_transform一樣,只需在寫入操作完成時(shí)適當(dāng)?shù)卣{(diào)用transform.push(chunk)零或多次。

這個函數(shù)有一個下劃線前綴,因?yàn)樗鼘τ陬愂莾?nèi)部的,并應(yīng)該直接被用戶的程序調(diào)用。你應(yīng)在你的拓展類里覆蓋這個方法。

Events: 'finish' 和 'end'

finishend事件分別來自于父類WritableReadable。finish事件在end()方法被調(diào)用以及所有的輸入被_transform方法處理后觸發(fā)。end事件在所有的在_flush方法的回調(diào)函數(shù)被調(diào)用后的數(shù)據(jù)被輸出后觸發(fā)。

Example: SimpleProtocol 解釋器 v2

上文中的簡單協(xié)議解釋器可以簡單地通過高級別的Transform流更好地實(shí)現(xiàn)。與上文例子中的parseHeaderSimpleProtocol v1相似。

在這個例子中,沒有從參數(shù)中提供輸入,然后將它導(dǎo)流至解釋器中,這更符合io.js的使用習(xí)慣。

var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

Class: stream.PassThrough

這是一個Transform流的實(shí)現(xiàn)。將輸入的流簡單地傳遞給輸出。它的主要目的是用來演示和測試,但它在某些需要構(gòu)建特殊流的情況下可能有用。

簡化的構(gòu)造器API

可以簡單的構(gòu)造流而不使用繼承。

這可以通過調(diào)用合適的方法作為構(gòu)造函數(shù)和參數(shù)來實(shí)現(xiàn):

例子:

Readable

var readable = new stream.Readable({
  read: function(n) {
    // sets this._read under the hood
  }
});

Writable

var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    // sets this._write under the hood
  }
});

// or

var writable = new stream.Writable({
  writev: function(chunks, next) {
    // sets this._writev under the hood
  }
});

Duplex

var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood
  },
  write: function(chunk, encoding, next) {
    // sets this._write under the hood
  }
});

// or

var duplex = new stream.Duplex({
  read: function(n) {
    // sets this._read under the hood
  },
  writev: function(chunks, next) {
    // sets this._writev under the hood
  }
});

Transform

var transform = new stream.Transform({
  transform: function(chunk, encoding, next) {
    // sets this._transform under the hood
  },
  flush: function(done) {
    // sets this._flush under the hood
  }
});

流:內(nèi)部細(xì)節(jié)

緩沖

Writable流和Readable流都會分別在一個內(nèi)部的叫_writableState.buffer_readableState.buffer的對象里緩沖數(shù)據(jù)。

潛在的被緩沖的數(shù)據(jù)量取決于被傳遞給構(gòu)造函數(shù)的highWaterMark參數(shù)。

Readable流中,當(dāng)其的實(shí)現(xiàn)調(diào)用stream.push(chunk)時(shí)就會發(fā)生緩沖。如果流的消費(fèi)者沒有調(diào)用stream.read(),那么數(shù)據(jù)就會保留在內(nèi)部隊(duì)列中直到它被消費(fèi)。

Writable流中,當(dāng)用戶重復(fù)調(diào)用stream.write(chunk)時(shí)就會發(fā)生緩沖,甚至是當(dāng)write()返回false時(shí)。

流,尤其是pipe()方法的初衷,是限制數(shù)據(jù)的滯留量在一個可接受的水平,這樣才使得不同傳輸速度的來源和目標(biāo)不會淹沒可用的內(nèi)存。

stream.read(0)

在一些情況下,你想不消費(fèi)任何數(shù)據(jù)而去觸發(fā)一次底層可讀流機(jī)制的刷新。你可以調(diào)用stream.read(0),它總是返回null。

如果內(nèi)部的讀緩沖量在highWaterMark之下,并且流沒有正在讀取,那么調(diào)用read(0)將會觸發(fā)一次低級別的_read調(diào)用。

幾乎永遠(yuǎn)沒有必須這么做。但是,你可能會在io.jsReadable流類的內(nèi)部代碼的幾處看到這個。

stream.push('')

推入一個0字節(jié)的字符串或Buffer(不處于對象模式)有一個有趣的副作用。因?yàn)檫@是一個stream.push()的調(diào)用,它將會結(jié)束讀取進(jìn)程。但是,它不添加任何數(shù)據(jù)到可讀緩沖中,所以沒有任何用戶可消費(fèi)的數(shù)據(jù)。

在極少的情況下,你當(dāng)下沒有數(shù)據(jù)可以提供,但你的消費(fèi)者同過調(diào)用stream.read(0)來得知合適再次檢查。在這樣的情況下,你可以調(diào)用stream.push('')

至今為止,這個功能的唯一使用之處是在tls.CryptoStream類中,它將在io.js的1.0版本中被廢棄。如果你發(fā)現(xiàn)你不得不使用stream.push(''),請考慮使用另外的方式。因?yàn)檫@幾乎表示發(fā)生了某些可怕的錯誤。

與舊版本的Node.js的兼容性

Node.js的0.10版本之前,可讀流接口非常簡單,并且功能和功用都不強(qiáng)。

  • data事件會立刻觸發(fā),而不是等待你調(diào)用read()方法。如果你需要進(jìn)行一些I/O操作來決定是否處理數(shù)據(jù),那么你只能將數(shù)據(jù)存儲在某些緩沖區(qū)中以防數(shù)據(jù)流失。
  • pause()僅供查詢,并不保證生效。這意味著你還是要準(zhǔn)備接收data事件在流已經(jīng)處于暫停模式中時(shí)。

io.js v1.0 和Node.js v0.10中,下文所述的Readable類添加進(jìn)來。為了向后兼容性,當(dāng)一個data事件的監(jiān)聽器被添加時(shí)或resume()方法被調(diào)用時(shí),可讀流切換至流動模式。其作用是,即便您不使用新的read()方法和readable事件,您也不必?fù)?dān)心丟失數(shù)據(jù)塊。

大多數(shù)程序都會保持功能正常,但是,以下有一些邊界情況:

  • 沒有添加任何data事件
  • 從未調(diào)用resume()方法
  • 流沒有被導(dǎo)流至任何可寫的目標(biāo)

例如,考慮以下代碼:

// WARNING!  BROKEN!
net.createServer(function(socket) {

  // we add an 'end' method, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

}).listen(1337);

Node.js v0.10前,到來的信息數(shù)據(jù)會被簡單地丟棄

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號