node中的Stream-Readable和Writeable解读

在node中,只要涉及到文件IO的场景一般都会涉及到一个类-Stream。Stream是对IO设备的抽象表示,其在JAVA中也有涉及,主要体现在四个类-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream类针对字节数据进行读写;Reader和Writer针对字符数据读写。同时Java中有多种针对这四种类型的扩展类,如节点流、缓冲流和转换流等。比较而言,node中Stream类型也和Java中的类似,同样提供了支持字节和字符读写的Readable和Writeable类,也存在转换流Transform类,本文主要分析node中Readable和Writeable的实现机制,从底层的角度更好的理解Readable和Writeable实现机制,解读在读写过程中发生的一些重要事件。

Readable类

Readable对应于Java中的InputStream和Reader两个类,针对Readable设置encode编码可完成内部数据由Buffer到字符的转换。Readable Stream有两种模式,即flowing和paused模式。这两种模式对于用户而言区别在于是否需要手动调用Readable.prototype.read(n),读取缓冲区的数据。查询node API文档可知触发flowing模式有三种方式:

  1. 侦听data事件

  2. readable.resume()

  3. readable.pipe()
    而触发paused模式同样有几种方式:

  4. 移除data事件

  5. readable.pause()

  6. readable.unpipe()
    可能这样讲解大家仍不明白Readable Stream这两种模式的区别,那么下文从更深层次分析两种模式的机制。

深入Readable的实现

Readable继承EventEmitter,大家也都知道。但是相信大家应该不怎么熟悉Readable的实例属性_readableState。该属性是一个ReadableState类型的对象,保存了Readable实例的重要信息,如读取模式(是否为对象模式)、highWaterMask(缓冲区存放的最大字节数)、缓冲区、flowing模式等。在Readable的实现中,处处使用ReadableState对象记录当前读取状态,并设置缓冲区保证读操作的顺利进行。

首先需要针对Readable.prototype.read方法进行特别解读:

  if (n === 0 &&      state.needReadable &&      (state.length >= state.highWaterMark || state.ended)) {    debug('read: emitReadable', state.length, state.ended);    if (state.length === 0 && state.ended)      endReadable(this);    else      emitReadable(this);    return null;  }

当读入的数据为0时,执行emitReadable操作。这意味着,针对Readable Stream执行read(0)方法会触发readable事件,但是不会读当前缓冲区。因此使用read(0)可以完成一些比较巧妙的事情,如在readable处理函数中可以使用read(0)触发下一次readable事件,可选的操作读缓冲区。

继续分析代码,如果读入的数据并不是0,则计算读取缓冲区的具体字节数,

