rewrite module in es6

This commit is contained in:
Todd Treece 2016-05-12 00:45:47 +00:00
parent 6d0f817cf1
commit de85d1869e
2 changed files with 132 additions and 153 deletions

132
mqemitter-io.js Normal file
View file

@ -0,0 +1,132 @@
'use strict'
const Redis = require('ioredis'),
MQEmitter = require('mqemitter'),
shortid = require('shortid'),
LRU = require('lru-cache'),
msgpack = require('msgpack-lite');
class MQEmitterIO extends MQEmitter {
constructor(opts) {
super(opts);
this.subConn = new Redis(opts);
this.pubConn = new Redis(opts);
this._topics = {};
this._cache = LRU({
max: 10000,
maxAge: 60 * 1000 // one minute
});
this.subConn.on('messageBuffer', this.handler.bind(this));
this.subConn.on('pmessageBuffer', this.handler.bind(this));
}
handler(sub, topic, payload) {
const packet = msgpack.decode(payload);
if(! this._cache.get(packet.id))
super.emit(packet.msg)
this._cache.set(packet.id, true);
}
close(cb) {
let count = 2
const onEnd = () => {
if(--count === 0) super.close(cb);
};
this.subConn.on('end', onEnd)
this.subConn.quit()
this.pubConn.on('end', onEnd)
this.pubConn.quit()
return this;
}
_subTopic(topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
}
on(topic, cb, done) {
const subTopic = this._subTopic(topic);
const onFinish = () => {
if(done) setImmediate(done)
};
super.on(topic, cb);
if(this._topics[subTopic]) {
this._topics[subTopic]++;
onFinish();
return this;
}
this._topics[subTopic] = 1;
if(this._containsWildcard(topic))
this.subConn.psubscribe(subTopic, onFinish);
else
this.subConn.subscribe(subTopic, onFinish);
return this;
}
emit(msg, done) {
if(this.closed)
return done(new Error('mqemitter-io is closed'))
setImmediate(done);
}
removeListener(topic, cb, done) {
const subTopic = this._subTopic(topic);
const onFinish = () => {
if(done) setImmediate(done);
};
this._removeListener(topic, cb)
if(--this._topics[subTopic] > 0) {
onFinish();
return this;
}
delete this._topics[subTopic];
if (this._containsWildcard(topic))
this.subConn.punsubscribe(subTopic, onFinish);
else if (this._matcher.match(topic))
this.subConn.unsubscribe(subTopic, onFinish);
return this;
}
_containsWildcard(topic) {
return (topic.indexOf(this._opts.wildcardOne) >= 0) || (topic.indexOf(this._opts.wildcardSome) >= 0);
}
}
exports = module.exports = MQEmitterIO;

View file

@ -1,153 +0,0 @@
'use strict'
var Redis = require('ioredis')
var MQEmitter = require('mqemitter')
var shortid = require('shortid')
var inherits = require('inherits')
var LRU = require('lru-cache')
var msgpack = require('msgpack-lite')
function MQEmitterRedis (opts) {
if (!(this instanceof MQEmitterRedis)) {
return new MQEmitterRedis(opts)
}
opts = opts || {}
this._opts = opts
this.subConn = new Redis(opts)
this.pubConn = new Redis(opts)
this._topics = {}
this._cache = LRU({
max: 10000,
maxAge: 60 * 1000 // one minute
})
var that = this
function handler (sub, topic, payload) {
var packet = msgpack.decode(payload)
if (!that._cache.get(packet.id)) {
that._emit(packet.msg)
}
that._cache.set(packet.id, true)
}
this.subConn.on('messageBuffer', function (topic, message) {
handler(topic, topic, message)
})
this.subConn.on('pmessageBuffer', function (sub, topic, message) {
handler(sub, topic, message)
})
MQEmitter.call(this, opts)
}
inherits(MQEmitterRedis, MQEmitter)
;['emit', 'on', 'removeListener', 'close'].forEach(function (name) {
MQEmitterRedis.prototype['_' + name] = MQEmitterRedis.prototype[name]
})
MQEmitterRedis.prototype.close = function (cb) {
var count = 2
var that = this
function onEnd () {
if (--count === 0) {
that._close(cb)
}
}
this.subConn.on('end', onEnd)
this.subConn.quit()
this.pubConn.on('end', onEnd)
this.pubConn.quit()
return this
}
MQEmitterRedis.prototype._subTopic = function (topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
}
MQEmitterRedis.prototype.on = function on (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function () {
if (done) {
setImmediate(done)
}
}
this._on(topic, cb)
if (this._topics[subTopic]) {
this._topics[subTopic]++
onFinish()
return this
}
this._topics[subTopic] = 1
if (this._containsWildcard(topic)) {
this.subConn.psubscribe(subTopic, onFinish)
} else {
this.subConn.subscribe(subTopic, onFinish)
}
return this
}
MQEmitterRedis.prototype.emit = function (msg, done) {
if (this.closed) {
return done(new Error('mqemitter-redis is closed'))
}
var packet = {
id: shortid(),
msg: msg
}
this.pubConn.publish(msg.topic, msgpack.encode(packet))
if (done) {
setImmediate(done)
}
}
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function () {
if (done) {
setImmediate(done)
}
}
this._removeListener(topic, cb)
if (--this._topics[subTopic] > 0) {
onFinish()
return this
}
delete this._topics[subTopic]
if (this._containsWildcard(topic)) {
this.subConn.punsubscribe(subTopic, onFinish)
} else if (this._matcher.match(topic)) {
this.subConn.unsubscribe(subTopic, onFinish)
}
return this
}
MQEmitterRedis.prototype._containsWildcard = function (topic) {
return (topic.indexOf(this._opts.wildcardOne) >= 0) ||
(topic.indexOf(this._opts.wildcardSome) >= 0)
}
module.exports = MQEmitterRedis