Updated dependencies. Standardified.

This commit is contained in:
Matteo Collina 2015-06-02 18:02:10 +02:00
parent 2c65fe441a
commit 873e4d8b14
4 changed files with 80 additions and 69 deletions

View file

@ -6,6 +6,9 @@ Redis-powered [MQEmitter](http://github.com/mcollina/mqemitter).
See [MQEmitter](http://github.com/mcollina/mqemitter) for the actual
API.
[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard)
Install
-------
@ -18,25 +21,25 @@ Example
```js
var redis = require('mqemitter-redis')
, mq = redis({
port: 12345
, localhost: 12.34.56.78
, password: 'my secret'
, db: 4
})
, msg = {
topic: 'hello world'
, payload: 'or any other fields'
}
var mq = redis({
port: 12345,
localhost: 12.34.56.78,
password: 'my secret',
db: 4
})
var msg = {
topic: 'hello world'
payload: 'or any other fields'
}
mq.on('hello world', function(message, cb) {
mq.on('hello world', function (message, cb) {
// call callback when you are done
// do not pass any errors, the emitter cannot handle it.
cb()
})
// topic is mandatory
mq.emit(msg, function() {
mq.emit(msg, function () {
// emitter will never return an error
})
```

View file

@ -1,43 +1,44 @@
'use strict'
var redis = require('redis')
, MQEmitter = require('mqemitter')
, shortid = require('shortid')
, inherits = require('inherits')
, LRU = require("lru-cache")
var redis = require('redis')
var MQEmitter = require('mqemitter')
var shortid = require('shortid')
var inherits = require('inherits')
var LRU = require('lru-cache')
function MQEmitterRedis(opts) {
function MQEmitterRedis (opts) {
if (!(this instanceof MQEmitterRedis)) {
return new MQEmitterRedis(opts)
}
opts = opts || {}
opts = opts || {}
this._opts = opts
this._opts = opts
this.subConn = createConn(opts)
this.pubConn = createConn(opts)
this.subConn = createConn(opts)
this.pubConn = createConn(opts)
this._topics = {}
this._topics = {}
this._cache = LRU({
max: 10000,
maxAge: 60 * 1000 // one minute
})
this._cache = LRU({
max: 10000
, maxAge: 60 * 1000 // one minute
})
var that = this
var that = this
function handler(sub, topic, payload) {
function handler (sub, topic, payload) {
var packet = JSON.parse(payload)
if (!that._cache.get(packet.id))
if (!that._cache.get(packet.id)) {
that._emit(packet.msg)
}
that._cache.set(packet.id, true)
}
this.subConn.on("message", function (topic, message) {
this.subConn.on('message', function (topic, message) {
handler(topic, topic, message)
})
this.subConn.on("pmessage", function(sub, topic, message) {
this.subConn.on('pmessage', function (sub, topic, message) {
handler(sub, topic, message)
})
@ -46,7 +47,7 @@ function MQEmitterRedis(opts) {
inherits(MQEmitterRedis, MQEmitter)
function createConn(opts) {
function createConn (opts) {
var conn = redis.createClient(opts.port || null,
opts.host || null,
opts.redis)
@ -61,17 +62,18 @@ function createConn(opts) {
return conn
}
['emit', 'on', 'removeListener', 'close'].forEach(function(name) {
['emit', 'on', 'removeListener', 'close'].forEach(function (name) {
MQEmitterRedis.prototype['_' + name] = MQEmitterRedis.prototype[name]
})
MQEmitterRedis.prototype.close = function close(cb) {
MQEmitterRedis.prototype.close = function (cb) {
var count = 2
, that = this
var that = this
function onEnd() {
if (--count === 0)
function onEnd () {
if (--count === 0) {
that._close(cb)
}
}
this.subConn.on('end', onEnd)
@ -83,14 +85,14 @@ MQEmitterRedis.prototype.close = function close(cb) {
return this
}
MQEmitterRedis.prototype._subTopic = function(topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
MQEmitterRedis.prototype._subTopic = function (topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
}
MQEmitterRedis.prototype.on = function on(topic, cb, done) {
MQEmitterRedis.prototype.on = function on (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function() {
var onFinish = function () {
if (done) {
setImmediate(done)
}
@ -115,25 +117,26 @@ MQEmitterRedis.prototype.on = function on(topic, cb, done) {
return this
}
MQEmitterRedis.prototype.emit = function emit(msg, done) {
if (this.closed)
MQEmitterRedis.prototype.emit = function (msg, done) {
if (this.closed) {
return done(new Error('mqemitter-redis is closed'))
var packet = {
id: shortid()
, msg: msg
}
this.pubConn.publish(msg.topic, JSON.stringify(packet), function() {
var packet = {
id: shortid(),
msg: msg
}
this.pubConn.publish(msg.topic, JSON.stringify(packet), function () {
if (done) {
setImmediate(done)
}
})
}
MQEmitterRedis.prototype.removeListener = function removeListener(topic, cb, done) {
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function() {
var onFinish = function () {
if (done) {
setImmediate(done)
}
@ -157,7 +160,7 @@ MQEmitterRedis.prototype.removeListener = function removeListener(topic, cb, don
return this
}
MQEmitterRedis.prototype._containsWildcard = function(topic) {
MQEmitterRedis.prototype._containsWildcard = function (topic) {
return (topic.indexOf(this._opts.wildcardOne) >= 0) ||
(topic.indexOf(this._opts.wildcardSome) >= 0)
}

View file

@ -5,18 +5,22 @@
"main": "mqemitter-redis.js",
"dependencies": {
"inherits": "^2.0.1",
"mqemitter": "^0.3.0",
"mqemitter": "^0.4.0",
"redis": "^0.12.1",
"shortid": "^2.0.1",
"lru-cache": "~2.5.0"
"shortid": "^2.2.2",
"lru-cache": "^2.6.4"
},
"devDependencies": {
"faucet": "^0.0.1",
"tape": "^2.14.0"
"pre-commit": "^1.0.7",
"standard": "^4.0.1",
"tape": "^4.0.0"
},
"scripts": {
"lint": "standard",
"test": "tape test.js | faucet"
},
"pre-commit": ["lint", "test"],
"repository": {
"type": "git",
"url": "https://github.com/mcollina/mqemitter-redis.git"

23
test.js
View file

@ -1,26 +1,27 @@
'use strict'
var redis = require('./')
, test = require('tape').test
, abstractTests = require('mqemitter/abstractTest.js')
var redis = require('./')
var test = require('tape').test
var abstractTests = require('mqemitter/abstractTest.js')
abstractTests({
builder: redis
, test: test
builder: redis,
test: test
})
test('actual unsubscribe from Redis', function(t) {
function noop () {}
test('actual unsubscribe from Redis', function (t) {
var e = redis()
function noop() {}
e.subConn.on('message', function(topic, message) {
e.subConn.on('message', function (topic, message) {
t.fail('the message should not be emitted')
})
e.on('hello', noop)
e.removeListener('hello', noop)
e.emit({ topic: 'hello' }, function() {
e.close(function() {
e.emit({ topic: 'hello' }, function () {
e.close(function () {
t.end()
})
})