Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. 'use strict'
  2. /* eslint-disable no-var */
  3. var reusify = require('reusify')
  4. function fastqueue (context, worker, _concurrency) {
  5. if (typeof context === 'function') {
  6. _concurrency = worker
  7. worker = context
  8. context = null
  9. }
  10. if (!(_concurrency >= 1)) {
  11. throw new Error('fastqueue concurrency must be equal to or greater than 1')
  12. }
  13. var cache = reusify(Task)
  14. var queueHead = null
  15. var queueTail = null
  16. var _running = 0
  17. var errorHandler = null
  18. var self = {
  19. push: push,
  20. drain: noop,
  21. saturated: noop,
  22. pause: pause,
  23. paused: false,
  24. get concurrency () {
  25. return _concurrency
  26. },
  27. set concurrency (value) {
  28. if (!(value >= 1)) {
  29. throw new Error('fastqueue concurrency must be equal to or greater than 1')
  30. }
  31. _concurrency = value
  32. if (self.paused) return
  33. for (; queueHead && _running < _concurrency;) {
  34. _running++
  35. release()
  36. }
  37. },
  38. running: running,
  39. resume: resume,
  40. idle: idle,
  41. length: length,
  42. getQueue: getQueue,
  43. unshift: unshift,
  44. empty: noop,
  45. kill: kill,
  46. killAndDrain: killAndDrain,
  47. error: error
  48. }
  49. return self
  50. function running () {
  51. return _running
  52. }
  53. function pause () {
  54. self.paused = true
  55. }
  56. function length () {
  57. var current = queueHead
  58. var counter = 0
  59. while (current) {
  60. current = current.next
  61. counter++
  62. }
  63. return counter
  64. }
  65. function getQueue () {
  66. var current = queueHead
  67. var tasks = []
  68. while (current) {
  69. tasks.push(current.value)
  70. current = current.next
  71. }
  72. return tasks
  73. }
  74. function resume () {
  75. if (!self.paused) return
  76. self.paused = false
  77. if (queueHead === null) {
  78. _running++
  79. release()
  80. return
  81. }
  82. for (; queueHead && _running < _concurrency;) {
  83. _running++
  84. release()
  85. }
  86. }
  87. function idle () {
  88. return _running === 0 && self.length() === 0
  89. }
  90. function push (value, done) {
  91. var current = cache.get()
  92. current.context = context
  93. current.release = release
  94. current.value = value
  95. current.callback = done || noop
  96. current.errorHandler = errorHandler
  97. if (_running >= _concurrency || self.paused) {
  98. if (queueTail) {
  99. queueTail.next = current
  100. queueTail = current
  101. } else {
  102. queueHead = current
  103. queueTail = current
  104. self.saturated()
  105. }
  106. } else {
  107. _running++
  108. worker.call(context, current.value, current.worked)
  109. }
  110. }
  111. function unshift (value, done) {
  112. var current = cache.get()
  113. current.context = context
  114. current.release = release
  115. current.value = value
  116. current.callback = done || noop
  117. current.errorHandler = errorHandler
  118. if (_running >= _concurrency || self.paused) {
  119. if (queueHead) {
  120. current.next = queueHead
  121. queueHead = current
  122. } else {
  123. queueHead = current
  124. queueTail = current
  125. self.saturated()
  126. }
  127. } else {
  128. _running++
  129. worker.call(context, current.value, current.worked)
  130. }
  131. }
  132. function release (holder) {
  133. if (holder) {
  134. cache.release(holder)
  135. }
  136. var next = queueHead
  137. if (next && _running <= _concurrency) {
  138. if (!self.paused) {
  139. if (queueTail === queueHead) {
  140. queueTail = null
  141. }
  142. queueHead = next.next
  143. next.next = null
  144. worker.call(context, next.value, next.worked)
  145. if (queueTail === null) {
  146. self.empty()
  147. }
  148. } else {
  149. _running--
  150. }
  151. } else if (--_running === 0) {
  152. self.drain()
  153. }
  154. }
  155. function kill () {
  156. queueHead = null
  157. queueTail = null
  158. self.drain = noop
  159. }
  160. function killAndDrain () {
  161. queueHead = null
  162. queueTail = null
  163. self.drain()
  164. self.drain = noop
  165. }
  166. function error (handler) {
  167. errorHandler = handler
  168. }
  169. }
  170. function noop () {}
  171. function Task () {
  172. this.value = null
  173. this.callback = noop
  174. this.next = null
  175. this.release = noop
  176. this.context = null
  177. this.errorHandler = null
  178. var self = this
  179. this.worked = function worked (err, result) {
  180. var callback = self.callback
  181. var errorHandler = self.errorHandler
  182. var val = self.value
  183. self.value = null
  184. self.callback = noop
  185. if (self.errorHandler) {
  186. errorHandler(err, val)
  187. }
  188. callback.call(self.context, err, result)
  189. self.release(self)
  190. }
  191. }
  192. function queueAsPromised (context, worker, _concurrency) {
  193. if (typeof context === 'function') {
  194. _concurrency = worker
  195. worker = context
  196. context = null
  197. }
  198. function asyncWrapper (arg, cb) {
  199. worker.call(this, arg)
  200. .then(function (res) {
  201. cb(null, res)
  202. }, cb)
  203. }
  204. var queue = fastqueue(context, asyncWrapper, _concurrency)
  205. var pushCb = queue.push
  206. var unshiftCb = queue.unshift
  207. queue.push = push
  208. queue.unshift = unshift
  209. queue.drained = drained
  210. return queue
  211. function push (value) {
  212. var p = new Promise(function (resolve, reject) {
  213. pushCb(value, function (err, result) {
  214. if (err) {
  215. reject(err)
  216. return
  217. }
  218. resolve(result)
  219. })
  220. })
  221. // Let's fork the promise chain to
  222. // make the error bubble up to the user but
  223. // not lead to a unhandledRejection
  224. p.catch(noop)
  225. return p
  226. }
  227. function unshift (value) {
  228. var p = new Promise(function (resolve, reject) {
  229. unshiftCb(value, function (err, result) {
  230. if (err) {
  231. reject(err)
  232. return
  233. }
  234. resolve(result)
  235. })
  236. })
  237. // Let's fork the promise chain to
  238. // make the error bubble up to the user but
  239. // not lead to a unhandledRejection
  240. p.catch(noop)
  241. return p
  242. }
  243. function drained () {
  244. if (queue.idle()) {
  245. return new Promise(function (resolve) {
  246. resolve()
  247. })
  248. }
  249. var previousDrain = queue.drain
  250. var p = new Promise(function (resolve) {
  251. queue.drain = function () {
  252. previousDrain()
  253. resolve()
  254. }
  255. })
  256. return p
  257. }
  258. }
  259. module.exports = fastqueue
  260. module.exports.promise = queueAsPromised