You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. 'use strict'
  2. const proc = typeof process === 'object' && process ? process : {
  3. stdout: null,
  4. stderr: null,
  5. }
  6. const EE = require('events')
  7. const Stream = require('stream')
  8. const SD = require('string_decoder').StringDecoder
  9. const EOF = Symbol('EOF')
  10. const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
  11. const EMITTED_END = Symbol('emittedEnd')
  12. const EMITTING_END = Symbol('emittingEnd')
  13. const EMITTED_ERROR = Symbol('emittedError')
  14. const CLOSED = Symbol('closed')
  15. const READ = Symbol('read')
  16. const FLUSH = Symbol('flush')
  17. const FLUSHCHUNK = Symbol('flushChunk')
  18. const ENCODING = Symbol('encoding')
  19. const DECODER = Symbol('decoder')
  20. const FLOWING = Symbol('flowing')
  21. const PAUSED = Symbol('paused')
  22. const RESUME = Symbol('resume')
  23. const BUFFERLENGTH = Symbol('bufferLength')
  24. const BUFFERPUSH = Symbol('bufferPush')
  25. const BUFFERSHIFT = Symbol('bufferShift')
  26. const OBJECTMODE = Symbol('objectMode')
  27. const DESTROYED = Symbol('destroyed')
  28. const EMITDATA = Symbol('emitData')
  29. const EMITEND = Symbol('emitEnd')
  30. const EMITEND2 = Symbol('emitEnd2')
  31. const ASYNC = Symbol('async')
  32. const defer = fn => Promise.resolve().then(fn)
  33. // TODO remove when Node v8 support drops
  34. const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
  35. const ASYNCITERATOR = doIter && Symbol.asyncIterator
  36. || Symbol('asyncIterator not implemented')
  37. const ITERATOR = doIter && Symbol.iterator
  38. || Symbol('iterator not implemented')
  39. // events that mean 'the stream is over'
  40. // these are treated specially, and re-emitted
  41. // if they are listened for after emitting.
  42. const isEndish = ev =>
  43. ev === 'end' ||
  44. ev === 'finish' ||
  45. ev === 'prefinish'
  46. const isArrayBuffer = b => b instanceof ArrayBuffer ||
  47. typeof b === 'object' &&
  48. b.constructor &&
  49. b.constructor.name === 'ArrayBuffer' &&
  50. b.byteLength >= 0
  51. const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
  52. class Pipe {
  53. constructor (src, dest, opts) {
  54. this.src = src
  55. this.dest = dest
  56. this.opts = opts
  57. this.ondrain = () => src[RESUME]()
  58. dest.on('drain', this.ondrain)
  59. }
  60. unpipe () {
  61. this.dest.removeListener('drain', this.ondrain)
  62. }
  63. // istanbul ignore next - only here for the prototype
  64. proxyErrors () {}
  65. end () {
  66. this.unpipe()
  67. if (this.opts.end)
  68. this.dest.end()
  69. }
  70. }
  71. class PipeProxyErrors extends Pipe {
  72. unpipe () {
  73. this.src.removeListener('error', this.proxyErrors)
  74. super.unpipe()
  75. }
  76. constructor (src, dest, opts) {
  77. super(src, dest, opts)
  78. this.proxyErrors = er => dest.emit('error', er)
  79. src.on('error', this.proxyErrors)
  80. }
  81. }
  82. module.exports = class Minipass extends Stream {
  83. constructor (options) {
  84. super()
  85. this[FLOWING] = false
  86. // whether we're explicitly paused
  87. this[PAUSED] = false
  88. this.pipes = []
  89. this.buffer = []
  90. this[OBJECTMODE] = options && options.objectMode || false
  91. if (this[OBJECTMODE])
  92. this[ENCODING] = null
  93. else
  94. this[ENCODING] = options && options.encoding || null
  95. if (this[ENCODING] === 'buffer')
  96. this[ENCODING] = null
  97. this[ASYNC] = options && !!options.async || false
  98. this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
  99. this[EOF] = false
  100. this[EMITTED_END] = false
  101. this[EMITTING_END] = false
  102. this[CLOSED] = false
  103. this[EMITTED_ERROR] = null
  104. this.writable = true
  105. this.readable = true
  106. this[BUFFERLENGTH] = 0
  107. this[DESTROYED] = false
  108. }
  109. get bufferLength () { return this[BUFFERLENGTH] }
  110. get encoding () { return this[ENCODING] }
  111. set encoding (enc) {
  112. if (this[OBJECTMODE])
  113. throw new Error('cannot set encoding in objectMode')
  114. if (this[ENCODING] && enc !== this[ENCODING] &&
  115. (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
  116. throw new Error('cannot change encoding')
  117. if (this[ENCODING] !== enc) {
  118. this[DECODER] = enc ? new SD(enc) : null
  119. if (this.buffer.length)
  120. this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
  121. }
  122. this[ENCODING] = enc
  123. }
  124. setEncoding (enc) {
  125. this.encoding = enc
  126. }
  127. get objectMode () { return this[OBJECTMODE] }
  128. set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
  129. get ['async'] () { return this[ASYNC] }
  130. set ['async'] (a) { this[ASYNC] = this[ASYNC] || !!a }
  131. write (chunk, encoding, cb) {
  132. if (this[EOF])
  133. throw new Error('write after end')
  134. if (this[DESTROYED]) {
  135. this.emit('error', Object.assign(
  136. new Error('Cannot call write after a stream was destroyed'),
  137. { code: 'ERR_STREAM_DESTROYED' }
  138. ))
  139. return true
  140. }
  141. if (typeof encoding === 'function')
  142. cb = encoding, encoding = 'utf8'
  143. if (!encoding)
  144. encoding = 'utf8'
  145. const fn = this[ASYNC] ? defer : f => f()
  146. // convert array buffers and typed array views into buffers
  147. // at some point in the future, we may want to do the opposite!
  148. // leave strings and buffers as-is
  149. // anything else switches us into object mode
  150. if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
  151. if (isArrayBufferView(chunk))
  152. chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
  153. else if (isArrayBuffer(chunk))
  154. chunk = Buffer.from(chunk)
  155. else if (typeof chunk !== 'string')
  156. // use the setter so we throw if we have encoding set
  157. this.objectMode = true
  158. }
  159. // handle object mode up front, since it's simpler
  160. // this yields better performance, fewer checks later.
  161. if (this[OBJECTMODE]) {
  162. /* istanbul ignore if - maybe impossible? */
  163. if (this.flowing && this[BUFFERLENGTH] !== 0)
  164. this[FLUSH](true)
  165. if (this.flowing)
  166. this.emit('data', chunk)
  167. else
  168. this[BUFFERPUSH](chunk)
  169. if (this[BUFFERLENGTH] !== 0)
  170. this.emit('readable')
  171. if (cb)
  172. fn(cb)
  173. return this.flowing
  174. }
  175. // at this point the chunk is a buffer or string
  176. // don't buffer it up or send it to the decoder
  177. if (!chunk.length) {
  178. if (this[BUFFERLENGTH] !== 0)
  179. this.emit('readable')
  180. if (cb)
  181. fn(cb)
  182. return this.flowing
  183. }
  184. // fast-path writing strings of same encoding to a stream with
  185. // an empty buffer, skipping the buffer/decoder dance
  186. if (typeof chunk === 'string' &&
  187. // unless it is a string already ready for us to use
  188. !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
  189. chunk = Buffer.from(chunk, encoding)
  190. }
  191. if (Buffer.isBuffer(chunk) && this[ENCODING])
  192. chunk = this[DECODER].write(chunk)
  193. // Note: flushing CAN potentially switch us into not-flowing mode
  194. if (this.flowing && this[BUFFERLENGTH] !== 0)
  195. this[FLUSH](true)
  196. if (this.flowing)
  197. this.emit('data', chunk)
  198. else
  199. this[BUFFERPUSH](chunk)
  200. if (this[BUFFERLENGTH] !== 0)
  201. this.emit('readable')
  202. if (cb)
  203. fn(cb)
  204. return this.flowing
  205. }
  206. read (n) {
  207. if (this[DESTROYED])
  208. return null
  209. if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) {
  210. this[MAYBE_EMIT_END]()
  211. return null
  212. }
  213. if (this[OBJECTMODE])
  214. n = null
  215. if (this.buffer.length > 1 && !this[OBJECTMODE]) {
  216. if (this.encoding)
  217. this.buffer = [this.buffer.join('')]
  218. else
  219. this.buffer = [Buffer.concat(this.buffer, this[BUFFERLENGTH])]
  220. }
  221. const ret = this[READ](n || null, this.buffer[0])
  222. this[MAYBE_EMIT_END]()
  223. return ret
  224. }
  225. [READ] (n, chunk) {
  226. if (n === chunk.length || n === null)
  227. this[BUFFERSHIFT]()
  228. else {
  229. this.buffer[0] = chunk.slice(n)
  230. chunk = chunk.slice(0, n)
  231. this[BUFFERLENGTH] -= n
  232. }
  233. this.emit('data', chunk)
  234. if (!this.buffer.length && !this[EOF])
  235. this.emit('drain')
  236. return chunk
  237. }
  238. end (chunk, encoding, cb) {
  239. if (typeof chunk === 'function')
  240. cb = chunk, chunk = null
  241. if (typeof encoding === 'function')
  242. cb = encoding, encoding = 'utf8'
  243. if (chunk)
  244. this.write(chunk, encoding)
  245. if (cb)
  246. this.once('end', cb)
  247. this[EOF] = true
  248. this.writable = false
  249. // if we haven't written anything, then go ahead and emit,
  250. // even if we're not reading.
  251. // we'll re-emit if a new 'end' listener is added anyway.
  252. // This makes MP more suitable to write-only use cases.
  253. if (this.flowing || !this[PAUSED])
  254. this[MAYBE_EMIT_END]()
  255. return this
  256. }
  257. // don't let the internal resume be overwritten
  258. [RESUME] () {
  259. if (this[DESTROYED])
  260. return
  261. this[PAUSED] = false
  262. this[FLOWING] = true
  263. this.emit('resume')
  264. if (this.buffer.length)
  265. this[FLUSH]()
  266. else if (this[EOF])
  267. this[MAYBE_EMIT_END]()
  268. else
  269. this.emit('drain')
  270. }
  271. resume () {
  272. return this[RESUME]()
  273. }
  274. pause () {
  275. this[FLOWING] = false
  276. this[PAUSED] = true
  277. }
  278. get destroyed () {
  279. return this[DESTROYED]
  280. }
  281. get flowing () {
  282. return this[FLOWING]
  283. }
  284. get paused () {
  285. return this[PAUSED]
  286. }
  287. [BUFFERPUSH] (chunk) {
  288. if (this[OBJECTMODE])
  289. this[BUFFERLENGTH] += 1
  290. else
  291. this[BUFFERLENGTH] += chunk.length
  292. this.buffer.push(chunk)
  293. }
  294. [BUFFERSHIFT] () {
  295. if (this.buffer.length) {
  296. if (this[OBJECTMODE])
  297. this[BUFFERLENGTH] -= 1
  298. else
  299. this[BUFFERLENGTH] -= this.buffer[0].length
  300. }
  301. return this.buffer.shift()
  302. }
  303. [FLUSH] (noDrain) {
  304. do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
  305. if (!noDrain && !this.buffer.length && !this[EOF])
  306. this.emit('drain')
  307. }
  308. [FLUSHCHUNK] (chunk) {
  309. return chunk ? (this.emit('data', chunk), this.flowing) : false
  310. }
  311. pipe (dest, opts) {
  312. if (this[DESTROYED])
  313. return
  314. const ended = this[EMITTED_END]
  315. opts = opts || {}
  316. if (dest === proc.stdout || dest === proc.stderr)
  317. opts.end = false
  318. else
  319. opts.end = opts.end !== false
  320. opts.proxyErrors = !!opts.proxyErrors
  321. // piping an ended stream ends immediately
  322. if (ended) {
  323. if (opts.end)
  324. dest.end()
  325. } else {
  326. this.pipes.push(!opts.proxyErrors ? new Pipe(this, dest, opts)
  327. : new PipeProxyErrors(this, dest, opts))
  328. if (this[ASYNC])
  329. defer(() => this[RESUME]())
  330. else
  331. this[RESUME]()
  332. }
  333. return dest
  334. }
  335. unpipe (dest) {
  336. const p = this.pipes.find(p => p.dest === dest)
  337. if (p) {
  338. this.pipes.splice(this.pipes.indexOf(p), 1)
  339. p.unpipe()
  340. }
  341. }
  342. addListener (ev, fn) {
  343. return this.on(ev, fn)
  344. }
  345. on (ev, fn) {
  346. const ret = super.on(ev, fn)
  347. if (ev === 'data' && !this.pipes.length && !this.flowing)
  348. this[RESUME]()
  349. else if (ev === 'readable' && this[BUFFERLENGTH] !== 0)
  350. super.emit('readable')
  351. else if (isEndish(ev) && this[EMITTED_END]) {
  352. super.emit(ev)
  353. this.removeAllListeners(ev)
  354. } else if (ev === 'error' && this[EMITTED_ERROR]) {
  355. if (this[ASYNC])
  356. defer(() => fn.call(this, this[EMITTED_ERROR]))
  357. else
  358. fn.call(this, this[EMITTED_ERROR])
  359. }
  360. return ret
  361. }
  362. get emittedEnd () {
  363. return this[EMITTED_END]
  364. }
  365. [MAYBE_EMIT_END] () {
  366. if (!this[EMITTING_END] &&
  367. !this[EMITTED_END] &&
  368. !this[DESTROYED] &&
  369. this.buffer.length === 0 &&
  370. this[EOF]) {
  371. this[EMITTING_END] = true
  372. this.emit('end')
  373. this.emit('prefinish')
  374. this.emit('finish')
  375. if (this[CLOSED])
  376. this.emit('close')
  377. this[EMITTING_END] = false
  378. }
  379. }
  380. emit (ev, data, ...extra) {
  381. // error and close are only events allowed after calling destroy()
  382. if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
  383. return
  384. else if (ev === 'data') {
  385. return !data ? false
  386. : this[ASYNC] ? defer(() => this[EMITDATA](data))
  387. : this[EMITDATA](data)
  388. } else if (ev === 'end') {
  389. return this[EMITEND]()
  390. } else if (ev === 'close') {
  391. this[CLOSED] = true
  392. // don't emit close before 'end' and 'finish'
  393. if (!this[EMITTED_END] && !this[DESTROYED])
  394. return
  395. const ret = super.emit('close')
  396. this.removeAllListeners('close')
  397. return ret
  398. } else if (ev === 'error') {
  399. this[EMITTED_ERROR] = data
  400. const ret = super.emit('error', data)
  401. this[MAYBE_EMIT_END]()
  402. return ret
  403. } else if (ev === 'resume') {
  404. const ret = super.emit('resume')
  405. this[MAYBE_EMIT_END]()
  406. return ret
  407. } else if (ev === 'finish' || ev === 'prefinish') {
  408. const ret = super.emit(ev)
  409. this.removeAllListeners(ev)
  410. return ret
  411. }
  412. // Some other unknown event
  413. const ret = super.emit(ev, data, ...extra)
  414. this[MAYBE_EMIT_END]()
  415. return ret
  416. }
  417. [EMITDATA] (data) {
  418. for (const p of this.pipes) {
  419. if (p.dest.write(data) === false)
  420. this.pause()
  421. }
  422. const ret = super.emit('data', data)
  423. this[MAYBE_EMIT_END]()
  424. return ret
  425. }
  426. [EMITEND] () {
  427. if (this[EMITTED_END])
  428. return
  429. this[EMITTED_END] = true
  430. this.readable = false
  431. if (this[ASYNC])
  432. defer(() => this[EMITEND2]())
  433. else
  434. this[EMITEND2]()
  435. }
  436. [EMITEND2] () {
  437. if (this[DECODER]) {
  438. const data = this[DECODER].end()
  439. if (data) {
  440. for (const p of this.pipes) {
  441. p.dest.write(data)
  442. }
  443. super.emit('data', data)
  444. }
  445. }
  446. for (const p of this.pipes) {
  447. p.end()
  448. }
  449. const ret = super.emit('end')
  450. this.removeAllListeners('end')
  451. return ret
  452. }
  453. // const all = await stream.collect()
  454. collect () {
  455. const buf = []
  456. if (!this[OBJECTMODE])
  457. buf.dataLength = 0
  458. // set the promise first, in case an error is raised
  459. // by triggering the flow here.
  460. const p = this.promise()
  461. this.on('data', c => {
  462. buf.push(c)
  463. if (!this[OBJECTMODE])
  464. buf.dataLength += c.length
  465. })
  466. return p.then(() => buf)
  467. }
  468. // const data = await stream.concat()
  469. concat () {
  470. return this[OBJECTMODE]
  471. ? Promise.reject(new Error('cannot concat in objectMode'))
  472. : this.collect().then(buf =>
  473. this[OBJECTMODE]
  474. ? Promise.reject(new Error('cannot concat in objectMode'))
  475. : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
  476. }
  477. // stream.promise().then(() => done, er => emitted error)
  478. promise () {
  479. return new Promise((resolve, reject) => {
  480. this.on(DESTROYED, () => reject(new Error('stream destroyed')))
  481. this.on('error', er => reject(er))
  482. this.on('end', () => resolve())
  483. })
  484. }
  485. // for await (let chunk of stream)
  486. [ASYNCITERATOR] () {
  487. const next = () => {
  488. const res = this.read()
  489. if (res !== null)
  490. return Promise.resolve({ done: false, value: res })
  491. if (this[EOF])
  492. return Promise.resolve({ done: true })
  493. let resolve = null
  494. let reject = null
  495. const onerr = er => {
  496. this.removeListener('data', ondata)
  497. this.removeListener('end', onend)
  498. reject(er)
  499. }
  500. const ondata = value => {
  501. this.removeListener('error', onerr)
  502. this.removeListener('end', onend)
  503. this.pause()
  504. resolve({ value: value, done: !!this[EOF] })
  505. }
  506. const onend = () => {
  507. this.removeListener('error', onerr)
  508. this.removeListener('data', ondata)
  509. resolve({ done: true })
  510. }
  511. const ondestroy = () => onerr(new Error('stream destroyed'))
  512. return new Promise((res, rej) => {
  513. reject = rej
  514. resolve = res
  515. this.once(DESTROYED, ondestroy)
  516. this.once('error', onerr)
  517. this.once('end', onend)
  518. this.once('data', ondata)
  519. })
  520. }
  521. return { next }
  522. }
  523. // for (let chunk of stream)
  524. [ITERATOR] () {
  525. const next = () => {
  526. const value = this.read()
  527. const done = value === null
  528. return { value, done }
  529. }
  530. return { next }
  531. }
  532. destroy (er) {
  533. if (this[DESTROYED]) {
  534. if (er)
  535. this.emit('error', er)
  536. else
  537. this.emit(DESTROYED)
  538. return this
  539. }
  540. this[DESTROYED] = true
  541. // throw away all buffered data, it's never coming out
  542. this.buffer.length = 0
  543. this[BUFFERLENGTH] = 0
  544. if (typeof this.close === 'function' && !this[CLOSED])
  545. this.close()
  546. if (er)
  547. this.emit('error', er)
  548. else // if no error to emit, still reject pending promises
  549. this.emit(DESTROYED)
  550. return this
  551. }
  552. static isStream (s) {
  553. return !!s && (s instanceof Minipass || s instanceof Stream ||
  554. s instanceof EE && (
  555. typeof s.pipe === 'function' || // readable
  556. (typeof s.write === 'function' && typeof s.end === 'function') // writable
  557. ))
  558. }
  559. }