123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- 'use strict'
-
- /* eslint-disable no-var */
-
- var reusify = require('reusify')
-
- function fastqueue (context, worker, _concurrency) {
- if (typeof context === 'function') {
- _concurrency = worker
- worker = context
- context = null
- }
-
- if (!(_concurrency >= 1)) {
- throw new Error('fastqueue concurrency must be equal to or greater than 1')
- }
-
- var cache = reusify(Task)
- var queueHead = null
- var queueTail = null
- var _running = 0
- var errorHandler = null
-
- var self = {
- push: push,
- drain: noop,
- saturated: noop,
- pause: pause,
- paused: false,
-
- get concurrency () {
- return _concurrency
- },
- set concurrency (value) {
- if (!(value >= 1)) {
- throw new Error('fastqueue concurrency must be equal to or greater than 1')
- }
- _concurrency = value
-
- if (self.paused) return
- for (; queueHead && _running < _concurrency;) {
- _running++
- release()
- }
- },
-
- running: running,
- resume: resume,
- idle: idle,
- length: length,
- getQueue: getQueue,
- unshift: unshift,
- empty: noop,
- kill: kill,
- killAndDrain: killAndDrain,
- error: error
- }
-
- return self
-
- function running () {
- return _running
- }
-
- function pause () {
- self.paused = true
- }
-
- function length () {
- var current = queueHead
- var counter = 0
-
- while (current) {
- current = current.next
- counter++
- }
-
- return counter
- }
-
- function getQueue () {
- var current = queueHead
- var tasks = []
-
- while (current) {
- tasks.push(current.value)
- current = current.next
- }
-
- return tasks
- }
-
- function resume () {
- if (!self.paused) return
- self.paused = false
- if (queueHead === null) {
- _running++
- release()
- return
- }
- for (; queueHead && _running < _concurrency;) {
- _running++
- release()
- }
- }
-
- function idle () {
- return _running === 0 && self.length() === 0
- }
-
- function push (value, done) {
- var current = cache.get()
-
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
- current.errorHandler = errorHandler
-
- if (_running >= _concurrency || self.paused) {
- if (queueTail) {
- queueTail.next = current
- queueTail = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
-
- function unshift (value, done) {
- var current = cache.get()
-
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
- current.errorHandler = errorHandler
-
- if (_running >= _concurrency || self.paused) {
- if (queueHead) {
- current.next = queueHead
- queueHead = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
-
- function release (holder) {
- if (holder) {
- cache.release(holder)
- }
- var next = queueHead
- if (next && _running <= _concurrency) {
- if (!self.paused) {
- if (queueTail === queueHead) {
- queueTail = null
- }
- queueHead = next.next
- next.next = null
- worker.call(context, next.value, next.worked)
- if (queueTail === null) {
- self.empty()
- }
- } else {
- _running--
- }
- } else if (--_running === 0) {
- self.drain()
- }
- }
-
- function kill () {
- queueHead = null
- queueTail = null
- self.drain = noop
- }
-
- function killAndDrain () {
- queueHead = null
- queueTail = null
- self.drain()
- self.drain = noop
- }
-
- function error (handler) {
- errorHandler = handler
- }
- }
-
- function noop () {}
-
- function Task () {
- this.value = null
- this.callback = noop
- this.next = null
- this.release = noop
- this.context = null
- this.errorHandler = null
-
- var self = this
-
- this.worked = function worked (err, result) {
- var callback = self.callback
- var errorHandler = self.errorHandler
- var val = self.value
- self.value = null
- self.callback = noop
- if (self.errorHandler) {
- errorHandler(err, val)
- }
- callback.call(self.context, err, result)
- self.release(self)
- }
- }
-
- function queueAsPromised (context, worker, _concurrency) {
- if (typeof context === 'function') {
- _concurrency = worker
- worker = context
- context = null
- }
-
- function asyncWrapper (arg, cb) {
- worker.call(this, arg)
- .then(function (res) {
- cb(null, res)
- }, cb)
- }
-
- var queue = fastqueue(context, asyncWrapper, _concurrency)
-
- var pushCb = queue.push
- var unshiftCb = queue.unshift
-
- queue.push = push
- queue.unshift = unshift
- queue.drained = drained
-
- return queue
-
- function push (value) {
- var p = new Promise(function (resolve, reject) {
- pushCb(value, function (err, result) {
- if (err) {
- reject(err)
- return
- }
- resolve(result)
- })
- })
-
- // Let's fork the promise chain to
- // make the error bubble up to the user but
- // not lead to a unhandledRejection
- p.catch(noop)
-
- return p
- }
-
- function unshift (value) {
- var p = new Promise(function (resolve, reject) {
- unshiftCb(value, function (err, result) {
- if (err) {
- reject(err)
- return
- }
- resolve(result)
- })
- })
-
- // Let's fork the promise chain to
- // make the error bubble up to the user but
- // not lead to a unhandledRejection
- p.catch(noop)
-
- return p
- }
-
- function drained () {
- if (queue.idle()) {
- return new Promise(function (resolve) {
- resolve()
- })
- }
-
- var previousDrain = queue.drain
-
- var p = new Promise(function (resolve) {
- queue.drain = function () {
- previousDrain()
- resolve()
- }
- })
-
- return p
- }
- }
-
- module.exports = fastqueue
- module.exports.promise = queueAsPromised
|