You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. 'use strict'
  2. const MiniPass = require('minipass')
  3. const EE = require('events').EventEmitter
  4. const fs = require('fs')
  // Vectored-write entry point. Use the public fs.writev when this Node.js
  // provides it; otherwise build an equivalent from the internal fs binding.
  let writev = fs.writev
  /* istanbul ignore next */
  if (!writev) {
    // Fallback only: this whole branch can be dropped once Node.js versions
    // earlier than 12.9.0 no longer need to be supported.
    const fsBinding = process.binding('fs')
    const RequestWrap = fsBinding.FSReqWrap || fsBinding.FSReqCallback
    writev = (fd, iovec, pos, cb) => {
      const request = new RequestWrap()
      // Hand the buffer list back to the caller as a third callback argument.
      request.oncomplete = (er, bw) => cb(er, bw, iovec)
      fsBinding.writeBuffers(fd, iovec, pos, request)
    }
  }
  // Private property/method keys. Symbols keep this internal state off the
  // streams' public, string-keyed surface.

  // Keys used by both the read and the write streams.
  const _autoClose = Symbol('_autoClose')
  const _close = Symbol('_close')
  const _errored = Symbol('_errored')
  const _fd = Symbol('_fd')
  const _onerror = Symbol('_onerror')
  const _onopen = Symbol('_onopen')
  const _open = Symbol('_open')
  const _path = Symbol('_path')

  // Keys used only by the read streams.
  const _handleChunk = Symbol('_handleChunk')
  const _makeBuf = Symbol('_makeBuf')
  const _onread = Symbol('_onread')
  const _read = Symbol('_read')
  const _readSize = Symbol('_readSize')
  const _reading = Symbol('_reading')
  const _remain = Symbol('_remain')
  const _size = Symbol('_size')

  // Keys used only by the write streams.
  const _defaultFlag = Symbol('_defaultFlag')
  const _ended = Symbol('_ended')
  const _finished = Symbol('_finished')
  const _flags = Symbol('_flags')
  const _flush = Symbol('_flush')
  const _mode = Symbol('_mode')
  const _needDrain = Symbol('_needDrain')
  const _onwrite = Symbol('_onwrite')
  const _pos = Symbol('_pos')
  const _queue = Symbol('_queue')
  const _write = Symbol('_write')
  const _writing = Symbol('_writing')
  /**
   * Readable file stream built on MiniPass.
   *
   * Opens `path` (unless a numeric `opt.fd` is supplied), reads it in chunks
   * of at most `readSize` bytes and pushes each chunk into the underlying
   * MiniPass stream until a read returns 0 bytes or `size` bytes were read.
   */
  class ReadStream extends MiniPass {
    /**
     * @param {string} path - file to read; must be a string even when
     *   `opt.fd` is given
     * @param {object} [opt] - also passed through to the MiniPass constructor
     * @param {number} [opt.fd] - already-open descriptor to read from
     * @param {number} [opt.readSize] - bytes per read, default 16 MiB
     * @param {number} [opt.size] - total bytes to read, default Infinity
     * @param {boolean} [opt.autoClose] - close the descriptor when the stream
     *   ends or errors, default true
     * @throws {TypeError} when `path` is not a string
     */
    constructor (path, opt) {
      const options = opt || {}
      super(options)

      this.readable = true
      this.writable = false

      if (typeof path !== 'string') {
        throw new TypeError('path must be a string')
      }

      this[_errored] = false
      this[_fd] = typeof options.fd === 'number' ? options.fd : null
      this[_path] = path
      this[_readSize] = options.readSize || 16 * 1024 * 1024
      this[_reading] = false
      this[_size] = typeof options.size === 'number' ? options.size : Infinity
      this[_remain] = this[_size]
      this[_autoClose] = typeof options.autoClose === 'boolean'
        ? options.autoClose
        : true

      // _fd is either a number or null at this point: open first when no
      // descriptor was handed in, otherwise start reading right away.
      if (this[_fd] === null) {
        this[_open]()
      } else {
        this[_read]()
      }
    }

    /** Current file descriptor, or null when not open. */
    get fd () {
      return this[_fd]
    }

    /** Path given to the constructor. */
    get path () {
      return this[_path]
    }

    /** Always throws: this stream cannot be written to. */
    write () {
      throw new TypeError('this is a readable stream')
    }

    /** Always throws: this stream cannot be ended by the caller. */
    end () {
      throw new TypeError('this is a readable stream')
    }

    // Open the file read-only and hand the outcome to _onopen.
    [_open] () {
      fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
    }

    // Record the descriptor, announce it with 'open', then start reading.
    [_onopen] (er, fd) {
      if (er) {
        this[_onerror](er)
        return
      }
      this[_fd] = fd
      this.emit('open', fd)
      this[_read]()
    }

    // Buffer for the next read: never larger than what is left to read.
    [_makeBuf] () {
      const length = Math.min(this[_readSize], this[_remain])
      return Buffer.allocUnsafe(length)
    }

    // Start one asynchronous read unless one is already in flight.
    [_read] () {
      if (this[_reading]) {
        return
      }
      this[_reading] = true
      const target = this[_makeBuf]()
      /* istanbul ignore if */
      if (target.length === 0) {
        // Nothing left to read: report a zero-byte read on the next tick.
        return process.nextTick(() => this[_onread](null, 0, target))
      }
      fs.read(this[_fd], target, 0, target.length, null, (er, br, buf) =>
        this[_onread](er, br, buf))
    }

    // Read completion: fail on error, otherwise push the chunk and keep
    // reading for as long as _handleChunk returns a truthy value.
    [_onread] (er, br, buf) {
      this[_reading] = false
      if (er) {
        this[_onerror](er)
        return
      }
      if (this[_handleChunk](br, buf)) {
        this[_read]()
      }
    }

    // Close the descriptor when autoClose is on; emits 'close', or 'error'
    // if closing fails.
    [_close] () {
      if (!this[_autoClose] || typeof this[_fd] !== 'number') {
        return
      }
      const openFd = this[_fd]
      this[_fd] = null
      fs.close(openFd, er => er ? this.emit('error', er) : this.emit('close'))
    }

    // Failure path: leaving _reading set makes every later _read a no-op.
    [_onerror] (er) {
      this[_reading] = true
      this[_close]()
      this.emit('error', er)
    }

    // Push `br` bytes of `buf` downstream. Returns the result of the
    // underlying write, or false once the stream has been ended.
    [_handleChunk] (br, buf) {
      // no effect if infinite
      this[_remain] -= br

      let keepReading = false
      if (br > 0) {
        const chunk = br < buf.length ? buf.slice(0, br) : buf
        keepReading = super.write(chunk)
      }

      const exhausted = br === 0 || this[_remain] <= 0
      if (!exhausted) {
        return keepReading
      }

      this[_close]()
      super.end()
      return false
    }

    /**
     * Event filter: 'prefinish' and 'finish' are dropped, 'drain' is consumed
     * and used to resume reading while a descriptor is open, and 'error' is
     * forwarded at most once. Everything else goes straight to the parent.
     */
    emit (ev, data) {
      if (ev === 'prefinish' || ev === 'finish') {
        return undefined
      }
      if (ev === 'drain') {
        if (typeof this[_fd] === 'number') {
          this[_read]()
        }
        return undefined
      }
      if (ev === 'error') {
        if (this[_errored]) {
          return undefined
        }
        this[_errored] = true
      }
      return super.emit(ev, data)
    }
  }
  /**
   * Synchronous variant of ReadStream: opening, reading and closing use the
   * blocking fs calls, so failures are thrown instead of being routed
   * through the asynchronous callbacks.
   */
  class ReadStreamSync extends ReadStream {
    // Open the file read-only and continue with _onopen.
    [_open] () {
      let threw = true
      try {
        this[_onopen](null, fs.openSync(this[_path], 'r'))
        threw = false
      } finally {
        if (threw) {
          // throw the original, but try to close if it fails. An exception
          // escaping this finally block would replace the error in flight.
          try { this[_close]() } catch (_) {}
        }
      }
    }

    // Read and push chunks in a loop until _handleChunk returns a falsy
    // value. Does nothing when a read loop is already running.
    [_read] () {
      let threw = true
      try {
        if (!this[_reading]) {
          this[_reading] = true
          do {
            const buf = this[_makeBuf]()
            /* istanbul ignore next */
            const br = buf.length === 0 ? 0
              : fs.readSync(this[_fd], buf, 0, buf.length, null)
            if (!this[_handleChunk](br, buf)) {
              break
            }
          } while (true)
          this[_reading] = false
        }
        threw = false
      } finally {
        if (threw) {
          // Same reasoning as in _open: keep the original error.
          try { this[_close]() } catch (_) {}
        }
      }
    }

    // Close the descriptor synchronously when autoClose is on, then emit
    // 'close'. A closeSync failure is thrown to the caller.
    [_close] () {
      if (this[_autoClose] && typeof this[_fd] === 'number') {
        const fd = this[_fd]
        this[_fd] = null
        fs.closeSync(fd)
        this.emit('close')
      }
    }
  }
  /**
   * Writable file stream.
   *
   * Opens `path` (unless a numeric `opt.fd` is supplied) and writes chunks
   * to it one operation at a time. Chunks that arrive while the file is not
   * open yet, or while a write is in flight, are queued and flushed later.
   * Emits 'open', 'drain', 'finish', 'close' and 'error'.
   */
  class WriteStream extends EE {
    /**
     * @param {string} path - file to write
     * @param {object} [opt] - also passed to the EventEmitter constructor
     * @param {number} [opt.fd] - already-open descriptor to write to
     * @param {number} [opt.mode] - mode used when opening, default 0o666
     * @param {number} [opt.start] - position of the first write; when absent
     *   writes are issued without an explicit position
     * @param {boolean} [opt.autoClose] - close the descriptor on finish or
     *   error, default true
     * @param {string} [opt.flags] - open flags; default 'r+' when `start` is
     *   given (falling back to 'w' if the file does not exist), else 'w'
     */
    constructor (path, opt) {
      const options = opt || {}
      super(options)
      this.readable = false
      this.writable = true
      this[_errored] = false
      this[_writing] = false
      this[_ended] = false
      // Set explicitly so the flag is a real boolean before _onwrite reads it.
      this[_finished] = false
      this[_needDrain] = false
      this[_queue] = []
      this[_path] = path
      this[_fd] = typeof options.fd === 'number' ? options.fd : null
      this[_mode] = options.mode === undefined ? 0o666 : options.mode
      this[_pos] = typeof options.start === 'number' ? options.start : null
      this[_autoClose] = typeof options.autoClose === 'boolean'
        ? options.autoClose
        : true

      // truncating makes no sense when writing into the middle
      const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
      this[_defaultFlag] = options.flags === undefined
      this[_flags] = this[_defaultFlag] ? defaultFlag : options.flags

      if (this[_fd] === null) {
        this[_open]()
      }
    }

    /**
     * Emit an event, letting 'error' through at most once per stream.
     * All listener arguments are forwarded, as with EventEmitter#emit.
     *
     * @param {string|symbol} ev - event name
     * @param {...*} args - arguments passed to the listeners
     * @returns {boolean|undefined} the EventEmitter result, or undefined for
     *   a suppressed repeat 'error'
     */
    emit (ev, ...args) {
      if (ev === 'error') {
        if (this[_errored]) {
          return
        }
        this[_errored] = true
      }
      return super.emit(ev, ...args)
    }

    /** Current file descriptor, or null when not open. */
    get fd () { return this[_fd] }

    /** Path given to the constructor. */
    get path () { return this[_path] }

    // Failure path: close, then leave _writing set so that later write()
    // calls are only queued and never start another write.
    [_onerror] (er) {
      this[_close]()
      this[_writing] = true
      this.emit('error', er)
    }

    // Open the file with the current flags and mode.
    [_open] () {
      fs.open(this[_path], this[_flags], this[_mode],
        (er, fd) => this[_onopen](er, fd))
    }

    // Open completion. If the implicit 'r+' flag failed because the file
    // does not exist, retry once with 'w'; otherwise record the descriptor,
    // emit 'open' and flush whatever was queued in the meantime.
    [_onopen] (er, fd) {
      if (this[_defaultFlag] &&
          this[_flags] === 'r+' &&
          er && er.code === 'ENOENT') {
        this[_flags] = 'w'
        this[_open]()
      } else if (er) {
        this[_onerror](er)
      } else {
        this[_fd] = fd
        this.emit('open', fd)
        this[_flush]()
      }
    }

    /**
     * Optionally write a final chunk, then mark the stream as ended.
     * 'finish' is emitted once everything queued has been written.
     *
     * @param {Buffer|string} [buf] - final chunk
     * @param {string} [enc] - encoding when `buf` is a string
     * @returns {this}
     */
    end (buf, enc) {
      if (buf) {
        this.write(buf, enc)
      }

      this[_ended] = true

      // synthetic after-write logic, where drain/finish live
      if (!this[_writing] && !this[_queue].length &&
          typeof this[_fd] === 'number') {
        this[_onwrite](null, 0)
      }
      return this
    }

    /**
     * Write a chunk, or queue it when the file is not open yet or another
     * write is pending.
     *
     * @param {Buffer|string} buf - chunk; strings are converted to a Buffer
     * @param {string} [enc] - encoding when `buf` is a string
     * @returns {boolean} true when the write was started immediately; false
     *   when the chunk was queued (wait for 'drain') or the stream had
     *   already ended (an 'error' is emitted in that case)
     */
    write (buf, enc) {
      if (typeof buf === 'string') {
        buf = Buffer.from(buf, enc)
      }

      if (this[_ended]) {
        this.emit('error', new Error('write() after end()'))
        return false
      }

      if (this[_fd] === null || this[_writing] || this[_queue].length) {
        this[_queue].push(buf)
        this[_needDrain] = true
        return false
      }

      this[_writing] = true
      this[_write](buf)
      return true
    }

    // Issue a single-buffer write at the current position.
    [_write] (buf) {
      fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
        this[_onwrite](er, bw))
    }

    // Write completion: advance the position, then flush more queued data,
    // finish the stream, or emit 'drain'.
    // NOTE(review): `bw` is only used to advance the position; a write that
    // reports fewer bytes than requested is not retried here - confirm
    // whether that can happen for the descriptors this is used with.
    [_onwrite] (er, bw) {
      if (er) {
        this[_onerror](er)
      } else {
        if (this[_pos] !== null) {
          this[_pos] += bw
        }
        if (this[_queue].length) {
          this[_flush]()
        } else {
          this[_writing] = false

          if (this[_ended] && !this[_finished]) {
            this[_finished] = true
            this[_close]()
            this.emit('finish')
          } else if (this[_needDrain]) {
            this[_needDrain] = false
            this.emit('drain')
          }
        }
      }
    }

    // Write out the queue: a single chunk goes through _write, several go
    // out together through writev. With an empty queue, an ended stream is
    // run through _onwrite so it can finish.
    [_flush] () {
      if (this[_queue].length === 0) {
        if (this[_ended]) {
          this[_onwrite](null, 0)
        }
      } else if (this[_queue].length === 1) {
        this[_write](this[_queue].pop())
      } else {
        const iovec = this[_queue]
        this[_queue] = []
        writev(this[_fd], iovec, this[_pos],
          (er, bw) => this[_onwrite](er, bw))
      }
    }

    // Close the descriptor when autoClose is on; emits 'close', or 'error'
    // if closing fails.
    [_close] () {
      if (this[_autoClose] && typeof this[_fd] === 'number') {
        const fd = this[_fd]
        this[_fd] = null
        fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
      }
    }
  }
  /**
   * Synchronous variant of WriteStream: opening, writing and closing use the
   * blocking fs calls, and failures are thrown to the caller.
   */
  class WriteStreamSync extends WriteStream {
    // Open the file and continue with _onopen. When the implicit 'r+' flag
    // hits a missing file, switch to 'w' and try again.
    [_open] () {
      // only wrap in a try{} block if we know we'll retry, to avoid
      // the rethrow obscuring the error's source frame in most cases.
      const mayRetry = this[_defaultFlag] && this[_flags] === 'r+'
      let openedFd
      if (!mayRetry) {
        openedFd = fs.openSync(this[_path], this[_flags], this[_mode])
      } else {
        try {
          openedFd = fs.openSync(this[_path], this[_flags], this[_mode])
        } catch (er) {
          if (er.code !== 'ENOENT') {
            throw er
          }
          this[_flags] = 'w'
          return this[_open]()
        }
      }
      this[_onopen](null, openedFd)
    }

    // Close the descriptor synchronously when autoClose is on, then emit
    // 'close'.
    [_close] () {
      if (!this[_autoClose] || typeof this[_fd] !== 'number') {
        return
      }
      const openFd = this[_fd]
      this[_fd] = null
      fs.closeSync(openFd)
      this.emit('close')
    }

    // Write one buffer synchronously and run the after-write logic.
    [_write] (buf) {
      // throw the original, but try to close if it fails
      let succeeded = false
      try {
        const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
        this[_onwrite](null, written)
        succeeded = true
      } finally {
        if (!succeeded) {
          try { this[_close]() } catch (_) {}
        }
      }
    }
  }
  // Public API: the asynchronous and synchronous read/write stream classes.
  Object.assign(exports, {
    ReadStream,
    ReadStreamSync,
    WriteStream,
    WriteStreamSync
  })