groovebasin/lib/deduped_queue.js
Andrew Kelley 11068623b5 fix upload for multiple files
also when auto queuing, sort by position in library
2014-03-24 17:12:54 -07:00

96 lines
2.5 KiB
JavaScript

var cpuCount = require('os').cpus().length;
var EventEmitter = require('events').EventEmitter;
var util = require('util');
module.exports = DedupedQueue;
util.inherits(DedupedQueue, EventEmitter);
function DedupedQueue(options) {
EventEmitter.call(this);
this.maxAsync = options.maxAsync || cpuCount;
this.processOne = options.processOne;
this.pendingQueue = [];
this.pendingSet = {};
this.processingCount = 0;
this.processingSet = {};
}
DedupedQueue.prototype.idInQueue = function(id) {
return !!(this.pendingSet[id] || this.processingSet[id]);
};
DedupedQueue.prototype.add = function(id, item, cb) {
var queueItem = this.pendingSet[id];
if (queueItem) {
if (cb) queueItem.cbs.push(cb);
return;
}
queueItem = new QueueItem(id, item);
if (cb) queueItem.cbs.push(cb);
this.pendingSet[id] = queueItem;
this.pendingQueue.push(queueItem);
this.flush();
};
DedupedQueue.prototype.waitForId = function(id, cb) {
var queueItem = this.pendingSet[id] || this.processingSet[id];
if (!queueItem) return cb();
queueItem.cbs.push(cb);
};
DedupedQueue.prototype.flush = function() {
// if an item cannot go into the processing queue because an item with the
// same ID is already there, it goes into deferred
var deferred = [];
while (this.processingCount < this.maxAsync && this.pendingQueue.length > 0) {
var queueItem = this.pendingQueue.shift();
if (this.processingSet[queueItem.id]) {
deferred.push(queueItem);
} else {
delete this.pendingSet[queueItem.id];
this.processingSet[queueItem.id] = queueItem;
this.processingCount += 1;
this.startOne(queueItem);
}
}
for (var i = 0; i < deferred.length; i += 1) {
this.pendingQueue.push(deferred[i]);
}
};
DedupedQueue.prototype.startOne = function(queueItem) {
var self = this;
var callbackCalled = false;
self.processOne(queueItem.item, function(err) {
if (callbackCalled) {
self.emit('error', new Error("callback called more than once"));
return;
}
callbackCalled = true;
delete self.processingSet[queueItem.id];
self.processingCount -= 1;
if (queueItem.cbs.length === 0) {
defaultCb(err);
} else {
for (var i = 0; i < queueItem.cbs.length; i += 1) {
queueItem.cbs[i](err);
}
}
self.flush();
function defaultCb(err) {
if (err) self.emit('error', err);
self.emit('oneEnd');
}
});
};
function QueueItem(id, item) {
this.id = id;
this.item = item;
this.cbs = [];
}