mqtt-log/index.js
2015-12-30 08:41:30 -05:00

129 lines
2.9 KiB
JavaScript

var http = require('http'),
nedb = require('nedb'),
aedes = require('aedes')(),
url = require('url'),
db = new nedb('mqtt.db'),
mqtt_port = process.env.MQTT_PORT || 1883,
http_port = process.env.HTTP_PORT || 8080;
var mqtt = require('net').createServer(aedes.handle);
mqtt.listen(mqtt_port);
aedes.on('client', function(client) {
db.insert({
topic: '/',
action: 'connect',
timestamp: new Date(),
message: client.id
});
});
aedes.on('clientDisconnect', function(client) {
db.insert({
topic: '/',
action: 'disconnect',
timestamp: new Date(),
message: client.id
});
});
aedes.on('subscribe', function(topic, client) {
db.insert({
topic: '/',
action: 'subscribe',
timestamp: new Date(),
message: client.id + ' ' + topic
});
});
aedes.on('unsubscribe', function(topic, client) {
db.insert({
topic: '/',
action: 'unsubscribe',
timestamp: new Date(),
message: client.id + ' ' + topic
});
});
aedes.on('publish', function(packet, client) {
if(! client) return;
packet.payloadString = packet.payload.toString();
packet.payloadLength = packet.payload.length;
packet.payload = JSON.stringify(packet.payload);
packet.timestamp = new Date();
db.insert(packet);
db.insert({
topic: '/',
action: 'publish',
timestamp: new Date(),
message: client.id + ' ' + packet.topic
});
});
var web = http.createServer(function(req, res) {
// Set CORS headers
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'OPTIONS, GET');
res.setHeader('Access-Control-Allow-Headers', 'DNT,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type');
var topic = url.parse(req.url).pathname;
if(topic === '/')
loadIndex(req, res);
else
loadTopic(topic, req, res);
});
db.loadDatabase(function(err) {
web.listen(http_port);
console.log('listening on mqtt port %d and http port %d...', mqtt_port, http_port);
});
function loadIndex(req, res) {
db.find({topic: '/'}).sort({timestamp: -1}).exec(function(err, docs) {
if(err) {
res.writeHead(500, {'Content-Type': 'application/json'});
return res.end({error: err.toString()});
}
res.writeHead(200, {'Content-Type': 'text/html'});
res.write('<html><head></head><body><pre>')
docs.forEach(function(doc) {
res.write(doc.timestamp + ' ');
res.write(doc.action + ' ');
res.write(doc.message + '\n');
});
res.end('</pre></body></html>');
});
}
function loadTopic(topic, req, res) {
topic = topic.substring(1);
db.find({topic: topic}).sort({timestamp: -1}).exec(function(err, docs) {
if(err) {
res.writeHead(500, {'Content-Type': 'application/json'});
return res.end({error: err.toString()});
}
res.writeHead(200, {'Content-Type': 'application/json'});
res.end(JSON.stringify({count: docs.length, packets: docs}, null, 2));
});
}