n = howMuchToRead(n, state);function howMuchToRead(n, state) {  if (state.length === 0 && state.ended)    return 0;  if (state.objectMode)    return n === 0 ? 0 : 1;  if (n === null || isNaN(n)) {    // only flow one buffer at a time    if (state.flowing && state.buffer.length)      return state.buffer[0].length;    // 若是paused状态,则读全部的缓冲区    else      return state.length;  }  if (n  state.highWaterMark)    state.highWaterMark = computeNewHighWaterMark(n);  // don't have that much.  return null, unless we've ended.  if (n > state.length) {    if (!state.ended) {      state.needReadable = true;      return 0;    } else {      return state.length;    }  }  return n;}

针对对象模式的读取,每次只读一个;对于处在flowing模式下的读取,每次只读缓冲区中第一个buffer的长度;在paused模式下则读取全部缓冲区的长度;若读取的字节数大于设置的缓冲区最大值,则适当扩大缓冲区的大小(默认为16k,最大为8m);若读取的长度大于当前缓冲区的大小,设置needReadable属性并准备数据等待下一次读取。

接下来,判断是否需要准备数据。在这里,依赖于needReadable的值,

var doRead = state.needReadable;  debug('need readable', doRead);  if (state.length === 0 || state.length - n  0)    ret = fromList(n, state);  else    ret = null;  if (ret === null) {    state.needReadable = true;    n = 0;  }  state.length -= n;  if (state.length === 0 && !state.ended)    state.needReadable = true;  if (nOrig !== n && state.ended && state.length === 0)    endReadable(this);  // flowing模式下的数据读取依赖于 read函数  // data事件触发的次数,依赖于howMuchToRead计算的次数  if (ret !== null)    this.emit('data', ret);

一旦在_read中更新了缓冲区,那么我们需要重新计算(消费者,即可写流)读取的字节数。fromList方法完成了读缓冲区的slice,如果是objectMode下的读,则只读缓冲区的第一个对象;针对未传参数的read方法而言,默认读取全部缓冲区等等。从读缓冲区读取完数据之后设置相关flag,如needReadable,最终,触发data事件,结束!

上节提到,设置data事件的执行函数会进入flowing模式的读,而上文看到正是read方法触发了data事件,而默认条件下Readable处于paused状态,因此在paused状态读取数据需要手动执行read函数,每次read读取完毕触发一次data事件。从这点看出,flowing和paused状态区别在于是否需要手动执行read()来获取数据。flowing状态下,我们无需执行read,仅需要设置data事件处理函数或者设定导流目标pipe;而在paused状态下,不仅仅是简单的执行read方法,因为读缓冲区的内容时刻在改变,一旦读缓冲区又有新数据,简单执行read()就没法满足需求(因为我们无法知道是否又有新数据到来),因此需要侦听读缓冲区的相关事件,即readable事件,在该事件处理函数中进行read相关数据。

那么,什么情况下会触发readable事件呢?在实现_read私有方法中,我们使用stream.push(chunk)或stream.unshift(chunk)方法注入数据到读缓冲区,那么push和unshift方法都实现了下面的逻辑,

if (state.flowing && state.length === 0 && !state.sync) {  stream.emit('data', chunk);  stream.read(0);} else {  // update the buffer info.  state.length += state.objectMode ? 1 : chunk.length;  if (addToFront)    state.buffer.unshift(chunk);  else    state.buffer.push(chunk);  if (state.needReadable)    emitReadable(stream);}function emitReadable(stream) {  var state = stream._readableState;  state.needReadable = false;  if (!state.emittedReadable) {    debug('emitReadable', state.flowing);    state.emittedReadable = true;    if (state.sync)      process.nextTick(emitReadable_, stream);    else      emitReadable_(stream);  }}function emitReadable_(stream) {  debug('emit readable');  stream.emit('readable');  flow(stream);}// 在flowing状态下,自动读取流(替代paused状态下手动read)function flow(stream) {  var state = stream._readableState;  debug('flow', state.flowing);  if (state.flowing) {    do {      var chunk = stream.read();    } while (null !== chunk && state.flowing);  }}

一旦处于flowing模式并且当前缓冲区没有数据,那么就立即将预处理的push(unshift)数据传递给data事件处理函数,并执行stream.read(0)。前文已经交代过,read(0)仅仅用来触发readable事件,并不读取缓冲区,这就是触发readable的第一种情况。

第二种则是第一种情况之外的所有情景,即根据操作(push、unshift)的不同将数据插入读缓冲区的不同位置。最后执行emitReadable函数,触发readable事件。针对emitReadable函数,它的作用就是异步触发readable事件,并执行flow函数。flow函数则针对flowing状态的Readable做自适应读取,免去了手动执行read函数和何时执行read函数的苦恼。

这样,对于Readable的实现者,一旦在_read函数插入有效数据到读缓冲区,都会触发readable事件,在paused状态下,设置readable事件处理函数并手动执行read函数,便可完成数据的读取;而在flowing状态下,通过设置data事件处理函数或者定义pipe目标流同样可以实现读取。

既然pipe同样可以触发Readable进入flowing状态,那么pipe方法具体做了什么呢?其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

Writeable解读

Writeable对应Java的OutputStream和Writer类,实现字节和字符数据的写。与Readable类似,Writeable的实例对象同样维护了一个状态对象-WriteableState,记录了当前输出流的状态信息,如写缓冲区的最大值(hightWaterMark)、缓冲区(有向链表)和缓冲区长度等信息。在本节中,主要分析输出流的关键方法write和事件drain,并解析输出流的实现者需要实现的方法_writewrite的关系。

function write----------------------------if (state.ended)    writeAfterEnd(this, cb);  else if (validChunk(this, state, chunk, cb)) {    state.pendingcb++;    ret = writeOrBuffer(this, state, chunk, encoding, cb);  }  return ret;

在write方法中,判断写入数据的格式并执行writeOrBuffer函数,并返回执行结果,该返回值标示当前写缓冲区是否已满。真正执行写入逻辑的是writeOrBuffer函数,该函数的作用在于刷新或者更新写缓冲区,下面看看主要做了什么,

function writeOrBuffer(stream, state, chunk, encoding, cb) {  chunk = decodeChunk(state, chunk, encoding);  if (chunk instanceof Buffer)    encoding = 'buffer';  var len = state.objectMode ? 1 : chunk.length;  state.length += len;  // 如果缓存的长度大于highWaterMark,触发drain事件  var ret = state.length 1. 写缓冲区的数据1. 写完缓冲区的数据后,异步触发drain事件第一步,在clearBuffer函数中,就是取出写缓冲区(有向链表)的第一个WriteReq对象,执行doWrite函数,写入缓冲区的第一个数据;这样循环往复最终清空写缓冲区,重置一些标志位。第二步,异步执行afterWrite函数,触发drain事件,并判断是否写操作完毕触发“finish”事件。这里之所以强调异步触发drain事件,是因为**为了保证先获得write()返回值为false,给用户绑定drain处理函数的时隙,然后再触发drain事件。**至此,Writeable的重要流程已全部走通。可以看出来,在核心的write()中,**判断写缓冲区是否已满并返回改值,在适当条件下缓存数据或调用_write()写数据**,在Writeable实现者需要实现的**_write()**中,主要任务是**数据写入方向控制,完成最基本的任务**。## 总结对比Readable的read()和_read(),我总结了下这四个函数在“读写过程”中的执行顺序与关系,如下图所示:#node.js、state、readable、缓冲区#

版权声明

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部