You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

4849 lines
180 KiB
JavaScript

6 years ago
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports) :
typeof define === 'function' && define.amd ? define(['exports'], factory) :
(factory((global.async = {})));
}(this, (function (exports) { 'use strict';
/**
* Creates a continuation function with some arguments already applied.
*
* Useful as a shorthand when combined with other control flow functions. Any
* arguments passed to the returned function are added to the arguments
* originally passed to apply.
*
* @name apply
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {Function} fn - The function you want to eventually apply all
* arguments to. Invokes with (arguments...).
* @param {...*} arguments... - Any number of arguments to automatically apply
* when the continuation is called.
* @returns {Function} the partially-applied function
* @example
*
* // using apply
* async.parallel([
* async.apply(fs.writeFile, 'testfile1', 'test1'),
* async.apply(fs.writeFile, 'testfile2', 'test2')
* ]);
*
*
* // the same process without using apply
* async.parallel([
* function(callback) {
* fs.writeFile('testfile1', 'test1', callback);
* },
* function(callback) {
* fs.writeFile('testfile2', 'test2', callback);
* }
* ]);
*
* // It's possible to pass any number of additional arguments when calling the
* // continuation:
*
* node> var fn = async.apply(sys.puts, 'one');
* node> fn('two', 'three');
* one
* two
* three
*/
function apply(fn, ...args) {
return (...callArgs) => fn(...args,...callArgs);
}
function initialParams (fn) {
return function (...args/*, callback*/) {
var callback = args.pop();
return fn.call(this, args, callback);
};
}
/* istanbul ignore file */
var hasSetImmediate = typeof setImmediate === 'function' && setImmediate;
var hasNextTick = typeof process === 'object' && typeof process.nextTick === 'function';
function fallback(fn) {
setTimeout(fn, 0);
}
function wrap(defer) {
return (fn, ...args) => defer(() => fn(...args));
}
var _defer;
if (hasSetImmediate) {
_defer = setImmediate;
} else if (hasNextTick) {
_defer = process.nextTick;
} else {
_defer = fallback;
}
var setImmediate$1 = wrap(_defer);
/**
* Take a sync function and make it async, passing its return value to a
* callback. This is useful for plugging sync functions into a waterfall,
* series, or other async functions. Any arguments passed to the generated
* function will be passed to the wrapped function (except for the final
* callback argument). Errors thrown will be passed to the callback.
*
* If the function passed to `asyncify` returns a Promise, that promises's
* resolved/rejected state will be used to call the callback, rather than simply
* the synchronous return value.
*
* This also means you can asyncify ES2017 `async` functions.
*
* @name asyncify
* @static
* @memberOf module:Utils
* @method
* @alias wrapSync
* @category Util
* @param {Function} func - The synchronous function, or Promise-returning
* function to convert to an {@link AsyncFunction}.
* @returns {AsyncFunction} An asynchronous wrapper of the `func`. To be
* invoked with `(args..., callback)`.
* @example
*
* // passing a regular synchronous function
* async.waterfall([
* async.apply(fs.readFile, filename, "utf8"),
* async.asyncify(JSON.parse),
* function (data, next) {
* // data is the result of parsing the text.
* // If there was a parsing error, it would have been caught.
* }
* ], callback);
*
* // passing a function returning a promise
* async.waterfall([
* async.apply(fs.readFile, filename, "utf8"),
* async.asyncify(function (contents) {
* return db.model.create(contents);
* }),
* function (model, next) {
* // `model` is the instantiated model object.
* // If there was an error, this function would be skipped.
* }
* ], callback);
*
* // es2017 example, though `asyncify` is not needed if your JS environment
* // supports async functions out of the box
* var q = async.queue(async.asyncify(async function(file) {
* var intermediateStep = await processFile(file);
* return await somePromise(intermediateStep)
* }));
*
* q.push(files);
*/
function asyncify(func) {
if (isAsync(func)) {
return function (...args/*, callback*/) {
const callback = args.pop();
const promise = func.apply(this, args);
return handlePromise(promise, callback)
}
}
return initialParams(function (args, callback) {
var result;
try {
result = func.apply(this, args);
} catch (e) {
return callback(e);
}
// if result is Promise object
if (result && typeof result.then === 'function') {
return handlePromise(result, callback)
} else {
callback(null, result);
}
});
}
function handlePromise(promise, callback) {
return promise.then(value => {
invokeCallback(callback, null, value);
}, err => {
invokeCallback(callback, err && err.message ? err : new Error(err));
});
}
function invokeCallback(callback, error, value) {
try {
callback(error, value);
} catch (err) {
setImmediate$1(e => { throw e }, err);
}
}
function isAsync(fn) {
return fn[Symbol.toStringTag] === 'AsyncFunction';
}
function isAsyncGenerator(fn) {
return fn[Symbol.toStringTag] === 'AsyncGenerator';
}
function isAsyncIterable(obj) {
return typeof obj[Symbol.asyncIterator] === 'function';
}
function wrapAsync(asyncFn) {
if (typeof asyncFn !== 'function') throw new Error('expected a function')
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}
// conditionally promisify a function.
// only return a promise if a callback is omitted
function awaitify (asyncFn, arity = asyncFn.length) {
if (!arity) throw new Error('arity is undefined')
function awaitable (...args) {
if (typeof args[arity - 1] === 'function') {
return asyncFn.apply(this, args)
}
return new Promise((resolve, reject) => {
args[arity - 1] = (err, ...cbArgs) => {
if (err) return reject(err)
resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]);
};
asyncFn.apply(this, args);
})
}
Object.defineProperty(awaitable, 'name', {
value: `awaitable(${asyncFn.name})`
});
return awaitable
}
function applyEach (eachfn) {
return function applyEach(fns, ...callArgs) {
const go = awaitify(function (callback) {
var that = this;
return eachfn(fns, (fn, cb) => {
wrapAsync(fn).apply(that, callArgs.concat(cb));
}, callback);
});
return go;
};
}
function _asyncMap(eachfn, arr, iteratee, callback) {
arr = arr || [];
var results = [];
var counter = 0;
var _iteratee = wrapAsync(iteratee);
return eachfn(arr, (value, _, iterCb) => {
var index = counter++;
_iteratee(value, (err, v) => {
results[index] = v;
iterCb(err);
});
}, err => {
callback(err, results);
});
}
function isArrayLike(value) {
return value &&
typeof value.length === 'number' &&
value.length >= 0 &&
value.length % 1 === 0;
}
// A temporary value used to identify if the loop should be broken.
// See #1064, #1293
const breakLoop = {};
function once(fn) {
function wrapper (...args) {
if (fn === null) return;
var callFn = fn;
fn = null;
callFn.apply(this, args);
}
Object.assign(wrapper, fn);
return wrapper
}
function getIterator (coll) {
return coll[Symbol.iterator] && coll[Symbol.iterator]();
}
function createArrayIterator(coll) {
var i = -1;
var len = coll.length;
return function next() {
return ++i < len ? {value: coll[i], key: i} : null;
}
}
function createES2015Iterator(iterator) {
var i = -1;
return function next() {
var item = iterator.next();
if (item.done)
return null;
i++;
return {value: item.value, key: i};
}
}
function createObjectIterator(obj) {
var okeys = obj ? Object.keys(obj) : [];
var i = -1;
var len = okeys.length;
return function next() {
var key = okeys[++i];
return i < len ? {value: obj[key], key} : null;
};
}
function createIterator(coll) {
if (isArrayLike(coll)) {
return createArrayIterator(coll);
}
var iterator = getIterator(coll);
return iterator ? createES2015Iterator(iterator) : createObjectIterator(coll);
}
function onlyOnce(fn) {
return function (...args) {
if (fn === null) throw new Error("Callback was already called.");
var callFn = fn;
fn = null;
callFn.apply(this, args);
};
}
// for async generators
function asyncEachOfLimit(generator, limit, iteratee, callback) {
let done = false;
let canceled = false;
let awaiting = false;
let running = 0;
let idx = 0;
function replenish() {
//console.log('replenish')
if (running >= limit || awaiting || done) return
//console.log('replenish awaiting')
awaiting = true;
generator.next().then(({value, done: iterDone}) => {
//console.log('got value', value)
if (canceled || done) return
awaiting = false;
if (iterDone) {
done = true;
if (running <= 0) {
//console.log('done nextCb')
callback(null);
}
return;
}
running++;
iteratee(value, idx, iterateeCallback);
idx++;
replenish();
}).catch(handleError);
}
function iterateeCallback(err, result) {
//console.log('iterateeCallback')
running -= 1;
if (canceled) return
if (err) return handleError(err)
if (err === false) {
done = true;
canceled = true;
return
}
if (result === breakLoop || (done && running <= 0)) {
done = true;
//console.log('done iterCb')
return callback(null);
}
replenish();
}
function handleError(err) {
if (canceled) return
awaiting = false;
done = true;
callback(err);
}
replenish();
}
var eachOfLimit = (limit) => {
return (obj, iteratee, callback) => {
callback = once(callback);
if (limit <= 0) {
throw new RangeError('concurrency limit cannot be less than 1')
}
if (!obj) {
return callback(null);
}
if (isAsyncGenerator(obj)) {
return asyncEachOfLimit(obj, limit, iteratee, callback)
}
if (isAsyncIterable(obj)) {
return asyncEachOfLimit(obj[Symbol.asyncIterator](), limit, iteratee, callback)
}
var nextElem = createIterator(obj);
var done = false;
var canceled = false;
var running = 0;
var looping = false;
function iterateeCallback(err, value) {
if (canceled) return
running -= 1;
if (err) {
done = true;
callback(err);
}
else if (err === false) {
done = true;
canceled = true;
}
else if (value === breakLoop || (done && running <= 0)) {
done = true;
return callback(null);
}
else if (!looping) {
replenish();
}
}
function replenish () {
looping = true;
while (running < limit && !done) {
var elem = nextElem();
if (elem === null) {
done = true;
if (running <= 0) {
callback(null);
}
return;
}
running += 1;
iteratee(elem.value, elem.key, onlyOnce(iterateeCallback));
}
looping = false;
}
replenish();
};
};
/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a
* time.
*
* @name eachOfLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfLimit
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`. The `key` is the item's key, or index in the case of an
* array.
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
function eachOfLimit$1(coll, limit, iteratee, callback) {
return eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}
var eachOfLimit$2 = awaitify(eachOfLimit$1, 4);
// eachOf implementation optimized for array-likes
function eachOfArrayLike(coll, iteratee, callback) {
callback = once(callback);
var index = 0,
completed = 0,
{length} = coll,
canceled = false;
if (length === 0) {
callback(null);
}
function iteratorCallback(err, value) {
if (err === false) {
canceled = true;
}
if (canceled === true) return
if (err) {
callback(err);
} else if ((++completed === length) || value === breakLoop) {
callback(null);
}
}
for (; index < length; index++) {
iteratee(coll[index], index, onlyOnce(iteratorCallback));
}
}
// a generic version of eachOf which can handle array, object, and iterator cases.
function eachOfGeneric (coll, iteratee, callback) {
return eachOfLimit$2(coll, Infinity, iteratee, callback);
}
/**
* Like [`each`]{@link module:Collections.each}, except that it passes the key (or index) as the second argument
* to the iteratee.
*
* @name eachOf
* @static
* @memberOf module:Collections
* @method
* @alias forEachOf
* @category Collection
* @see [async.each]{@link module:Collections.each}
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each
* item in `coll`.
* The `key` is the item's key, or index in the case of an array.
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
* @example
*
* var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
* var configs = {};
*
* async.forEachOf(obj, function (value, key, callback) {
* fs.readFile(__dirname + value, "utf8", function (err, data) {
* if (err) return callback(err);
* try {
* configs[key] = JSON.parse(data);
* } catch (e) {
* return callback(e);
* }
* callback();
* });
* }, function (err) {
* if (err) console.error(err.message);
* // configs is now a map of JSON data
* doSomethingWith(configs);
* });
*/
function eachOf(coll, iteratee, callback) {
var eachOfImplementation = isArrayLike(coll) ? eachOfArrayLike : eachOfGeneric;
return eachOfImplementation(coll, wrapAsync(iteratee), callback);
}
var eachOf$1 = awaitify(eachOf, 3);
/**
* Produces a new collection of values by mapping each value in `coll` through
* the `iteratee` function. The `iteratee` is called with an item from `coll`
* and a callback for when it has finished processing. Each of these callback
* takes 2 arguments: an `error`, and the transformed item from `coll`. If
* `iteratee` passes an error to its callback, the main `callback` (for the
* `map` function) is immediately called with the error.
*
* Note, that since this function applies the `iteratee` to each item in
* parallel, there is no guarantee that the `iteratee` functions will complete
* in order. However, the results array will be in the same order as the
* original `coll`.
*
* If `map` is passed an Object, the results will be an Array. The results
* will roughly be in the order of the original Objects' keys (but this can
* vary across JavaScript engines).
*
* @name map
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with the transformed item.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Results is an Array of the
* transformed items from the `coll`. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
* @example
*
* async.map(['file1','file2','file3'], fs.stat, function(err, results) {
* // results is now an array of stats for each file
* });
*/
function map (coll, iteratee, callback) {
return _asyncMap(eachOf$1, coll, iteratee, callback)
}
var map$1 = awaitify(map, 3);
/**
* Applies the provided arguments to each function in the array, calling
* `callback` after all functions have completed. If you only provide the first
* argument, `fns`, then it will return a function which lets you pass in the
* arguments as if it were a single function call. If more arguments are
* provided, `callback` is required while `args` is still optional. The results
* for each of the applied async functions are passed to the final callback
* as an array.
*
* @name applyEach
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s
* to all call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
* @param {Function} [callback] - the final argument should be the callback,
* called when all functions have completed processing.
* @returns {AsyncFunction} - Returns a function that takes no args other than
* an optional callback, that is the result of applying the `args` to each
* of the functions.
* @example
*
* const appliedFn = async.applyEach([enableSearch, updateSchema], 'bucket')
*
* appliedFn((err, results) => {
* // results[0] is the results for `enableSearch`
* // results[1] is the results for `updateSchema`
* });
*
* // partial application example:
* async.each(
* buckets,
* async (bucket) => async.applyEach([enableSearch, updateSchema], bucket)(),
* callback
* );
*/
var applyEach$1 = applyEach(map$1);
/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs only a single async operation at a time.
*
* @name eachOfSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfSeries
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
function eachOfSeries(coll, iteratee, callback) {
return eachOfLimit$2(coll, 1, iteratee, callback)
}
var eachOfSeries$1 = awaitify(eachOfSeries, 3);
/**
* The same as [`map`]{@link module:Collections.map} but runs only a single async operation at a time.
*
* @name mapSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.map]{@link module:Collections.map}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with the transformed item.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Results is an array of the
* transformed items from the `coll`. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
*/
function mapSeries (coll, iteratee, callback) {
return _asyncMap(eachOfSeries$1, coll, iteratee, callback)
}
var mapSeries$1 = awaitify(mapSeries, 3);
/**
* The same as [`applyEach`]{@link module:ControlFlow.applyEach} but runs only a single async operation at a time.
*
* @name applyEachSeries
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.applyEach]{@link module:ControlFlow.applyEach}
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s to all
* call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
* @param {Function} [callback] - the final argument should be the callback,
* called when all functions have completed processing.
* @returns {AsyncFunction} - A function, that when called, is the result of
* appling the `args` to the list of functions. It takes no args, other than
* a callback.
*/
var applyEachSeries = applyEach(mapSeries$1);
const PROMISE_SYMBOL = Symbol('promiseCallback');
function promiseCallback () {
let resolve, reject;
function callback (err, ...args) {
if (err) return reject(err)
resolve(args.length > 1 ? args : args[0]);
}
callback[PROMISE_SYMBOL] = new Promise((res, rej) => {
resolve = res,
reject = rej;
});
return callback
}
/**
* Determines the best order for running the {@link AsyncFunction}s in `tasks`, based on
* their requirements. Each function can optionally depend on other functions
* being completed first, and each function is run as soon as its requirements
* are satisfied.
*
* If any of the {@link AsyncFunction}s pass an error to their callback, the `auto` sequence
* will stop. Further tasks will not execute (so any other functions depending
* on it will not run), and the main `callback` is immediately called with the
* error.
*
* {@link AsyncFunction}s also receive an object containing the results of functions which
* have completed so far as the first argument, if they have dependencies. If a
* task function has no dependencies, it will only be passed a callback.
*
* @name auto
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Object} tasks - An object. Each of its properties is either a
* function or an array of requirements, with the {@link AsyncFunction} itself the last item
* in the array. The object's key of a property serves as the name of the task
* defined by that property, i.e. can be used when specifying requirements for
* other tasks. The function receives one or two arguments:
* * a `results` object, containing the results of the previously executed
* functions, only passed if the task has any dependencies,
* * a `callback(err, result)` function, which must be called when finished,
* passing an `error` (which can be `null`) and the result of the function's
* execution.
* @param {number} [concurrency=Infinity] - An optional `integer` for
* determining the maximum number of tasks that can be run in parallel. By
* default, as many as possible.
* @param {Function} [callback] - An optional callback which is called when all
* the tasks have been completed. It receives the `err` argument if any `tasks`
* pass an error to their callback. Results are always returned; however, if an
* error occurs, no further `tasks` will be performed, and the results object
* will only contain partial results. Invoked with (err, results).
* @returns {Promise} a promise, if a callback is not passed
* @example
*
* async.auto({
* // this function will just be passed a callback
* readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
* showData: ['readData', function(results, cb) {
* // results.readData is the file's contents
* // ...
* }]
* }, callback);
*
* async.auto({
* get_data: function(callback) {
* console.log('in get_data');
* // async code to get some data
* callback(null, 'data', 'converted to array');
* },
* make_folder: function(callback) {
* console.log('in make_folder');
* // async code to create a directory to store a file in
* // this is run at the same time as getting the data
* callback(null, 'folder');
* },
* write_file: ['get_data', 'make_folder', function(results, callback) {
* console.log('in write_file', JSON.stringify(results));
* // once there is some data and the directory exists,
* // write the data to a file in the directory
* callback(null, 'filename');
* }],
* email_link: ['write_file', function(results, callback) {
* console.log('in email_link', JSON.stringify(results));
* // once the file is written let's email a link to it...
* // results.write_file contains the filename returned by write_file.
* callback(null, {'file':results.write_file, 'email':'user@example.com'});
* }]
* }, function(err, results) {
* console.log('err = ', err);
* console.log('results = ', results);
* });
*/
function auto(tasks, concurrency, callback) {
if (typeof concurrency !== 'number') {
// concurrency is optional, shift the args.
callback = concurrency;
concurrency = null;
}
callback = once(callback || promiseCallback());
var numTasks = Object.keys(tasks).length;
if (!numTasks) {
return callback(null);
}
if (!concurrency) {
concurrency = numTasks;
}
var results = {};
var runningTasks = 0;
var canceled = false;
var hasError = false;
var listeners = Object.create(null);
var readyTasks = [];
// for cycle detection:
var readyToCheck = []; // tasks that have been identified as reachable
// without the possibility of returning to an ancestor task
var uncheckedDependencies = {};
Object.keys(tasks).forEach(key => {
var task = tasks[key];
if (!Array.isArray(task)) {
// no dependencies
enqueueTask(key, [task]);
readyToCheck.push(key);
return;
}
var dependencies = task.slice(0, task.length - 1);
var remainingDependencies = dependencies.length;
if (remainingDependencies === 0) {
enqueueTask(key, task);
readyToCheck.push(key);
return;
}
uncheckedDependencies[key] = remainingDependencies;
dependencies.forEach(dependencyName => {
if (!tasks[dependencyName]) {
throw new Error('async.auto task `' + key +
'` has a non-existent dependency `' +
dependencyName + '` in ' +
dependencies.join(', '));
}
addListener(dependencyName, () => {
remainingDependencies--;
if (remainingDependencies === 0) {
enqueueTask(key, task);
}
});
});
});
checkForDeadlocks();
processQueue();
function enqueueTask(key, task) {
readyTasks.push(() => runTask(key, task));
}
function processQueue() {
if (canceled) return
if (readyTasks.length === 0 && runningTasks === 0) {
return callback(null, results);
}
while(readyTasks.length && runningTasks < concurrency) {
var run = readyTasks.shift();
run();
}
}
function addListener(taskName, fn) {
var taskListeners = listeners[taskName];
if (!taskListeners) {
taskListeners = listeners[taskName] = [];
}
taskListeners.push(fn);
}
function taskComplete(taskName) {
var taskListeners = listeners[taskName] || [];
taskListeners.forEach(fn => fn());
processQueue();
}
function runTask(key, task) {
if (hasError) return;
var taskCallback = onlyOnce((err, ...result) => {
runningTasks--;
if (err === false) {
canceled = true;
return
}
if (result.length < 2) {
[result] = result;
}
if (err) {
var safeResults = {};
Object.keys(results).forEach(rkey => {
safeResults[rkey] = results[rkey];
});
safeResults[key] = result;
hasError = true;
listeners = Object.create(null);
if (canceled) return
callback(err, safeResults);
} else {
results[key] = result;
taskComplete(key);
}
});
runningTasks++;
var taskFn = wrapAsync(task[task.length - 1]);
if (task.length > 1) {
taskFn(results, taskCallback);
} else {
taskFn(taskCallback);
}
}
function checkForDeadlocks() {
// Kahn's algorithm
// https://en.wikipedia.org/wiki/Topological_sorting#Kahn.27s_algorithm
// http://connalle.blogspot.com/2013/10/topological-sortingkahn-algorithm.html
var currentTask;
var counter = 0;
while (readyToCheck.length) {
currentTask = readyToCheck.pop();
counter++;
getDependents(currentTask).forEach(dependent => {
if (--uncheckedDependencies[dependent] === 0) {
readyToCheck.push(dependent);
}
});
}
if (counter !== numTasks) {
throw new Error(
'async.auto cannot execute tasks due to a recursive dependency'
);
}
}
function getDependents(taskName) {
var result = [];
Object.keys(tasks).forEach(key => {
const task = tasks[key];
if (Array.isArray(task) && task.indexOf(taskName) >= 0) {
result.push(key);
}
});
return result;
}
return callback[PROMISE_SYMBOL]
}
var FN_ARGS = /^(?:async\s+)?(?:function)?\s*\w*\s*\(\s*([^)]+)\s*\)(?:\s*{)/;
var ARROW_FN_ARGS = /^(?:async\s+)?\(?\s*([^)=]+)\s*\)?(?:\s*=>)/;
var FN_ARG_SPLIT = /,/;
var FN_ARG = /(=.+)?(\s*)$/;
var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg;
function parseParams(func) {
const src = func.toString().replace(STRIP_COMMENTS, '');
let match = src.match(FN_ARGS);
if (!match) {
match = src.match(ARROW_FN_ARGS);
}
if (!match) throw new Error('could not parse args in autoInject\nSource:\n' + src)
let [, args] = match;
return args
.replace(/\s/g, '')
.split(FN_ARG_SPLIT)
.map((arg) => arg.replace(FN_ARG, '').trim());
}
/**
* A dependency-injected version of the [async.auto]{@link module:ControlFlow.auto} function. Dependent
* tasks are specified as parameters to the function, after the usual callback
* parameter, with the parameter names matching the names of the tasks it
* depends on. This can provide even more readable task graphs which can be
* easier to maintain.
*
* If a final callback is specified, the task results are similarly injected,
* specified as named parameters after the initial error parameter.
*
* The autoInject function is purely syntactic sugar and its semantics are
* otherwise equivalent to [async.auto]{@link module:ControlFlow.auto}.
*
* @name autoInject
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.auto]{@link module:ControlFlow.auto}
* @category Control Flow
* @param {Object} tasks - An object, each of whose properties is an {@link AsyncFunction} of
* the form 'func([dependencies...], callback). The object's key of a property
* serves as the name of the task defined by that property, i.e. can be used
* when specifying requirements for other tasks.
* * The `callback` parameter is a `callback(err, result)` which must be called
* when finished, passing an `error` (which can be `null`) and the result of
* the function's execution. The remaining parameters name other tasks on
* which the task is dependent, and the results from those tasks are the
* arguments of those parameters.
* @param {Function} [callback] - An optional callback which is called when all
* the tasks have been completed. It receives the `err` argument if any `tasks`
* pass an error to their callback, and a `results` object with any completed
* task results, similar to `auto`.
* @returns {Promise} a promise, if no callback is passed
* @example
*
* // The example from `auto` can be rewritten as follows:
* async.autoInject({
* get_data: function(callback) {
* // async code to get some data
* callback(null, 'data', 'converted to array');
* },
* make_folder: function(callback) {
* // async code to create a directory to store a file in
* // this is run at the same time as getting the data
* callback(null, 'folder');
* },
* write_file: function(get_data, make_folder, callback) {
* // once there is some data and the directory exists,
* // write the data to a file in the directory
* callback(null, 'filename');
* },
* email_link: function(write_file, callback) {
* // once the file is written let's email a link to it...
* // write_file contains the filename returned by write_file.
* callback(null, {'file':write_file, 'email':'user@example.com'});
* }
* }, function(err, results) {
* console.log('err = ', err);
* console.log('email_link = ', results.email_link);
* });
*
* // If you are using a JS minifier that mangles parameter names, `autoInject`
* // will not work with plain functions, since the parameter names will be
* // collapsed to a single letter identifier. To work around this, you can
* // explicitly specify the names of the parameters your task function needs
* // in an array, similar to Angular.js dependency injection.
*
* // This still has an advantage over plain `auto`, since the results a task
* // depends on are still spread into arguments.
* async.autoInject({
* //...
* write_file: ['get_data', 'make_folder', function(get_data, make_folder, callback) {
* callback(null, 'filename');
* }],
* email_link: ['write_file', function(write_file, callback) {
* callback(null, {'file':write_file, 'email':'user@example.com'});
* }]
* //...
* }, function(err, results) {
* console.log('err = ', err);
* console.log('email_link = ', results.email_link);
* });
*/
function autoInject(tasks, callback) {
var newTasks = {};
Object.keys(tasks).forEach(key => {
var taskFn = tasks[key];
var params;
var fnIsAsync = isAsync(taskFn);
var hasNoDeps =
(!fnIsAsync && taskFn.length === 1) ||
(fnIsAsync && taskFn.length === 0);
if (Array.isArray(taskFn)) {
params = [...taskFn];
taskFn = params.pop();
newTasks[key] = params.concat(params.length > 0 ? newTask : taskFn);
} else if (hasNoDeps) {
// no dependencies, use the function as-is
newTasks[key] = taskFn;
} else {
params = parseParams(taskFn);
if ((taskFn.length === 0 && !fnIsAsync) && params.length === 0) {
throw new Error("autoInject task functions require explicit parameters.");
}
// remove callback param
if (!fnIsAsync) params.pop();
newTasks[key] = params.concat(newTask);
}
function newTask(results, taskCb) {
var newArgs = params.map(name => results[name]);
newArgs.push(taskCb);
wrapAsync(taskFn)(...newArgs);
}
});
return auto(newTasks, callback);
}
// Simple doubly linked list (https://en.wikipedia.org/wiki/Doubly_linked_list) implementation
// used for queues. This implementation assumes that the node provided by the user can be modified
// to adjust the next and last properties. We implement only the minimal functionality
// for queue support.
class DLL {
constructor() {
this.head = this.tail = null;
this.length = 0;
}
removeLink(node) {
if (node.prev) node.prev.next = node.next;
else this.head = node.next;
if (node.next) node.next.prev = node.prev;
else this.tail = node.prev;
node.prev = node.next = null;
this.length -= 1;
return node;
}
empty () {
while(this.head) this.shift();
return this;
}
insertAfter(node, newNode) {
newNode.prev = node;
newNode.next = node.next;
if (node.next) node.next.prev = newNode;
else this.tail = newNode;
node.next = newNode;
this.length += 1;
}
insertBefore(node, newNode) {
newNode.prev = node.prev;
newNode.next = node;
if (node.prev) node.prev.next = newNode;
else this.head = newNode;
node.prev = newNode;
this.length += 1;
}
unshift(node) {
if (this.head) this.insertBefore(this.head, node);
else setInitial(this, node);
}
push(node) {
if (this.tail) this.insertAfter(this.tail, node);
else setInitial(this, node);
}
shift() {
return this.head && this.removeLink(this.head);
}
pop() {
return this.tail && this.removeLink(this.tail);
}
toArray() {
return [...this]
}
*[Symbol.iterator] () {
var cur = this.head;
while (cur) {
yield cur.data;
cur = cur.next;
}
}
remove (testFn) {
var curr = this.head;
while(curr) {
var {next} = curr;
if (testFn(curr)) {
this.removeLink(curr);
}
curr = next;
}
return this;
}
}
function setInitial(dll, node) {
dll.length = 1;
dll.head = dll.tail = node;
}
function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
}
else if(concurrency === 0) {
throw new RangeError('Concurrency must not be zero');
}
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
const events = {
error: [],
drain: [],
saturated: [],
unsaturated: [],
empty: []
};
function on (event, handler) {
events[event].push(handler);
}
function once (event, handler) {
const handleAndRemove = (...args) => {
off(event, handleAndRemove);
handler(...args);
};
events[event].push(handleAndRemove);
}
function off (event, handler) {
if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
if (!handler) return events[event] = []
events[event] = events[event].filter(ev => ev !== handler);
}
function trigger (event, ...args) {
events[event].forEach(handler => handler(...args));
}
var processingScheduled = false;
function _insert(data, insertAtFront, rejectOnError, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
var res, rej;
function promiseCallback (err, ...args) {
// we don't care about the error, let the global error handler
// deal with it
if (err) return rejectOnError ? rej(err) : res()
if (args.length <= 1) return res(args[0])
res(args);
}
var item = {
data,
callback: rejectOnError ?
promiseCallback :
(callback || promiseCallback)
};
if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
}
if (!processingScheduled) {
processingScheduled = true;
setImmediate$1(() => {
processingScheduled = false;
q.process();
});
}
if (rejectOnError || !callback) {
return new Promise((resolve, reject) => {
res = resolve;
rej = reject;
})
}
}
function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;
for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];
var index = workersList.indexOf(task);
if (index === 0) {
workersList.shift();
} else if (index > 0) {
workersList.splice(index, 1);
}
task.callback(err, ...args);
if (err != null) {
trigger('error', err, task.data);
}
}
if (numRunning <= (q.concurrency - q.buffer) ) {
trigger('unsaturated');
}
if (q.idle()) {
trigger('drain');
}
q.process();
};
}
function _maybeDrain(data) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
setImmediate$1(() => trigger('drain'));
return true
}
return false
}
const eventMethod = (name) => (handler) => {
if (!handler) {
return new Promise((resolve, reject) => {
once(name, (err, data) => {
if (err) return reject(err)
resolve(data);
});
})
}
off(name);
on(name, handler);
};
var isProcessing = false;
var q = {
_tasks: new DLL(),
*[Symbol.iterator] () {
yield* q._tasks[Symbol.iterator]();
},
concurrency,
payload,
buffer: concurrency / 4,
started: false,
paused: false,
push (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, false, false, callback))
}
return _insert(data, false, false, callback);
},
pushAsync (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, false, true, callback))
}
return _insert(data, false, true, callback);
},
kill () {
off();
q._tasks.empty();
},
unshift (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, true, false, callback))
}
return _insert(data, true, false, callback);
},
unshiftAsync (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, true, true, callback))
}
return _insert(data, true, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
},
process () {
// Avoid trying to start too many processing operations. This can occur
// when callbacks resolve synchronously (#1267).
if (isProcessing) {
return;
}
isProcessing = true;
while(!q.paused && numRunning < q.concurrency && q._tasks.length){
var tasks = [], data = [];
var l = q._tasks.length;
if (q.payload) l = Math.min(l, q.payload);
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
workersList.push(node);
data.push(node.data);
}
numRunning += 1;
if (q._tasks.length === 0) {
trigger('empty');
}
if (numRunning === q.concurrency) {
trigger('saturated');
}
var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
},
length () {
return q._tasks.length;
},
running () {
return numRunning;
},
workersList () {
return workersList;
},
idle() {
return q._tasks.length + numRunning === 0;
},
pause () {
q.paused = true;
},
resume () {
if (q.paused === false) { return; }
q.paused = false;
setImmediate$1(q.process);
}
};
// define these as fixed properties, so people get useful errors when updating
Object.defineProperties(q, {
saturated: {
writable: false,
value: eventMethod('saturated')
},
unsaturated: {
writable: false,
value: eventMethod('unsaturated')
},
empty: {
writable: false,
value: eventMethod('empty')
},
drain: {
writable: false,
value: eventMethod('drain')
},
error: {
writable: false,
value: eventMethod('error')
},
});
return q;
}
/**
* Creates a `cargo` object with the specified payload. Tasks added to the
* cargo will be processed altogether (up to the `payload` limit). If the
* `worker` is in progress, the task is queued until it becomes available. Once
* the `worker` has completed some tasks, each callback of those tasks is
* called. Check out [these](https://camo.githubusercontent.com/6bbd36f4cf5b35a0f11a96dcd2e97711ffc2fb37/68747470733a2f2f662e636c6f75642e6769746875622e636f6d2f6173736574732f313637363837312f36383130382f62626330636662302d356632392d313165322d393734662d3333393763363464633835382e676966) [animations](https://camo.githubusercontent.com/f4810e00e1c5f5f8addbe3e9f49064fd5d102699/68747470733a2f2f662e636c6f75642e6769746875622e636f6d2f6173736574732f313637363837312f36383130312f38346339323036362d356632392d313165322d383134662d3964336430323431336266642e676966)
* for how `cargo` and `queue` work.
*
* While [`queue`]{@link module:ControlFlow.queue} passes only one task to one of a group of workers
* at a time, cargo passes an array of tasks to a single worker, repeating
* when the worker is finished.
*
* @name cargo
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.queue]{@link module:ControlFlow.queue}
* @category Control Flow
* @param {AsyncFunction} worker - An asynchronous function for processing an array
* of queued tasks. Invoked with `(tasks, callback)`.
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
* @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargo and inner queue.
* @example
*
* // create a cargo object with payload 2
* var cargo = async.cargo(function(tasks, callback) {
* for (var i=0; i<tasks.length; i++) {
* console.log('hello ' + tasks[i].name);
* }
* callback();
* }, 2);
*
* // add some items
* cargo.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
* cargo.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
* await cargo.push({name: 'baz'});
* console.log('finished processing baz');
*/
function cargo(worker, payload) {
return queue(worker, 1, payload);
}
/**
* Creates a `cargoQueue` object with the specified payload. Tasks added to the
* cargoQueue will be processed together (up to the `payload` limit) in `concurrency` parallel workers.
* If the all `workers` are in progress, the task is queued until one becomes available. Once
* a `worker` has completed some tasks, each callback of those tasks is
* called. Check out [these](https://camo.githubusercontent.com/6bbd36f4cf5b35a0f11a96dcd2e97711ffc2fb37/68747470733a2f2f662e636c6f75642e6769746875622e636f6d2f6173736574732f313637363837312f36383130382f62626330636662302d356632392d313165322d393734662d3333393763363464633835382e676966) [animations](https://camo.githubusercontent.com/f4810e00e1c5f5f8addbe3e9f49064fd5d102699/68747470733a2f2f662e636c6f75642e6769746875622e636f6d2f6173736574732f313637363837312f36383130312f38346339323036362d356632392d313165322d383134662d3964336430323431336266642e676966)
* for how `cargo` and `queue` work.
*
* While [`queue`]{@link module:ControlFlow.queue} passes only one task to one of a group of workers
* at a time, and [`cargo`]{@link module:ControlFlow.cargo} passes an array of tasks to a single worker,
* the cargoQueue passes an array of tasks to multiple parallel workers.
*
* @name cargoQueue
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.queue]{@link module:ControlFlow.queue}
* @see [async.cargo]{@link module:ControlFLow.cargo}
* @category Control Flow
* @param {AsyncFunction} worker - An asynchronous function for processing an array
* of queued tasks. Invoked with `(tasks, callback)`.
* @param {number} [concurrency=1] - An `integer` for determining how many
* `worker` functions should be run in parallel. If omitted, the concurrency
* defaults to `1`. If the concurrency is `0`, an error is thrown.
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
* @returns {module:ControlFlow.CargoObject} A cargoQueue object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargoQueue and inner queue.
* @example
*
* // create a cargoQueue object with payload 2 and concurrency 2
* var cargoQueue = async.cargoQueue(function(tasks, callback) {
* for (var i=0; i<tasks.length; i++) {
* console.log('hello ' + tasks[i].name);
* }
* callback();
* }, 2, 2);
*
* // add some items
* cargoQueue.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
* cargoQueue.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
* cargoQueue.push({name: 'baz'}, function(err) {
* console.log('finished processing baz');
* });
* cargoQueue.push({name: 'boo'}, function(err) {
* console.log('finished processing boo');
* });
*/
function cargo$1(worker, concurrency, payload) {
return queue(worker, concurrency, payload);
}
/**
* Reduces `coll` into a single value using an async `iteratee` to return each
* successive step. `memo` is the initial state of the reduction. This function
* only operates in series.
*
* For performance reasons, it may make sense to split a call to this function
* into a parallel map, and then use the normal `Array.prototype.reduce` on the
* results. This function is for situations where each step in the reduction
* needs to be async; if you can get the data before reducing it, then it's
* probably a good idea to do so.
*
* @name reduce
* @static
* @memberOf module:Collections
* @method
* @alias inject
* @alias foldl
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {*} memo - The initial state of the reduction.
* @param {AsyncFunction} iteratee - A function applied to each item in the
* array to produce the next step in the reduction.
* The `iteratee` should complete with the next state of the reduction.
* If the iteratee complete with an error, the reduction is stopped and the
* main `callback` is immediately called with the error.
* Invoked with (memo, item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result is the reduced value. Invoked with
* (err, result).
* @returns {Promise} a promise, if no callback is passed
* @example
*
* async.reduce([1,2,3], 0, function(memo, item, callback) {
* // pointless async:
* process.nextTick(function() {
* callback(null, memo + item)
* });
* }, function(err, result) {
* // result is now equal to the last value of memo, which is 6
* });
*/
function reduce(coll, memo, iteratee, callback) {
callback = once(callback);
var _iteratee = wrapAsync(iteratee);
return eachOfSeries$1(coll, (x, i, iterCb) => {
_iteratee(memo, x, (err, v) => {
memo = v;
iterCb(err);
});
}, err => callback(err, memo));
}
var reduce$1 = awaitify(reduce, 4);
/**
* Version of the compose function that is more natural to read. Each function
* consumes the return value of the previous function. It is the equivalent of
* [compose]{@link module:ControlFlow.compose} with the arguments reversed.
*
* Each function is executed with the `this` binding of the composed function.
*
* @name seq
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.compose]{@link module:ControlFlow.compose}
* @category Control Flow
* @param {...AsyncFunction} functions - the asynchronous functions to compose
* @returns {Function} a function that composes the `functions` in order
* @example
*
* // Requires lodash (or underscore), express3 and dresende's orm2.
* // Part of an app, that fetches cats of the logged user.
* // This example uses `seq` function to avoid overnesting and error
* // handling clutter.
* app.get('/cats', function(request, response) {
* var User = request.models.User;
* async.seq(
* _.bind(User.get, User), // 'User.get' has signature (id, callback(err, data))
* function(user, fn) {
* user.getCats(fn); // 'getCats' has signature (callback(err, data))
* }
* )(req.session.user_id, function (err, cats) {
* if (err) {
* console.error(err);
* response.json({ status: 'error', message: err.message });
* } else {
* response.json({ status: 'ok', message: 'Cats found', data: cats });
* }
* });
* });
*/
function seq(...functions) {
var _functions = functions.map(wrapAsync);
return function (...args) {
var that = this;
var cb = args[args.length - 1];
if (typeof cb == 'function') {
args.pop();
} else {
cb = promiseCallback();
}
reduce$1(_functions, args, (newargs, fn, iterCb) => {
fn.apply(that, newargs.concat((err, ...nextargs) => {
iterCb(err, nextargs);
}));
},
(err, results) => cb(err, ...results));
return cb[PROMISE_SYMBOL]
};
}
/**
* Creates a function which is a composition of the passed asynchronous
* functions. Each function consumes the return value of the function that
* follows. Composing functions `f()`, `g()`, and `h()` would produce the result
* of `f(g(h()))`, only this version uses callbacks to obtain the return values.
*
* If the last argument to the composed function is not a function, a promise
* is returned when you call it.
*
* Each function is executed with the `this` binding of the composed function.
*
* @name compose
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {...AsyncFunction} functions - the asynchronous functions to compose
* @returns {Function} an asynchronous function that is the composed
* asynchronous `functions`
* @example
*
* function add1(n, callback) {
* setTimeout(function () {
* callback(null, n + 1);
* }, 10);
* }
*
* function mul3(n, callback) {
* setTimeout(function () {
* callback(null, n * 3);
* }, 10);
* }
*
* var add1mul3 = async.compose(mul3, add1);
* add1mul3(4, function (err, result) {
* // result now equals 15
* });
*/
function compose(...args) {
return seq(...args.reverse());
}
/**
* The same as [`map`]{@link module:Collections.map} but runs a maximum of `limit` async operations at a time.
*
* @name mapLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.map]{@link module:Collections.map}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with the transformed item.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Results is an array of the
* transformed items from the `coll`. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
*/
function mapLimit (coll, limit, iteratee, callback) {
return _asyncMap(eachOfLimit(limit), coll, iteratee, callback)
}
var mapLimit$1 = awaitify(mapLimit, 4);
/**
* The same as [`concat`]{@link module:Collections.concat} but runs a maximum of `limit` async operations at a time.
*
* @name concatLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @alias flatMapLimit
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
*/
function concatLimit(coll, limit, iteratee, callback) {
var _iteratee = wrapAsync(iteratee);
return mapLimit$1(coll, limit, (val, iterCb) => {
_iteratee(val, (err, ...args) => {
if (err) return iterCb(err);
return iterCb(err, args);
});
}, (err, mapResults) => {
var result = [];
for (var i = 0; i < mapResults.length; i++) {
if (mapResults[i]) {
result = result.concat(...mapResults[i]);
}
}
return callback(err, result);
});
}
var concatLimit$1 = awaitify(concatLimit, 4);
/**
* Applies `iteratee` to each item in `coll`, concatenating the results. Returns
* the concatenated list. The `iteratee`s are called in parallel, and the
* results are concatenated as they return. The results array will be returned in
* the original order of `coll` passed to the `iteratee` function.
*
* @name concat
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @alias flatMap
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
* @example
*
* async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {
* // files is now a list of filenames that exist in the 3 directories
* });
*/
function concat(coll, iteratee, callback) {
return concatLimit$1(coll, Infinity, iteratee, callback)
}
var concat$1 = awaitify(concat, 3);
/**
* The same as [`concat`]{@link module:Collections.concat} but runs only a single async operation at a time.
*
* @name concatSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @alias flatMapSeries
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
* The iteratee should complete with an array an array of results.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
*/
function concatSeries(coll, iteratee, callback) {
return concatLimit$1(coll, 1, iteratee, callback)
}
var concatSeries$1 = awaitify(concatSeries, 3);
/**
* Returns a function that when called, calls-back with the values provided.
* Useful as the first function in a [`waterfall`]{@link module:ControlFlow.waterfall}, or for plugging values in to
* [`auto`]{@link module:ControlFlow.auto}.
*
* @name constant
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {...*} arguments... - Any number of arguments to automatically invoke
* callback with.
* @returns {AsyncFunction} Returns a function that when invoked, automatically
* invokes the callback with the previous given arguments.
* @example
*
* async.waterfall([
* async.constant(42),
* function (value, next) {
* // value === 42
* },
* //...
* ], callback);
*
* async.waterfall([
* async.constant(filename, "utf8"),
* fs.readFile,
* function (fileData, next) {
* //...
* }
* //...
* ], callback);
*
* async.auto({
* hostname: async.constant("https://server.net/"),
* port: findFreePort,
* launchServer: ["hostname", "port", function (options, cb) {
* startServer(options, cb);
* }],
* //...
* }, callback);
*/
function constant(...args) {
return function (...ignoredArgs/*, callback*/) {
var callback = ignoredArgs.pop();
return callback(null, ...args);
};
}
function _createTester(check, getResult) {
return (eachfn, arr, _iteratee, cb) => {
var testPassed = false;
var testResult;
const iteratee = wrapAsync(_iteratee);
eachfn(arr, (value, _, callback) => {
iteratee(value, (err, result) => {
if (err || err === false) return callback(err);
if (check(result) && !testResult) {
testPassed = true;
testResult = getResult(true, value);
return callback(null, breakLoop);
}
callback();
});
}, err => {
if (err) return cb(err);
cb(null, testPassed ? testResult : getResult(false));
});
};
}
/**
* Returns the first value in `coll` that passes an async truth test. The
* `iteratee` is applied in parallel, meaning the first iteratee to return
* `true` will fire the detect `callback` with that result. That means the
* result might not be the first item in the original `coll` (in terms of order)
* that passes the test.
* If order within the original `coll` is important, then look at
* [`detectSeries`]{@link module:Collections.detectSeries}.
*
* @name detect
* @static
* @memberOf module:Collections
* @method
* @alias find
* @category Collections
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the `iteratee` functions have finished.
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns A Promise, if no callback is passed
* @example
*
* async.detect(['file1','file2','file3'], function(filePath, callback) {
* fs.access(filePath, function(err) {
* callback(null, !err)
* });
* }, function(err, result) {
* // result now equals the first file in the list that exists
* });
*/
function detect(coll, iteratee, callback) {
return _createTester(bool => bool, (res, item) => item)(eachOf$1, coll, iteratee, callback)
}
var detect$1 = awaitify(detect, 3);
/**
* The same as [`detect`]{@link module:Collections.detect} but runs a maximum of `limit` async operations at a
* time.
*
* @name detectLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.detect]{@link module:Collections.detect}
* @alias findLimit
* @category Collections
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the `iteratee` functions have finished.
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns a Promise if no callback is passed
*/
function detectLimit(coll, limit, iteratee, callback) {
return _createTester(bool => bool, (res, item) => item)(eachOfLimit(limit), coll, iteratee, callback)
}
var detectLimit$1 = awaitify(detectLimit, 4);
/**
* The same as [`detect`]{@link module:Collections.detect} but runs only a single async operation at a time.
*
* @name detectSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.detect]{@link module:Collections.detect}
* @alias findSeries
* @category Collections
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the `iteratee` functions have finished.
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns a Promise if no callback is passed
*/
function detectSeries(coll, iteratee, callback) {
return _createTester(bool => bool, (res, item) => item)(eachOfLimit(1), coll, iteratee, callback)
}
var detectSeries$1 = awaitify(detectSeries, 3);
function consoleFunc(name) {
return (fn, ...args) => wrapAsync(fn)(...args, (err, ...resultArgs) => {
if (typeof console === 'object') {
if (err) {
if (console.error) {
console.error(err);
}
} else if (console[name]) {
resultArgs.forEach(x => console[name](x));
}
}
})
}
/**
* Logs the result of an [`async` function]{@link AsyncFunction} to the
* `console` using `console.dir` to display the properties of the resulting object.
* Only works in Node.js or in browsers that support `console.dir` and
* `console.error` (such as FF and Chrome).
* If multiple arguments are returned from the async function,
* `console.dir` is called on each argument in order.
*
* @name dir
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} function - The function you want to eventually apply
* all arguments to.
* @param {...*} arguments... - Any number of arguments to apply to the function.
* @example
*
* // in a module
* var hello = function(name, callback) {
* setTimeout(function() {
* callback(null, {hello: name});
* }, 1000);
* };
*
* // in the node repl
* node> async.dir(hello, 'world');
* {hello: 'world'}
*/
var dir = consoleFunc('dir');
/**
* The post-check version of [`whilst`]{@link module:ControlFlow.whilst}. To reflect the difference in
* the order of operations, the arguments `test` and `iteratee` are switched.
*
* `doWhilst` is to `whilst` as `do while` is to `while` in plain JavaScript.
*
* @name doWhilst
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.whilst]{@link module:ControlFlow.whilst}
* @category Control Flow
* @param {AsyncFunction} iteratee - A function which is called each time `test`
* passes. Invoked with (callback).
* @param {AsyncFunction} test - asynchronous truth test to perform after each
* execution of `iteratee`. Invoked with (...args, callback), where `...args` are the
* non-error args from the previous callback of `iteratee`.
* @param {Function} [callback] - A callback which is called after the test
* function has failed and repeated execution of `iteratee` has stopped.
* `callback` will be passed an error and any arguments passed to the final
* `iteratee`'s callback. Invoked with (err, [results]);
* @returns {Promise} a promise, if no callback is passed
*/
function doWhilst(iteratee, test, callback) {
callback = onlyOnce(callback);
var _fn = wrapAsync(iteratee);
var _test = wrapAsync(test);
var results;
function next(err, ...args) {
if (err) return callback(err);
if (err === false) return;
results = args;
_test(...args, check);
}
function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null, ...results);
_fn(next);
}
return check(null, true);
}
var doWhilst$1 = awaitify(doWhilst, 3);
/**
* Like ['doWhilst']{@link module:ControlFlow.doWhilst}, except the `test` is inverted. Note the
* argument ordering differs from `until`.
*
* @name doUntil
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.doWhilst]{@link module:ControlFlow.doWhilst}
* @category Control Flow
* @param {AsyncFunction} iteratee - An async function which is called each time
* `test` fails. Invoked with (callback).
* @param {AsyncFunction} test - asynchronous truth test to perform after each
* execution of `iteratee`. Invoked with (...args, callback), where `...args` are the
* non-error args from the previous callback of `iteratee`
* @param {Function} [callback] - A callback which is called after the test
* function has passed and repeated execution of `iteratee` has stopped. `callback`
* will be passed an error and any arguments passed to the final `iteratee`'s
* callback. Invoked with (err, [results]);
* @returns {Promise} a promise, if no callback is passed
*/
function doUntil(iteratee, test, callback) {
const _test = wrapAsync(test);
return doWhilst$1(iteratee, (...args) => {
const cb = args.pop();
_test(...args, (err, truth) => cb (err, !truth));
}, callback);
}
function _withoutIndex(iteratee) {
return (value, index, callback) => iteratee(value, callback);
}
/**
* Applies the function `iteratee` to each item in `coll`, in parallel.
* The `iteratee` is called with an item from the list, and a callback for when
* it has finished. If the `iteratee` passes an error to its `callback`, the
* main `callback` (for the `each` function) is immediately called with the
* error.
*
* Note, that since this function applies `iteratee` to each item in parallel,
* there is no guarantee that the iteratee functions will complete in order.
*
* @name each
* @static
* @memberOf module:Collections
* @method
* @alias forEach
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to
* each item in `coll`. Invoked with (item, callback).
* The array index is not passed to the iteratee.
* If you need the index, use `eachOf`.
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
* @example
*
* // assuming openFiles is an array of file names and saveFile is a function
* // to save the modified contents of that file:
*
* async.each(openFiles, saveFile, function(err){
* // if any of the saves produced an error, err would equal that error
* });
*
* // assuming openFiles is an array of file names
* async.each(openFiles, function(file, callback) {
*
* // Perform operation on file here.
* console.log('Processing file ' + file);
*
* if( file.length > 32 ) {
* console.log('This file name is too long');
* callback('File name too long');
* } else {
* // Do work to process file here
* console.log('File processed');
* callback();
* }
* }, function(err) {
* // if any of the file processing produced an error, err would equal that error
* if( err ) {
* // One of the iterations produced an error.
* // All processing will now stop.
* console.log('A file failed to process');
* } else {
* console.log('All files have been processed successfully');
* }
* });
*/
function eachLimit(coll, iteratee, callback) {
return eachOf$1(coll, _withoutIndex(wrapAsync(iteratee)), callback);
}
var each = awaitify(eachLimit, 3);
/**
* The same as [`each`]{@link module:Collections.each} but runs a maximum of `limit` async operations at a time.
*
* @name eachLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.each]{@link module:Collections.each}
* @alias forEachLimit
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The array index is not passed to the iteratee.
* If you need the index, use `eachOfLimit`.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
function eachLimit$1(coll, limit, iteratee, callback) {
return eachOfLimit(limit)(coll, _withoutIndex(wrapAsync(iteratee)), callback);
}
var eachLimit$2 = awaitify(eachLimit$1, 4);
/**
* The same as [`each`]{@link module:Collections.each} but runs only a single async operation at a time.
*
* Note, that unlike [`each`]{@link module:Collections.each}, this function applies iteratee to each item
* in series and therefore the iteratee functions will complete in order.
* @name eachSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.each]{@link module:Collections.each}
* @alias forEachSeries
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`.
* The array index is not passed to the iteratee.
* If you need the index, use `eachOfSeries`.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
function eachSeries(coll, iteratee, callback) {
return eachLimit$2(coll, 1, iteratee, callback)
}
var eachSeries$1 = awaitify(eachSeries, 3);
/**
* Wrap an async function and ensure it calls its callback on a later tick of
* the event loop. If the function already calls its callback on a next tick,
* no extra deferral is added. This is useful for preventing stack overflows
* (`RangeError: Maximum call stack size exceeded`) and generally keeping
* [Zalgo](http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony)
* contained. ES2017 `async` functions are returned as-is -- they are immune
* to Zalgo's corrupting influences, as they always resolve on a later tick.
*
* @name ensureAsync
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} fn - an async function, one that expects a node-style
* callback as its last argument.
* @returns {AsyncFunction} Returns a wrapped function with the exact same call
* signature as the function passed in.
* @example
*
* function sometimesAsync(arg, callback) {
* if (cache[arg]) {
* return callback(null, cache[arg]); // this would be synchronous!!
* } else {
* doSomeIO(arg, callback); // this IO would be asynchronous
* }
* }
*
* // this has a risk of stack overflows if many results are cached in a row
* async.mapSeries(args, sometimesAsync, done);
*
* // this will defer sometimesAsync's callback if necessary,
* // preventing stack overflows
* async.mapSeries(args, async.ensureAsync(sometimesAsync), done);
*/
function ensureAsync(fn) {
if (isAsync(fn)) return fn;
return function (...args/*, callback*/) {
var callback = args.pop();
var sync = true;
args.push((...innerArgs) => {
if (sync) {
setImmediate$1(() => callback(...innerArgs));
} else {
callback(...innerArgs);
}
});
fn.apply(this, args);
sync = false;
};
}
/**
* Returns `true` if every element in `coll` satisfies an async test. If any
* iteratee call returns `false`, the main `callback` is immediately called.
*
* @name every
* @static
* @memberOf module:Collections
* @method
* @alias all
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
* The iteratee must complete with a boolean result value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result will be either `true` or `false`
* depending on the values of the async tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
* @example
*
* async.every(['file1','file2','file3'], function(filePath, callback) {
* fs.access(filePath, function(err) {
* callback(null, !err)
* });
* }, function(err, result) {
* // if result is true then every file exists
* });
*/
function every(coll, iteratee, callback) {
return _createTester(bool => !bool, res => !res)(eachOf$1, coll, iteratee, callback)
}
var every$1 = awaitify(every, 3);
/**
* The same as [`every`]{@link module:Collections.every} but runs a maximum of `limit` async operations at a time.
*
* @name everyLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.every]{@link module:Collections.every}
* @alias allLimit
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
* The iteratee must complete with a boolean result value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result will be either `true` or `false`
* depending on the values of the async tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
*/
function everyLimit(coll, limit, iteratee, callback) {
return _createTester(bool => !bool, res => !res)(eachOfLimit(limit), coll, iteratee, callback)
}
var everyLimit$1 = awaitify(everyLimit, 4);
/**
* The same as [`every`]{@link module:Collections.every} but runs only a single async operation at a time.
*
* @name everySeries
* @static
* @memberOf module:Collections
* @method
* @see [async.every]{@link module:Collections.every}
* @alias allSeries
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in series.
* The iteratee must complete with a boolean result value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result will be either `true` or `false`
* depending on the values of the async tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
*/
function everySeries(coll, iteratee, callback) {
return _createTester(bool => !bool, res => !res)(eachOfSeries$1, coll, iteratee, callback)
}
var everySeries$1 = awaitify(everySeries, 3);
function filterArray(eachfn, arr, iteratee, callback) {
var truthValues = new Array(arr.length);
eachfn(arr, (x, index, iterCb) => {
iteratee(x, (err, v) => {
truthValues[index] = !!v;
iterCb(err);
});
}, err => {
if (err) return callback(err);
var results = [];
for (var i = 0; i < arr.length; i++) {
if (truthValues[i]) results.push(arr[i]);
}
callback(null, results);
});
}
function filterGeneric(eachfn, coll, iteratee, callback) {
var results = [];
eachfn(coll, (x, index, iterCb) => {
iteratee(x, (err, v) => {
if (err) return iterCb(err);
if (v) {
results.push({index, value: x});
}
iterCb(err);
});
}, err => {
if (err) return callback(err);
callback(null, results
.sort((a, b) => a.index - b.index)
.map(v => v.value));
});
}
function _filter(eachfn, coll, iteratee, callback) {
var filter = isArrayLike(coll) ? filterArray : filterGeneric;
return filter(eachfn, coll, wrapAsync(iteratee), callback);
}
/**
* Returns a new array of all the values in `coll` which pass an async truth
* test. This operation is performed in parallel, but the results array will be
* in the same order as the original.
*
* @name filter
* @static
* @memberOf module:Collections
* @method
* @alias select
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results).
* @returns {Promise} a promise, if no callback provided
* @example
*
* async.filter(['file1','file2','file3'], function(filePath, callback) {
* fs.access(filePath, function(err) {
* callback(null, !err)
* });
* }, function(err, results) {
* // results now equals an array of the existing files
* });
*/
function filter (coll, iteratee, callback) {
return _filter(eachOf$1, coll, iteratee, callback)
}
var filter$1 = awaitify(filter, 3);
/**
* The same as [`filter`]{@link module:Collections.filter} but runs a maximum of `limit` async operations at a
* time.
*
* @name filterLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.filter]{@link module:Collections.filter}
* @alias selectLimit
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results).
* @returns {Promise} a promise, if no callback provided
*/
function filterLimit (coll, limit, iteratee, callback) {
return _filter(eachOfLimit(limit), coll, iteratee, callback)
}
var filterLimit$1 = awaitify(filterLimit, 4);
/**
* The same as [`filter`]{@link module:Collections.filter} but runs only a single async operation at a time.
*
* @name filterSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.filter]{@link module:Collections.filter}
* @alias selectSeries
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results)
* @returns {Promise} a promise, if no callback provided
*/
function filterSeries (coll, iteratee, callback) {
return _filter(eachOfSeries$1, coll, iteratee, callback)
}
var filterSeries$1 = awaitify(filterSeries, 3);
/**
* Calls the asynchronous function `fn` with a callback parameter that allows it
* to call itself again, in series, indefinitely.
* If an error is passed to the callback then `errback` is called with the
* error, and execution stops, otherwise it will never be called.
*
* @name forever
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {AsyncFunction} fn - an async function to call repeatedly.
* Invoked with (next).
* @param {Function} [errback] - when `fn` passes an error to it's callback,
* this function will be called, and execution stops. Invoked with (err).
* @returns {Promise} a promise that rejects if an error occurs and an errback
* is not passed
* @example
*
* async.forever(
* function(next) {
* // next is suitable for passing to things that need a callback(err [, whatever]);
* // it will result in this function being called again.
* },
* function(err) {
* // if next is called with a value in its first parameter, it will appear
* // in here as 'err', and execution will stop.
* }
* );
*/
function forever(fn, errback) {
var done = onlyOnce(errback);
var task = wrapAsync(ensureAsync(fn));
function next(err) {
if (err) return done(err);
if (err === false) return;
task(next);
}
return next();
}
var forever$1 = awaitify(forever, 2);
/**
* The same as [`groupBy`]{@link module:Collections.groupBy} but runs a maximum of `limit` async operations at a time.
*
* @name groupByLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a `key` to group the value under.
* Invoked with (value, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Result is an `Object` whoses
* properties are arrays of values which returned the corresponding key.
* @returns {Promise} a promise, if no callback is passed
*/
function groupByLimit(coll, limit, iteratee, callback) {
var _iteratee = wrapAsync(iteratee);
return mapLimit$1(coll, limit, (val, iterCb) => {
_iteratee(val, (err, key) => {
if (err) return iterCb(err);
return iterCb(err, {key, val});
});
}, (err, mapResults) => {
var result = {};
// from MDN, handle object having an `hasOwnProperty` prop
var {hasOwnProperty} = Object.prototype;
for (var i = 0; i < mapResults.length; i++) {
if (mapResults[i]) {
var {key} = mapResults[i];
var {val} = mapResults[i];
if (hasOwnProperty.call(result, key)) {
result[key].push(val);
} else {
result[key] = [val];
}
}
}
return callback(err, result);
});
}
var groupByLimit$1 = awaitify(groupByLimit, 4);
/**
* Returns a new object, where each value corresponds to an array of items, from
* `coll`, that returned the corresponding key. That is, the keys of the object
* correspond to the values passed to the `iteratee` callback.
*
* Note: Since this function applies the `iteratee` to each item in parallel,
* there is no guarantee that the `iteratee` functions will complete in order.
* However, the values for each key in the `result` will be in the same order as
* the original `coll`. For Objects, the values will roughly be in the order of
* the original Objects' keys (but this can vary across JavaScript engines).
*
* @name groupBy
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a `key` to group the value under.
* Invoked with (value, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Result is an `Object` whoses
* properties are arrays of values which returned the corresponding key.
* @returns {Promise} a promise, if no callback is passed
* @example
*
* async.groupBy(['userId1', 'userId2', 'userId3'], function(userId, callback) {
* db.findById(userId, function(err, user) {
* if (err) return callback(err);
* return callback(null, user.age);
* });
* }, function(err, result) {
* // result is object containing the userIds grouped by age
* // e.g. { 30: ['userId1', 'userId3'], 42: ['userId2']};
* });
*/
function groupBy (coll, iteratee, callback) {
return groupByLimit$1(coll, Infinity, iteratee, callback)
}
/**
* The same as [`groupBy`]{@link module:Collections.groupBy} but runs only a single async operation at a time.
*
* @name groupBySeries
* @static
* @memberOf module:Collections
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a `key` to group the value under.
* Invoked with (value, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Result is an `Object` whoses
* properties are arrays of values which returned the corresponding key.
* @returns {Promise} a promise, if no callback is passed
*/
function groupBySeries (coll, iteratee, callback) {
return groupByLimit$1(coll, 1, iteratee, callback)
}
/**
* Logs the result of an `async` function to the `console`. Only works in
* Node.js or in browsers that support `console.log` and `console.error` (such
* as FF and Chrome). If multiple arguments are returned from the async
* function, `console.log` is called on each argument in order.
*
* @name log
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} function - The function you want to eventually apply
* all arguments to.
* @param {...*} arguments... - Any number of arguments to apply to the function.
* @example
*
* // in a module
* var hello = function(name, callback) {
* setTimeout(function() {
* callback(null, 'hello ' + name);
* }, 1000);
* };
*
* // in the node repl
* node> async.log(hello, 'world');
* 'hello world'
*/
var log = consoleFunc('log');
/**
* The same as [`mapValues`]{@link module:Collections.mapValues} but runs a maximum of `limit` async operations at a
* time.
*
* @name mapValuesLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.mapValues]{@link module:Collections.mapValues}
* @category Collection
* @param {Object} obj - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A function to apply to each value and key
* in `coll`.
* The iteratee should complete with the transformed value as its result.
* Invoked with (value, key, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. `result` is a new object consisting
* of each key from `obj`, with each transformed value on the right-hand side.
* Invoked with (err, result).
* @returns {Promise} a promise, if no callback is passed
*/
function mapValuesLimit(obj, limit, iteratee, callback) {
callback = once(callback);
var newObj = {};
var _iteratee = wrapAsync(iteratee);
return eachOfLimit(limit)(obj, (val, key, next) => {
_iteratee(val, key, (err, result) => {
if (err) return next(err);
newObj[key] = result;
next(err);
});
}, err => callback(err, newObj));
}
var mapValuesLimit$1 = awaitify(mapValuesLimit, 4);
/**
* A relative of [`map`]{@link module:Collections.map}, designed for use with objects.
*
* Produces a new Object by mapping each value of `obj` through the `iteratee`
* function. The `iteratee` is called each `value` and `key` from `obj` and a
* callback for when it has finished processing. Each of these callbacks takes
* two arguments: an `error`, and the transformed item from `obj`. If `iteratee`
* passes an error to its callback, the main `callback` (for the `mapValues`
* function) is immediately called with the error.
*
* Note, the order of the keys in the result is not guaranteed. The keys will
* be roughly in the order they complete, (but this is very engine-specific)
*
* @name mapValues
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @param {Object} obj - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each value and key
* in `coll`.
* The iteratee should complete with the transformed value as its result.
* Invoked with (value, key, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. `result` is a new object consisting
* of each key from `obj`, with each transformed value on the right-hand side.
* Invoked with (err, result).
* @returns {Promise} a promise, if no callback is passed
* @example
*
* async.mapValues({
* f1: 'file1',
* f2: 'file2',
* f3: 'file3'
* }, function (file, key, callback) {
* fs.stat(file, callback);
* }, function(err, result) {
* // result is now a map of stats for each file, e.g.
* // {
* // f1: [stats for file1],
* // f2: [stats for file2],
* // f3: [stats for file3]
* // }
* });
*/
function mapValues(obj, iteratee, callback) {
return mapValuesLimit$1(obj, Infinity, iteratee, callback)
}
/**
* The same as [`mapValues`]{@link module:Collections.mapValues} but runs only a single async operation at a time.
*
* @name mapValuesSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.mapValues]{@link module:Collections.mapValues}
* @category Collection
* @param {Object} obj - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each value and key
* in `coll`.
* The iteratee should complete with the transformed value as its result.
* Invoked with (value, key, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. `result` is a new object consisting
* of each key from `obj`, with each transformed value on the right-hand side.
* Invoked with (err, result).
* @returns {Promise} a promise, if no callback is passed
*/
function mapValuesSeries(obj, iteratee, callback) {
return mapValuesLimit$1(obj, 1, iteratee, callback)
}
/**
* Caches the results of an async function. When creating a hash to store
* function results against, the callback is omitted from the hash and an
* optional hash function can be used.
*
* **Note: if the async function errs, the result will not be cached and
* subsequent calls will call the wrapped function.**
*
* If no hash function is specified, the first argument is used as a hash key,
* which may work reasonably if it is a string or a data type that converts to a
* distinct string. Note that objects and arrays will not behave reasonably.
* Neither will cases where the other arguments are significant. In such cases,
* specify your own hash function.
*
* The cache of results is exposed as the `memo` property of the function
* returned by `memoize`.
*
* @name memoize
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} fn - The async function to proxy and cache results from.
* @param {Function} hasher - An optional function for generating a custom hash
* for storing results. It has all the arguments applied to it apart from the
* callback, and must be synchronous.
* @returns {AsyncFunction} a memoized version of `fn`
* @example
*
* var slow_fn = function(name, callback) {
* // do something
* callback(null, result);
* };
* var fn = async.memoize(slow_fn);
*
* // fn can now be used as if it were slow_fn
* fn('some name', function() {
* // callback
* });
*/
function memoize(fn, hasher = v => v) {
var memo = Object.create(null);
var queues = Object.create(null);
var _fn = wrapAsync(fn);
var memoized = initialParams((args, callback) => {
var key = hasher(...args);
if (key in memo) {
setImmediate$1(() => callback(null, ...memo[key]));
} else if (key in queues) {
queues[key].push(callback);
} else {
queues[key] = [callback];
_fn(...args, (err, ...resultArgs) => {
// #1465 don't memoize if an error occurred
if (!err) {
memo[key] = resultArgs;
}
var q = queues[key];
delete queues[key];
for (var i = 0, l = q.length; i < l; i++) {
q[i](err, ...resultArgs);
}
});
}
});
memoized.memo = memo;
memoized.unmemoized = fn;
return memoized;
}
/**
* Calls `callback` on a later loop around the event loop. In Node.js this just
* calls `process.nextTick`. In the browser it will use `setImmediate` if
* available, otherwise `setTimeout(callback, 0)`, which means other higher
* priority events may precede the execution of `callback`.
*
* This is used internally for browser-compatibility purposes.
*
* @name nextTick
* @static
* @memberOf module:Utils
* @method
* @see [async.setImmediate]{@link module:Utils.setImmediate}
* @category Util
* @param {Function} callback - The function to call on a later loop around
* the event loop. Invoked with (args...).
* @param {...*} args... - any number of additional arguments to pass to the
* callback on the next tick.
* @example
*
* var call_order = [];
* async.nextTick(function() {
* call_order.push('two');
* // call_order now equals ['one','two']
* });
* call_order.push('one');
*
* async.setImmediate(function (a, b, c) {
* // a, b, and c equal 1, 2, and 3
* }, 1, 2, 3);
*/
var _defer$1;
if (hasNextTick) {
_defer$1 = process.nextTick;
} else if (hasSetImmediate) {
_defer$1 = setImmediate;
} else {
_defer$1 = fallback;
}
var nextTick = wrap(_defer$1);
var _parallel = awaitify((eachfn, tasks, callback) => {
var results = isArrayLike(tasks) ? [] : {};
eachfn(tasks, (task, key, taskCb) => {
wrapAsync(task)((err, ...result) => {
if (result.length < 2) {
[result] = result;
}
results[key] = result;
taskCb(err);
});
}, err => callback(err, results));
}, 3);
/**
* Run the `tasks` collection of functions in parallel, without waiting until
* the previous function has completed. If any of the functions pass an error to
* its callback, the main `callback` is immediately called with the value of the
* error. Once the `tasks` have completed, the results are passed to the final
* `callback` as an array.
*
* **Note:** `parallel` is about kicking-off I/O tasks in parallel, not about
* parallel execution of code. If your tasks do not use any timers or perform
* any I/O, they will actually be executed in series. Any synchronous setup
* sections for each task will happen one after the other. JavaScript remains
* single-threaded.
*
* **Hint:** Use [`reflect`]{@link module:Utils.reflect} to continue the
* execution of other tasks when a task fails.
*
* It is also possible to use an object instead of an array. Each property will
* be run as a function and the results will be passed to the final `callback`
* as an object instead of an array. This can be a more readable way of handling
* results from {@link async.parallel}.
*
* @name parallel
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} tasks - A collection of
* [async functions]{@link AsyncFunction} to run.
* Each async function can complete with any number of optional `result` values.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed successfully. This function gets a results array
* (or object) containing all the result arguments passed to the task callbacks.
* Invoked with (err, results).
* @returns {Promise} a promise, if a callback is not passed
*
* @example
* async.parallel([
* function(callback) {
* setTimeout(function() {
* callback(null, 'one');
* }, 200);
* },
* function(callback) {
* setTimeout(function() {
* callback(null, 'two');
* }, 100);
* }
* ],
* // optional callback
* function(err, results) {
* // the results array will equal ['one','two'] even though
* // the second function had a shorter timeout.
* });
*
* // an example using an object instead of an array
* async.parallel({
* one: function(callback) {
* setTimeout(function() {
* callback(null, 1);
* }, 200);
* },
* two: function(callback) {
* setTimeout(function() {
* callback(null, 2);
* }, 100);
* }
* }, function(err, results) {
* // results is now equals to: {one: 1, two: 2}
* });
*/
function parallel(tasks, callback) {
return _parallel(eachOf$1, tasks, callback);
}
/**
* The same as [`parallel`]{@link module:ControlFlow.parallel} but runs a maximum of `limit` async operations at a
* time.
*
* @name parallelLimit
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.parallel]{@link module:ControlFlow.parallel}
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} tasks - A collection of
* [async functions]{@link AsyncFunction} to run.
* Each async function can complete with any number of optional `result` values.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed successfully. This function gets a results array
* (or object) containing all the result arguments passed to the task callbacks.
* Invoked with (err, results).
* @returns {Promise} a promise, if a callback is not passed
*/
function parallelLimit(tasks, limit, callback) {
return _parallel(eachOfLimit(limit), tasks, callback);
}
/**
* A queue of tasks for the worker function to complete.
* @typedef {Iterable} QueueObject
* @memberOf module:ControlFlow
* @property {Function} length - a function returning the number of items
* waiting to be processed. Invoke with `queue.length()`.
* @property {boolean} started - a boolean indicating whether or not any
* items have been pushed and processed by the queue.
* @property {Function} running - a function returning the number of items
* currently being processed. Invoke with `queue.running()`.
* @property {Function} workersList - a function returning the array of items
* currently being processed. Invoke with `queue.workersList()`.
* @property {Function} idle - a function returning false if there are items
* waiting or being processed, or true if not. Invoke with `queue.idle()`.
* @property {number} concurrency - an integer for determining how many `worker`
* functions should be run in parallel. This property can be changed after a
* `queue` is created to alter the concurrency on-the-fly.
* @property {number} payload - an integer that specifies how many items are
* passed to the worker function at a time. only applies if this is a
* [cargo]{@link module:ControlFlow.cargo} object
* @property {AsyncFunction} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
* task in the list. Invoke with `queue.push(task, [callback])`,
* @property {AsyncFunction} unshift - add a new task to the front of the `queue`.
* Invoke with `queue.unshift(task, [callback])`.
* @property {AsyncFunction} pushAsync - the same as `q.push`, except this returns
* a promise that rejects if an error occurs.
* @property {AsyncFunction} unshirtAsync - the same as `q.unshift`, except this returns
* a promise that rejects if an error occurs.
* @property {Function} remove - remove items from the queue that match a test
* function. The test function will be passed an object with a `data` property,
* and a `priority` property, if this is a
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
* `function ({data, priority}) {}` and returns a Boolean.
* @property {Function} saturated - a function that sets a callback that is
* called when the number of running workers hits the `concurrency` limit, and
* further tasks will be queued. If the callback is omitted, `q.saturated()`
* returns a promise for the next occurrence.
* @property {Function} unsaturated - a function that sets a callback that is
* called when the number of running workers is less than the `concurrency` &
* `buffer` limits, and further tasks will not be queued. If the callback is
* omitted, `q.unsaturated()` returns a promise for the next occurrence.
* @property {number} buffer - A minimum threshold buffer in order to say that
* the `queue` is `unsaturated`.
* @property {Function} empty - a function that sets a callback that is called
* when the last item from the `queue` is given to a `worker`. If the callback
* is omitted, `q.empty()` returns a promise for the next occurrence.
* @property {Function} drain - a function that sets a callback that is called
* when the last item from the `queue` has returned from the `worker`. If the
* callback is omitted, `q.drain()` returns a promise for the next occurrence.
* @property {Function} error - a function that sets a callback that is called
* when a task errors. Has the signature `function(error, task)`. If the
* callback is omitted, `error()` returns a promise that rejects on the next
* error.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
* until `resume()` is called. Invoke with `queue.pause()`.
* @property {Function} resume - a function that resumes the processing of
* queued tasks when the queue is paused. Invoke with `queue.resume()`.
* @property {Function} kill - a function that removes the `drain` callback and
* empties remaining tasks from the queue forcing it to go idle. No more tasks
* should be pushed to the queue after calling this function. Invoke with `queue.kill()`.
*
* @example
* const q = aync.queue(worker, 2)
* q.push(item1)
* q.push(item2)
* q.push(item3)
* // queues are iterable, spread into an array to inspect
* const items = [...q] // [item1, item2, item3]
* // or use for of
* for (let item of q) {
* console.log(item)
* }
*
* q.drain(() => {
* console.log('all done')
* })
* // or
* await q.drain()
*/
/**
* Creates a `queue` object with the specified `concurrency`. Tasks added to the
* `queue` are processed in parallel (up to the `concurrency` limit). If all
* `worker`s are in progress, the task is queued until one becomes available.
* Once a `worker` completes a `task`, that `task`'s callback is called.
*
* @name queue
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {AsyncFunction} worker - An async function for processing a queued task.
* If you want to handle errors from an individual task, pass a callback to
* `q.push()`. Invoked with (task, callback).
* @param {number} [concurrency=1] - An `integer` for determining how many
* `worker` functions should be run in parallel. If omitted, the concurrency
* defaults to `1`. If the concurrency is `0`, an error is thrown.
* @returns {module:ControlFlow.QueueObject} A queue object to manage the tasks. Callbacks can be
* attached as certain properties to listen for specific events during the
* lifecycle of the queue.
* @example
*
* // create a queue object with concurrency 2
* var q = async.queue(function(task, callback) {
* console.log('hello ' + task.name);
* callback();
* }, 2);
*
* // assign a callback
* q.drain(function() {
* console.log('all items have been processed');
* });
* // or await the end
* await q.drain()
*
* // assign an error callback
* q.error(function(err, task) {
* console.error('task experienced an error');
* });
*
* // add some items to the queue
* q.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
* // callback is optional
* q.push({name: 'bar'});
*
* // add some items to the queue (batch-wise)
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
* console.log('finished processing item');
* });
*
* // add some items to the front of the queue
* q.unshift({name: 'bar'}, function (err) {
* console.log('finished processing bar');
* });
*/
function queue$1 (worker, concurrency) {
var _worker = wrapAsync(worker);
return queue((items, cb) => {
_worker(items[0], cb);
}, concurrency, 1);
}
// Binary min-heap implementation used for priority queue.
// Implementation is stable, i.e. push time is considered for equal priorities
class Heap {
constructor() {
this.heap = [];
this.pushCount = Number.MIN_SAFE_INTEGER;
}
get length() {
return this.heap.length;
}
empty () {
this.heap = [];
return this;
}
percUp(index) {
let p;
while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) {
let t = this.heap[index];
this.heap[index] = this.heap[p];
this.heap[p] = t;
index = p;
}
}
percDown(index) {
let l;
while ((l=leftChi(index)) < this.heap.length) {
if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) {
l = l+1;
}
if (smaller(this.heap[index], this.heap[l])) {
break;
}
let t = this.heap[index];
this.heap[index] = this.heap[l];
this.heap[l] = t;
index = l;
}
}
push(node) {
node.pushCount = ++this.pushCount;
this.heap.push(node);
this.percUp(this.heap.length-1);
}
unshift(node) {
return this.heap.push(node);
}
shift() {
let [top] = this.heap;
this.heap[0] = this.heap[this.heap.length-1];
this.heap.pop();
this.percDown(0);
return top;
}
toArray() {
return [...this];
}
*[Symbol.iterator] () {
for (let i = 0; i < this.heap.length; i++) {
yield this.heap[i].data;
}
}
remove (testFn) {
let j = 0;
for (let i = 0; i < this.heap.length; i++) {
if (!testFn(this.heap[i])) {
this.heap[j] = this.heap[i];
j++;
}
}
this.heap.splice(j);
for (let i = parent(this.heap.length-1); i >= 0; i--) {
this.percDown(i);
}
return this;
}
}
function leftChi(i) {
return (i<<1)+1;
}
function parent(i) {
return ((i+1)>>1)-1;
}
function smaller(x, y) {
if (x.priority !== y.priority) {
return x.priority < y.priority;
}
else {
return x.pushCount < y.pushCount;
}
}
/**
* The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and
* completed in ascending priority order.
*
* @name priorityQueue
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.queue]{@link module:ControlFlow.queue}
* @category Control Flow
* @param {AsyncFunction} worker - An async function for processing a queued task.
* If you want to handle errors from an individual task, pass a callback to
* `q.push()`.
* Invoked with (task, callback).
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
* * The `unshift` method was removed.
*/
function priorityQueue(worker, concurrency) {
// Start with a normal queue
var q = queue$1(worker, concurrency);
q._tasks = new Heap();
// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback = () => {}) {
if (typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate$1(() => q.drain());
}
for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
priority,
callback
};
q._tasks.push(item);
}
setImmediate$1(q.process);
};
// Remove unshift function
delete q.unshift;
return q;
}
/**
* Runs the `tasks` array of functions in parallel, without waiting until the
* previous function has completed. Once any of the `tasks` complete or pass an
* error to its callback, the main `callback` is immediately called. It's
* equivalent to `Promise.race()`.
*
* @name race
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array} tasks - An array containing [async functions]{@link AsyncFunction}
* to run. Each function can complete with an optional `result` value.
* @param {Function} callback - A callback to run once any of the functions have
* completed. This function gets an error or result from the first function that
* completed. Invoked with (err, result).
* @returns undefined
* @example
*
* async.race([
* function(callback) {
* setTimeout(function() {
* callback(null, 'one');
* }, 200);
* },
* function(callback) {
* setTimeout(function() {
* callback(null, 'two');
* }, 100);
* }
* ],
* // main callback
* function(err, result) {
* // the result will be equal to 'two' as it finishes earlier
* });
*/
function race(tasks, callback) {
callback = once(callback);
if (!Array.isArray(tasks)) return callback(new TypeError('First argument to race must be an array of functions'));
if (!tasks.length) return callback();
for (var i = 0, l = tasks.length; i < l; i++) {
wrapAsync(tasks[i])(callback);
}
}
var race$1 = awaitify(race, 2);
/**
* Same as [`reduce`]{@link module:Collections.reduce}, only operates on `array` in reverse order.
*
* @name reduceRight
* @static
* @memberOf module:Collections
* @method
* @see [async.reduce]{@link module:Collections.reduce}
* @alias foldr
* @category Collection
* @param {Array} array - A collection to iterate over.
* @param {*} memo - The initial state of the reduction.
* @param {AsyncFunction} iteratee - A function applied to each item in the
* array to produce the next step in the reduction.
* The `iteratee` should complete with the next state of the reduction.
* If the iteratee complete with an error, the reduction is stopped and the
* main `callback` is immediately called with the error.
* Invoked with (memo, item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result is the reduced value. Invoked with
* (err, result).
* @returns {Promise} a promise, if no callback is passed
*/
function reduceRight (array, memo, iteratee, callback) {
var reversed = [...array].reverse();
return reduce$1(reversed, memo, iteratee, callback);
}
/**
* Wraps the async function in another function that always completes with a
* result object, even when it errors.
*
* The result object has either the property `error` or `value`.
*
* @name reflect
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} fn - The async function you want to wrap
* @returns {Function} - A function that always passes null to it's callback as
* the error. The second argument to the callback will be an `object` with
* either an `error` or a `value` property.
* @example
*
* async.parallel([
* async.reflect(function(callback) {
* // do some stuff ...
* callback(null, 'one');
* }),
* async.reflect(function(callback) {
* // do some more stuff but error ...
* callback('bad stuff happened');
* }),
* async.reflect(function(callback) {
* // do some more stuff ...
* callback(null, 'two');
* })
* ],
* // optional callback
* function(err, results) {
* // values
* // results[0].value = 'one'
* // results[1].error = 'bad stuff happened'
* // results[2].value = 'two'
* });
*/
function reflect(fn) {
var _fn = wrapAsync(fn);
return initialParams(function reflectOn(args, reflectCallback) {
args.push((error, ...cbArgs) => {
let retVal = {};
if (error) {
retVal.error = error;
}
if (cbArgs.length > 0){
var value = cbArgs;
if (cbArgs.length <= 1) {
[value] = cbArgs;
}
retVal.value = value;
}
reflectCallback(null, retVal);
});
return _fn.apply(this, args);
});
}
/**
* A helper function that wraps an array or an object of functions with `reflect`.
*
* @name reflectAll
* @static
* @memberOf module:Utils
* @method
* @see [async.reflect]{@link module:Utils.reflect}
* @category Util
* @param {Array|Object|Iterable} tasks - The collection of
* [async functions]{@link AsyncFunction} to wrap in `async.reflect`.
* @returns {Array} Returns an array of async functions, each wrapped in
* `async.reflect`
* @example
*
* let tasks = [
* function(callback) {
* setTimeout(function() {
* callback(null, 'one');
* }, 200);
* },
* function(callback) {
* // do some more stuff but error ...
* callback(new Error('bad stuff happened'));
* },
* function(callback) {
* setTimeout(function() {
* callback(null, 'two');
* }, 100);
* }
* ];
*
* async.parallel(async.reflectAll(tasks),
* // optional callback
* function(err, results) {
* // values
* // results[0].value = 'one'
* // results[1].error = Error('bad stuff happened')
* // results[2].value = 'two'
* });
*
* // an example using an object instead of an array
* let tasks = {
* one: function(callback) {
* setTimeout(function() {
* callback(null, 'one');
* }, 200);
* },
* two: function(callback) {
* callback('two');
* },
* three: function(callback) {
* setTimeout(function() {
* callback(null, 'three');
* }, 100);
* }
* };
*
* async.parallel(async.reflectAll(tasks),
* // optional callback
* function(err, results) {
* // values
* // results.one.value = 'one'
* // results.two.error = 'two'
* // results.three.value = 'three'
* });
*/
function reflectAll(tasks) {
var results;
if (Array.isArray(tasks)) {
results = tasks.map(reflect);
} else {
results = {};
Object.keys(tasks).forEach(key => {
results[key] = reflect.call(this, tasks[key]);
});
}
return results;
}
function reject(eachfn, arr, _iteratee, callback) {
const iteratee = wrapAsync(_iteratee);
return _filter(eachfn, arr, (value, cb) => {
iteratee(value, (err, v) => {
cb(err, !v);
});
}, callback);
}
/**
* The opposite of [`filter`]{@link module:Collections.filter}. Removes values that pass an `async` truth test.
*
* @name reject
* @static
* @memberOf module:Collections
* @method
* @see [async.filter]{@link module:Collections.filter}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - An async truth test to apply to each item in
* `coll`.
* The should complete with a boolean value as its `result`.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
* @example
*
* async.reject(['file1','file2','file3'], function(filePath, callback) {
* fs.access(filePath, function(err) {
* callback(null, !err)
* });
* }, function(err, results) {
* // results now equals an array of missing files
* createFiles(results);
* });
*/
function reject$1 (coll, iteratee, callback) {
return reject(eachOf$1, coll, iteratee, callback)
}
var reject$2 = awaitify(reject$1, 3);
/**
* The same as [`reject`]{@link module:Collections.reject} but runs a maximum of `limit` async operations at a
* time.
*
* @name rejectLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.reject]{@link module:Collections.reject}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} iteratee - An async truth test to apply to each item in
* `coll`.
* The should complete with a boolean value as its `result`.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
*/
function rejectLimit (coll, limit, iteratee, callback) {
return reject(eachOfLimit(limit), coll, iteratee, callback)
}
var rejectLimit$1 = awaitify(rejectLimit, 4);
/**
* The same as [`reject`]{@link module:Collections.reject} but runs only a single async operation at a time.
*
* @name rejectSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.reject]{@link module:Collections.reject}
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - An async truth test to apply to each item in
* `coll`.
* The should complete with a boolean value as its `result`.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Invoked with (err, results).
* @returns {Promise} a promise, if no callback is passed
*/
function rejectSeries (coll, iteratee, callback) {
return reject(eachOfSeries$1, coll, iteratee, callback)
}
var rejectSeries$1 = awaitify(rejectSeries, 3);
function constant$1(value) {
return function () {
return value;
}
}
/**
* Attempts to get a successful response from `task` no more than `times` times
* before returning an error. If the task is successful, the `callback` will be
* passed the result of the successful task. If all attempts fail, the callback
* will be passed the error and result (if any) of the final attempt.
*
* @name retry
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @see [async.retryable]{@link module:ControlFlow.retryable}
* @param {Object|number} [opts = {times: 5, interval: 0}| 5] - Can be either an
* object with `times` and `interval` or a number.
* * `times` - The number of attempts to make before giving up. The default
* is `5`.
* * `interval` - The time to wait between retries, in milliseconds. The
* default is `0`. The interval may also be specified as a function of the
* retry count (see example).
* * `errorFilter` - An optional synchronous function that is invoked on
* erroneous result. If it returns `true` the retry attempts will continue;
* if the function returns `false` the retry flow is aborted with the current
* attempt's error and result being returned to the final callback.
* Invoked with (err).
* * If `opts` is a number, the number specifies the number of times to retry,
* with the default interval of `0`.
* @param {AsyncFunction} task - An async function to retry.
* Invoked with (callback).
* @param {Function} [callback] - An optional callback which is called when the
* task has succeeded, or after the final failed attempt. It receives the `err`
* and `result` arguments of the last attempt at completing the `task`. Invoked
* with (err, results).
* @returns {Promise} a promise if no callback provided
*
* @example
*
* // The `retry` function can be used as a stand-alone control flow by passing
* // a callback, as shown below:
*
* // try calling apiMethod 3 times
* async.retry(3, apiMethod, function(err, result) {
* // do something with the result
* });
*
* // try calling apiMethod 3 times, waiting 200 ms between each retry
* async.retry({times: 3, interval: 200}, apiMethod, function(err, result) {
* // do something with the result
* });
*
* // try calling apiMethod 10 times with exponential backoff
* // (i.e. intervals of 100, 200, 400, 800, 1600, ... milliseconds)
* async.retry({
* times: 10,
* interval: function(retryCount) {
* return 50 * Math.pow(2, retryCount);
* }
* }, apiMethod, function(err, result) {
* // do something with the result
* });
*
* // try calling apiMethod the default 5 times no delay between each retry
* async.retry(apiMethod, function(err, result) {
* // do something with the result
* });
*
* // try calling apiMethod only when error condition satisfies, all other
* // errors will abort the retry control flow and return to final callback
* async.retry({
* errorFilter: function(err) {
* return err.message === 'Temporary error'; // only retry on a specific error
* }
* }, apiMethod, function(err, result) {
* // do something with the result
* });
*
* // to retry individual methods that are not as reliable within other
* // control flow functions, use the `retryable` wrapper:
* async.auto({
* users: api.getUsers.bind(api),
* payments: async.retryable(3, api.getPayments.bind(api))
* }, function(err, results) {
* // do something with the results
* });
*
*/
const DEFAULT_TIMES = 5;
const DEFAULT_INTERVAL = 0;
function retry(opts, task, callback) {
var options = {
times: DEFAULT_TIMES,
intervalFunc: constant$1(DEFAULT_INTERVAL)
};
if (arguments.length < 3 && typeof opts === 'function') {
callback = task || promiseCallback();
task = opts;
} else {
parseTimes(options, opts);
callback = callback || promiseCallback();
}
if (typeof task !== 'function') {
throw new Error("Invalid arguments for async.retry");
}
var _task = wrapAsync(task);
var attempt = 1;
function retryAttempt() {
_task((err, ...args) => {
if (err === false) return
if (err && attempt++ < options.times &&
(typeof options.errorFilter != 'function' ||
options.errorFilter(err))) {
setTimeout(retryAttempt, options.intervalFunc(attempt - 1));
} else {
callback(err, ...args);
}
});
}
retryAttempt();
return callback[PROMISE_SYMBOL]
}
function parseTimes(acc, t) {
if (typeof t === 'object') {
acc.times = +t.times || DEFAULT_TIMES;
acc.intervalFunc = typeof t.interval === 'function' ?
t.interval :
constant$1(+t.interval || DEFAULT_INTERVAL);
acc.errorFilter = t.errorFilter;
} else if (typeof t === 'number' || typeof t === 'string') {
acc.times = +t || DEFAULT_TIMES;
} else {
throw new Error("Invalid arguments for async.retry");
}
}
/**
* A close relative of [`retry`]{@link module:ControlFlow.retry}. This method
* wraps a task and makes it retryable, rather than immediately calling it
* with retries.
*
* @name retryable
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.retry]{@link module:ControlFlow.retry}
* @category Control Flow
* @param {Object|number} [opts = {times: 5, interval: 0}| 5] - optional
* options, exactly the same as from `retry`, except for a `opts.arity` that
* is the arity of the `task` function, defaulting to `task.length`
* @param {AsyncFunction} task - the asynchronous function to wrap.
* This function will be passed any arguments passed to the returned wrapper.
* Invoked with (...args, callback).
* @returns {AsyncFunction} The wrapped function, which when invoked, will
* retry on an error, based on the parameters specified in `opts`.
* This function will accept the same parameters as `task`.
* @example
*
* async.auto({
* dep1: async.retryable(3, getFromFlakyService),
* process: ["dep1", async.retryable(3, function (results, cb) {
* maybeProcessData(results.dep1, cb);
* })]
* }, callback);
*/
function retryable (opts, task) {
if (!task) {
task = opts;
opts = null;
}
let arity = (opts && opts.arity) || task.length;
if (isAsync(task)) {
arity += 1;
}
var _task = wrapAsync(task);
return initialParams((args, callback) => {
if (args.length < arity - 1 || callback == null) {
args.push(callback);
callback = promiseCallback();
}
function taskFn(cb) {
_task(...args, cb);
}
if (opts) retry(opts, taskFn, callback);
else retry(taskFn, callback);
return callback[PROMISE_SYMBOL]
});
}
/**
* Run the functions in the `tasks` collection in series, each one running once
* the previous function has completed. If any functions in the series pass an
* error to its callback, no more functions are run, and `callback` is
* immediately called with the value of the error. Otherwise, `callback`
* receives an array of results when `tasks` have completed.
*
* It is also possible to use an object instead of an array. Each property will
* be run as a function, and the results will be passed to the final `callback`
* as an object instead of an array. This can be a more readable way of handling
* results from {@link async.series}.
*
* **Note** that while many implementations preserve the order of object
* properties, the [ECMAScript Language Specification](http://www.ecma-international.org/ecma-262/5.1/#sec-8.6)
* explicitly states that
*
* > The mechanics and order of enumerating the properties is not specified.
*
* So if you rely on the order in which your series of functions are executed,
* and want this to work on all platforms, consider using an array.
*
* @name series
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} tasks - A collection containing
* [async functions]{@link AsyncFunction} to run in series.
* Each function can complete with any number of optional `result` values.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed. This function gets a results array (or object)
* containing all the result arguments passed to the `task` callbacks. Invoked
* with (err, result).
* @return {Promise} a promise, if no callback is passed
* @example
* async.series([
* function(callback) {
* // do some stuff ...
* callback(null, 'one');
* },
* function(callback) {
* // do some more stuff ...
* callback(null, 'two');
* }
* ],
* // optional callback
* function(err, results) {
* // results is now equal to ['one', 'two']
* });
*
* async.series({
* one: function(callback) {
* setTimeout(function() {
* callback(null, 1);
* }, 200);
* },
* two: function(callback){
* setTimeout(function() {
* callback(null, 2);
* }, 100);
* }
* }, function(err, results) {
* // results is now equal to: {one: 1, two: 2}
* });
*/
function series(tasks, callback) {
return _parallel(eachOfSeries$1, tasks, callback);
}
/**
* Returns `true` if at least one element in the `coll` satisfies an async test.
* If any iteratee call returns `true`, the main `callback` is immediately
* called.
*
* @name some
* @static
* @memberOf module:Collections
* @method
* @alias any
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collections in parallel.
* The iteratee should complete with a boolean `result` value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the iteratee functions have finished.
* Result will be either `true` or `false` depending on the values of the async
* tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
* @example
*
* async.some(['file1','file2','file3'], function(filePath, callback) {
* fs.access(filePath, function(err) {
* callback(null, !err)
* });
* }, function(err, result) {
* // if result is true then at least one of the files exists
* });
*/
function some(coll, iteratee, callback) {
return _createTester(Boolean, res => res)(eachOf$1, coll, iteratee, callback)
}
var some$1 = awaitify(some, 3);
/**
* The same as [`some`]{@link module:Collections.some} but runs a maximum of `limit` async operations at a time.
*
* @name someLimit
* @static
* @memberOf module:Collections
* @method
* @see [async.some]{@link module:Collections.some}
* @alias anyLimit
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collections in parallel.
* The iteratee should complete with a boolean `result` value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the iteratee functions have finished.
* Result will be either `true` or `false` depending on the values of the async
* tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
*/
function someLimit(coll, limit, iteratee, callback) {
return _createTester(Boolean, res => res)(eachOfLimit(limit), coll, iteratee, callback)
}
var someLimit$1 = awaitify(someLimit, 4);
/**
* The same as [`some`]{@link module:Collections.some} but runs only a single async operation at a time.
*
* @name someSeries
* @static
* @memberOf module:Collections
* @method
* @see [async.some]{@link module:Collections.some}
* @alias anySeries
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collections in series.
* The iteratee should complete with a boolean `result` value.
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called as soon as any
* iteratee returns `true`, or after all the iteratee functions have finished.
* Result will be either `true` or `false` depending on the values of the async
* tests. Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
*/
function someSeries(coll, iteratee, callback) {
return _createTester(Boolean, res => res)(eachOfSeries$1, coll, iteratee, callback)
}
var someSeries$1 = awaitify(someSeries, 3);
/**
* Sorts a list by the results of running each `coll` value through an async
* `iteratee`.
*
* @name sortBy
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a value to use as the sort criteria as
* its `result`.
* Invoked with (item, callback).
* @param {Function} callback - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is the items
* from the original `coll` sorted by the values returned by the `iteratee`
* calls. Invoked with (err, results).
* @returns {Promise} a promise, if no callback passed
* @example
*
* async.sortBy(['file1','file2','file3'], function(file, callback) {
* fs.stat(file, function(err, stats) {
* callback(err, stats.mtime);
* });
* }, function(err, results) {
* // results is now the original array of files sorted by
* // modified date
* });
*
* // By modifying the callback parameter the
* // sorting order can be influenced:
*
* // ascending order
* async.sortBy([1,9,3,5], function(x, callback) {
* callback(null, x);
* }, function(err,result) {
* // result callback
* });
*
* // descending order
* async.sortBy([1,9,3,5], function(x, callback) {
* callback(null, x*-1); //<- x*-1 instead of x, turns the order around
* }, function(err,result) {
* // result callback
* });
*/
function sortBy (coll, iteratee, callback) {
var _iteratee = wrapAsync(iteratee);
return map$1(coll, (x, iterCb) => {
_iteratee(x, (err, criteria) => {
if (err) return iterCb(err);
iterCb(err, {value: x, criteria});
});
}, (err, results) => {
if (err) return callback(err);
callback(null, results.sort(comparator).map(v => v.value));
});
function comparator(left, right) {
var a = left.criteria, b = right.criteria;
return a < b ? -1 : a > b ? 1 : 0;
}
}
var sortBy$1 = awaitify(sortBy, 3);
/**
* Sets a time limit on an asynchronous function. If the function does not call
* its callback within the specified milliseconds, it will be called with a
* timeout error. The code property for the error object will be `'ETIMEDOUT'`.
*
* @name timeout
* @static
* @memberOf module:Utils
* @method
* @category Util
* @param {AsyncFunction} asyncFn - The async function to limit in time.
* @param {number} milliseconds - The specified time limit.
* @param {*} [info] - Any variable you want attached (`string`, `object`, etc)
* to timeout Error for more information..
* @returns {AsyncFunction} Returns a wrapped function that can be used with any
* of the control flow functions.
* Invoke this function with the same parameters as you would `asyncFunc`.
* @example
*
* function myFunction(foo, callback) {
* doAsyncTask(foo, function(err, data) {
* // handle errors
* if (err) return callback(err);
*
* // do some stuff ...
*
* // return processed data
* return callback(null, data);
* });
* }
*
* var wrapped = async.timeout(myFunction, 1000);
*
* // call `wrapped` as you would `myFunction`
* wrapped({ bar: 'bar' }, function(err, data) {
* // if `myFunction` takes < 1000 ms to execute, `err`
* // and `data` will have their expected values
*
* // else `err` will be an Error with the code 'ETIMEDOUT'
* });
*/
function timeout(asyncFn, milliseconds, info) {
var fn = wrapAsync(asyncFn);
return initialParams((args, callback) => {
var timedOut = false;
var timer;
function timeoutCallback() {
var name = asyncFn.name || 'anonymous';
var error = new Error('Callback function "' + name + '" timed out.');
error.code = 'ETIMEDOUT';
if (info) {
error.info = info;
}
timedOut = true;
callback(error);
}
args.push((...cbArgs) => {
if (!timedOut) {
callback(...cbArgs);
clearTimeout(timer);
}
});
// setup timer and call original function
timer = setTimeout(timeoutCallback, milliseconds);
fn(...args);
});
}
function range(size) {
var result = Array(size);
while (size--) {
result[size] = size;
}
return result;
}
/**
* The same as [times]{@link module:ControlFlow.times} but runs a maximum of `limit` async operations at a
* time.
*
* @name timesLimit
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.times]{@link module:ControlFlow.times}
* @category Control Flow
* @param {number} count - The number of times to run the function.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - The async function to call `n` times.
* Invoked with the iteration index and a callback: (n, next).
* @param {Function} callback - see [async.map]{@link module:Collections.map}.
* @returns {Promise} a promise, if no callback is provided
*/
function timesLimit(count, limit, iteratee, callback) {
var _iteratee = wrapAsync(iteratee);
return mapLimit$1(range(count), limit, _iteratee, callback);
}
/**
* Calls the `iteratee` function `n` times, and accumulates results in the same
* manner you would use with [map]{@link module:Collections.map}.
*
* @name times
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.map]{@link module:Collections.map}
* @category Control Flow
* @param {number} n - The number of times to run the function.
* @param {AsyncFunction} iteratee - The async function to call `n` times.
* Invoked with the iteration index and a callback: (n, next).
* @param {Function} callback - see {@link module:Collections.map}.
* @returns {Promise} a promise, if no callback is provided
* @example
*
* // Pretend this is some complicated async factory
* var createUser = function(id, callback) {
* callback(null, {
* id: 'user' + id
* });
* };
*
* // generate 5 users
* async.times(5, function(n, next) {
* createUser(n, function(err, user) {
* next(err, user);
* });
* }, function(err, users) {
* // we should now have 5 users
* });
*/
function times (n, iteratee, callback) {
return timesLimit(n, Infinity, iteratee, callback)
}
/**
* The same as [times]{@link module:ControlFlow.times} but runs only a single async operation at a time.
*
* @name timesSeries
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.times]{@link module:ControlFlow.times}
* @category Control Flow
* @param {number} n - The number of times to run the function.
* @param {AsyncFunction} iteratee - The async function to call `n` times.
* Invoked with the iteration index and a callback: (n, next).
* @param {Function} callback - see {@link module:Collections.map}.
* @returns {Promise} a promise, if no callback is provided
*/
function timesSeries (n, iteratee, callback) {
return timesLimit(n, 1, iteratee, callback)
}
/**
* A relative of `reduce`. Takes an Object or Array, and iterates over each
* element in parallel, each step potentially mutating an `accumulator` value.
* The type of the accumulator defaults to the type of collection passed in.
*
* @name transform
* @static
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {*} [accumulator] - The initial state of the transform. If omitted,
* it will default to an empty Object or Array, depending on the type of `coll`
* @param {AsyncFunction} iteratee - A function applied to each item in the
* collection that potentially modifies the accumulator.
* Invoked with (accumulator, item, key, callback).
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished. Result is the transformed accumulator.
* Invoked with (err, result).
* @returns {Promise} a promise, if no callback provided
* @example
*
* async.transform([1,2,3], function(acc, item, index, callback) {
* // pointless async:
* process.nextTick(function() {
* acc[index] = item * 2
* callback(null)
* });
* }, function(err, result) {
* // result is now equal to [2, 4, 6]
* });
*
* @example
*
* async.transform({a: 1, b: 2, c: 3}, function (obj, val, key, callback) {
* setImmediate(function () {
* obj[key] = val * 2;
* callback();
* })
* }, function (err, result) {
* // result is equal to {a: 2, b: 4, c: 6}
* })
*/
function transform (coll, accumulator, iteratee, callback) {
if (arguments.length <= 3 && typeof accumulator === 'function') {
callback = iteratee;
iteratee = accumulator;
accumulator = Array.isArray(coll) ? [] : {};
}
callback = once(callback || promiseCallback());
var _iteratee = wrapAsync(iteratee);
eachOf$1(coll, (v, k, cb) => {
_iteratee(accumulator, v, k, cb);
}, err => callback(err, accumulator));
return callback[PROMISE_SYMBOL]
}
/**
* It runs each task in series but stops whenever any of the functions were
* successful. If one of the tasks were successful, the `callback` will be
* passed the result of the successful task. If all tasks fail, the callback
* will be passed the error and result (if any) of the final attempt.
*
* @name tryEach
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|AsyncIterable|Object} tasks - A collection containing functions to
* run, each function is passed a `callback(err, result)` it must call on
* completion with an error `err` (which can be `null`) and an optional `result`
* value.
* @param {Function} [callback] - An optional callback which is called when one
* of the tasks has succeeded, or all have failed. It receives the `err` and
* `result` arguments of the last attempt at completing the `task`. Invoked with
* (err, results).
* @returns {Promise} a promise, if no callback is passed
* @example
* async.tryEach([
* function getDataFromFirstWebsite(callback) {
* // Try getting the data from the first website
* callback(err, data);
* },
* function getDataFromSecondWebsite(callback) {
* // First website failed,
* // Try getting the data from the backup website
* callback(err, data);
* }
* ],
* // optional callback
* function(err, results) {
* Now do something with the data.
* });
*
*/
function tryEach(tasks, callback) {
var error = null;
var result;
return eachSeries$1(tasks, (task, taskCb) => {
wrapAsync(task)((err, ...args) => {
if (err === false) return taskCb(err);
if (args.length < 2) {
[result] = args;
} else {
result = args;
}
error = err;
taskCb(err ? null : {});
});
}, () => callback(error, result));
}
var tryEach$1 = awaitify(tryEach);
/**
* Undoes a [memoize]{@link module:Utils.memoize}d function, reverting it to the original,
* unmemoized form. Handy for testing.
*
* @name unmemoize
* @static
* @memberOf module:Utils
* @method
* @see [async.memoize]{@link module:Utils.memoize}
* @category Util
* @param {AsyncFunction} fn - the memoized function
* @returns {AsyncFunction} a function that calls the original unmemoized function
*/
function unmemoize(fn) {
return (...args) => {
return (fn.unmemoized || fn)(...args);
};
}
/**
* Repeatedly call `iteratee`, while `test` returns `true`. Calls `callback` when
* stopped, or an error occurs.
*
* @name whilst
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {AsyncFunction} test - asynchronous truth test to perform before each
* execution of `iteratee`. Invoked with ().
* @param {AsyncFunction} iteratee - An async function which is called each time
* `test` passes. Invoked with (callback).
* @param {Function} [callback] - A callback which is called after the test
* function has failed and repeated execution of `iteratee` has stopped. `callback`
* will be passed an error and any arguments passed to the final `iteratee`'s
* callback. Invoked with (err, [results]);
* @returns {Promise} a promise, if no callback is passed
* @example
*
* var count = 0;
* async.whilst(
* function test(cb) { cb(null, count < 5;) },
* function iter(callback) {
* count++;
* setTimeout(function() {
* callback(null, count);
* }, 1000);
* },
* function (err, n) {
* // 5 seconds have passed, n = 5
* }
* );
*/
function whilst(test, iteratee, callback) {
callback = onlyOnce(callback);
var _fn = wrapAsync(iteratee);
var _test = wrapAsync(test);
var results = [];
function next(err, ...rest) {
if (err) return callback(err);
results = rest;
if (err === false) return;
_test(check);
}
function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null, ...results);
_fn(next);
}
return _test(check);
}
var whilst$1 = awaitify(whilst, 3);
/**
* Repeatedly call `iteratee` until `test` returns `true`. Calls `callback` when
* stopped, or an error occurs. `callback` will be passed an error and any
* arguments passed to the final `iteratee`'s callback.
*
* The inverse of [whilst]{@link module:ControlFlow.whilst}.
*
* @name until
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.whilst]{@link module:ControlFlow.whilst}
* @category Control Flow
* @param {AsyncFunction} test - asynchronous truth test to perform before each
* execution of `iteratee`. Invoked with (callback).
* @param {AsyncFunction} iteratee - An async function which is called each time
* `test` fails. Invoked with (callback).
* @param {Function} [callback] - A callback which is called after the test
* function has passed and repeated execution of `iteratee` has stopped. `callback`
* will be passed an error and any arguments passed to the final `iteratee`'s
* callback. Invoked with (err, [results]);
* @returns {Promise} a promise, if a callback is not passed
*
* @example
* const results = []
* async.until(function test(page, cb) {
* cb(null, page.next == null)
* }, function iter(next) {
* fetchPage(url, (err, body) => {
* if (err) return next(err)
* results = results.concat(body.objects)
* next(err, body)
* })
* }, function done (err) {
* // all pages have been fetched
* })
*/
function until(test, iteratee, callback) {
const _test = wrapAsync(test);
return whilst$1((cb) => _test((err, truth) => cb (err, !truth)), iteratee, callback);
}
/**
* Runs the `tasks` array of functions in series, each passing their results to
* the next in the array. However, if any of the `tasks` pass an error to their
* own callback, the next function is not executed, and the main `callback` is
* immediately called with the error.
*
* @name waterfall
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array} tasks - An array of [async functions]{@link AsyncFunction}
* to run.
* Each function should complete with any number of `result` values.
* The `result` values will be passed as arguments, in order, to the next task.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed. This will be passed the results of the last task's
* callback. Invoked with (err, [results]).
* @returns undefined
* @example
*
* async.waterfall([
* function(callback) {
* callback(null, 'one', 'two');
* },
* function(arg1, arg2, callback) {
* // arg1 now equals 'one' and arg2 now equals 'two'
* callback(null, 'three');
* },
* function(arg1, callback) {
* // arg1 now equals 'three'
* callback(null, 'done');
* }
* ], function (err, result) {
* // result now equals 'done'
* });
*
* // Or, with named functions:
* async.waterfall([
* myFirstFunction,
* mySecondFunction,
* myLastFunction,
* ], function (err, result) {
* // result now equals 'done'
* });
* function myFirstFunction(callback) {
* callback(null, 'one', 'two');
* }
* function mySecondFunction(arg1, arg2, callback) {
* // arg1 now equals 'one' and arg2 now equals 'two'
* callback(null, 'three');
* }
* function myLastFunction(arg1, callback) {
* // arg1 now equals 'three'
* callback(null, 'done');
* }
*/
function waterfall (tasks, callback) {
callback = once(callback);
if (!Array.isArray(tasks)) return callback(new Error('First argument to waterfall must be an array of functions'));
if (!tasks.length) return callback();
var taskIndex = 0;
function nextTask(args) {
var task = wrapAsync(tasks[taskIndex++]);
task(...args, onlyOnce(next));
}
function next(err, ...args) {
if (err === false) return
if (err || taskIndex === tasks.length) {
return callback(err, ...args);
}
nextTask(args);
}
nextTask([]);
}
var waterfall$1 = awaitify(waterfall);
/**
* An "async function" in the context of Async is an asynchronous function with
* a variable number of parameters, with the final parameter being a callback.
* (`function (arg1, arg2, ..., callback) {}`)
* The final callback is of the form `callback(err, results...)`, which must be
* called once the function is completed. The callback should be called with a
* Error as its first argument to signal that an error occurred.
* Otherwise, if no error occurred, it should be called with `null` as the first
* argument, and any additional `result` arguments that may apply, to signal
* successful completion.
* The callback must be called exactly once, ideally on a later tick of the
* JavaScript event loop.
*
* This type of function is also referred to as a "Node-style async function",
* or a "continuation passing-style function" (CPS). Most of the methods of this
* library are themselves CPS/Node-style async functions, or functions that
* return CPS/Node-style async functions.
*
* Wherever we accept a Node-style async function, we also directly accept an
* [ES2017 `async` function]{@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function}.
* In this case, the `async` function will not be passed a final callback
* argument, and any thrown error will be used as the `err` argument of the
* implicit callback, and the return value will be used as the `result` value.
* (i.e. a `rejected` of the returned Promise becomes the `err` callback
* argument, and a `resolved` value becomes the `result`.)
*
* Note, due to JavaScript limitations, we can only detect native `async`
* functions and not transpilied implementations.
* Your environment must have `async`/`await` support for this to work.
* (e.g. Node > v7.6, or a recent version of a modern browser).
* If you are using `async` functions through a transpiler (e.g. Babel), you
* must still wrap the function with [asyncify]{@link module:Utils.asyncify},
* because the `async function` will be compiled to an ordinary function that
* returns a promise.
*
* @typedef {Function} AsyncFunction
* @static
*/
var index = {
apply,
applyEach: applyEach$1,
applyEachSeries,
asyncify,
auto,
autoInject,
cargo,
cargoQueue: cargo$1,
compose,
concat: concat$1,
concatLimit: concatLimit$1,
concatSeries: concatSeries$1,
constant,
detect: detect$1,
detectLimit: detectLimit$1,
detectSeries: detectSeries$1,
dir,
doUntil,
doWhilst: doWhilst$1,
each,
eachLimit: eachLimit$2,
eachOf: eachOf$1,
eachOfLimit: eachOfLimit$2,
eachOfSeries: eachOfSeries$1,
eachSeries: eachSeries$1,
ensureAsync,
every: every$1,
everyLimit: everyLimit$1,
everySeries: everySeries$1,
filter: filter$1,
filterLimit: filterLimit$1,
filterSeries: filterSeries$1,
forever: forever$1,
groupBy,
groupByLimit: groupByLimit$1,
groupBySeries,
log,
map: map$1,
mapLimit: mapLimit$1,
mapSeries: mapSeries$1,
mapValues,
mapValuesLimit: mapValuesLimit$1,
mapValuesSeries,
memoize,
nextTick,
parallel,
parallelLimit,
priorityQueue,
queue: queue$1,
race: race$1,
reduce: reduce$1,
reduceRight,
reflect,
reflectAll,
reject: reject$2,
rejectLimit: rejectLimit$1,
rejectSeries: rejectSeries$1,
retry,
retryable,
seq,
series,
setImmediate: setImmediate$1,
some: some$1,
someLimit: someLimit$1,
someSeries: someSeries$1,
sortBy: sortBy$1,
timeout,
times,
timesLimit,
timesSeries,
transform,
tryEach: tryEach$1,
unmemoize,
until,
waterfall: waterfall$1,
whilst: whilst$1,
// aliases
all: every$1,
allLimit: everyLimit$1,
allSeries: everySeries$1,
any: some$1,
anyLimit: someLimit$1,
anySeries: someSeries$1,
find: detect$1,
findLimit: detectLimit$1,
findSeries: detectSeries$1,
flatMap: concat$1,
flatMapLimit: concatLimit$1,
flatMapSeries: concatSeries$1,
forEach: each,
forEachSeries: eachSeries$1,
forEachLimit: eachLimit$2,
forEachOf: eachOf$1,
forEachOfSeries: eachOfSeries$1,
forEachOfLimit: eachOfLimit$2,
inject: reduce$1,
foldl: reduce$1,
foldr: reduceRight,
select: filter$1,
selectLimit: filterLimit$1,
selectSeries: filterSeries$1,
wrapSync: asyncify,
during: whilst$1,
doDuring: doWhilst$1
};
exports.default = index;
exports.apply = apply;
exports.applyEach = applyEach$1;
exports.applyEachSeries = applyEachSeries;
exports.asyncify = asyncify;
exports.auto = auto;
exports.autoInject = autoInject;
exports.cargo = cargo;
exports.cargoQueue = cargo$1;
exports.compose = compose;
exports.concat = concat$1;
exports.concatLimit = concatLimit$1;
exports.concatSeries = concatSeries$1;
exports.constant = constant;
exports.detect = detect$1;
exports.detectLimit = detectLimit$1;
exports.detectSeries = detectSeries$1;
exports.dir = dir;
exports.doUntil = doUntil;
exports.doWhilst = doWhilst$1;
exports.each = each;
exports.eachLimit = eachLimit$2;
exports.eachOf = eachOf$1;
exports.eachOfLimit = eachOfLimit$2;
exports.eachOfSeries = eachOfSeries$1;
exports.eachSeries = eachSeries$1;
exports.ensureAsync = ensureAsync;
exports.every = every$1;
exports.everyLimit = everyLimit$1;
exports.everySeries = everySeries$1;
exports.filter = filter$1;
exports.filterLimit = filterLimit$1;
exports.filterSeries = filterSeries$1;
exports.forever = forever$1;
exports.groupBy = groupBy;
exports.groupByLimit = groupByLimit$1;
exports.groupBySeries = groupBySeries;
exports.log = log;
exports.map = map$1;
exports.mapLimit = mapLimit$1;
exports.mapSeries = mapSeries$1;
exports.mapValues = mapValues;
exports.mapValuesLimit = mapValuesLimit$1;
exports.mapValuesSeries = mapValuesSeries;
exports.memoize = memoize;
exports.nextTick = nextTick;
exports.parallel = parallel;
exports.parallelLimit = parallelLimit;
exports.priorityQueue = priorityQueue;
exports.queue = queue$1;
exports.race = race$1;
exports.reduce = reduce$1;
exports.reduceRight = reduceRight;
exports.reflect = reflect;
exports.reflectAll = reflectAll;
exports.reject = reject$2;
exports.rejectLimit = rejectLimit$1;
exports.rejectSeries = rejectSeries$1;
exports.retry = retry;
exports.retryable = retryable;
exports.seq = seq;
exports.series = series;
exports.setImmediate = setImmediate$1;
exports.some = some$1;
exports.someLimit = someLimit$1;
exports.someSeries = someSeries$1;
exports.sortBy = sortBy$1;
exports.timeout = timeout;
exports.times = times;
exports.timesLimit = timesLimit;
exports.timesSeries = timesSeries;
exports.transform = transform;
exports.tryEach = tryEach$1;
exports.unmemoize = unmemoize;
exports.until = until;
exports.waterfall = waterfall$1;
exports.whilst = whilst$1;
exports.all = every$1;
exports.allLimit = everyLimit$1;
exports.allSeries = everySeries$1;
exports.any = some$1;
exports.anyLimit = someLimit$1;
exports.anySeries = someSeries$1;
exports.find = detect$1;
exports.findLimit = detectLimit$1;
exports.findSeries = detectSeries$1;
exports.flatMap = concat$1;
exports.flatMapLimit = concatLimit$1;
exports.flatMapSeries = concatSeries$1;
exports.forEach = each;
exports.forEachSeries = eachSeries$1;
exports.forEachLimit = eachLimit$2;
exports.forEachOf = eachOf$1;
exports.forEachOfSeries = eachOfSeries$1;
exports.forEachOfLimit = eachOfLimit$2;
exports.inject = reduce$1;
exports.foldl = reduce$1;
exports.foldr = reduceRight;
exports.select = filter$1;
exports.selectLimit = filterLimit$1;
exports.selectSeries = filterSeries$1;
exports.wrapSync = asyncify;
exports.during = whilst$1;
exports.doDuring = doWhilst$1;
Object.defineProperty(exports, '__esModule', { value: true });
})));