diff --git a/clients/nodejs/modules/JsonRpcServer.js b/clients/nodejs/modules/JsonRpcServer.js index 7acbad223..cb1ff75a9 100644 --- a/clients/nodejs/modules/JsonRpcServer.js +++ b/clients/nodejs/modules/JsonRpcServer.js @@ -5,6 +5,8 @@ const JSON5 = require('json5'); const btoa = require('btoa'); const NodeUtils = require('./NodeUtils.js'); const Nimiq = require('../../../dist/node.js'); +const WebSocket = require('ws'); + class JsonRpcServer { /** @@ -22,7 +24,7 @@ class JsonRpcServer { if (typeof config.allowip === 'string') config.allowip = [config.allowip]; if (!config.allowip) config.allowip = []; - http.createServer((req, res) => { + const server = http.createServer((req, res) => { // Block requests that might originate from a website in the users browser, // unless the origin is explicitly whitelisted. if (config.corsdomain.includes(req.headers.origin)) { @@ -61,8 +63,17 @@ class JsonRpcServer { res.writeHead(200); res.end(); } - }).listen(config.port, config.allowip.length ? '0.0.0.0' : '127.0.0.1'); + }); + server.listen(config.port, config.allowip.length ? '0.0.0.0' : '127.0.0.1'); + + this._ws = new WebSocket.Server({ server: server, path: '/' }); + //Handle connections + this._ws.on('connection', (ws) =>{ + this._onWebsocketConnection(ws) + }); + + Nimiq.Log.i(JsonRpcServer, `RPC server listening on ${config.port}`); /** @type {Map.} */ this._methods = new Map(); @@ -137,6 +148,49 @@ class JsonRpcServer { this._methods.set('constant', this.constant.bind(this)); this._methods.set('log', this.log.bind(this)); + const $ = {}; + $.blockchain = consensus.blockchain; + $.accounts = $.blockchain.accounts; + $.mempool = consensus.mempool; + $.network = consensus.network; + + // $.network.on('*', (event, data) => { + // this.emitEvent(event, data); + // }); + + $.blockchain.on('*', async (event, ...data) => { + switch(event){ + case 'head-changed': + data = await this._blockToObj(data[0], true); + break; + case 'block': + data = data[0].toHex(); + break; + case 'rebranched': + data = { + revertBlocks: data[0].map((e) => this._blockToObj(e)), + forkBlocks: data[0].map((e) => this._blockToObj(e)), + blockHash: data[2].toHex() + }; + break; + case 'extended': + data = {height: data[0].height}; + break; + } + this.emitEvent('blockchain/'+event, data); + }); + + + $.mempool.on('*', async (event, data) => { + switch(event){ + case 'transaction-added': + case 'transaction-removed': + data = await this._transactionToObj(data); + break; + } + this.emitEvent('mempool/'+event, data); + }); + // Apply method whitelist if configured. if (this._config.methods && this._config.methods.length > 0) { const whitelist = new Set(this._config.methods); @@ -148,6 +202,18 @@ class JsonRpcServer { } } + emitEvent(event, data){ + Nimiq.Log.i(JsonRpcServer,` Emitting Event: ${event}`); + this._ws.clients.forEach(function each(client) { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify({ + event, + data + })); + } + }); + } + /** * @param req * @param res @@ -165,7 +231,6 @@ class JsonRpcServer { return true; } - /* * Network */ @@ -771,10 +836,10 @@ class JsonRpcServer { timestamp: block ? block.timestamp : undefined, confirmations: block ? (await this._client.getHeadHeight()) - block.height + 1 : 0, transactionIndex: i, - from: tx.sender.toHex(), - fromAddress: tx.sender.toUserFriendlyAddress(), - to: tx.recipient.toHex(), - toAddress: tx.recipient.toUserFriendlyAddress(), + from: tx.sender ? tx.sender.toHex() : undefined, + fromAddress: tx.sender ? tx.sender.toUserFriendlyAddress() : undefined, + to: tx.recipient ? tx.recipient.toHex() : undefined, + toAddress: tx.recipient ? tx.recipient.toUserFriendlyAddress() : undefined, value: tx.value, fee: tx.fee, data: Nimiq.BufferUtils.toHex(tx.data) || null, @@ -830,6 +895,38 @@ class JsonRpcServer { }; } + + _onWebsocketConnection(ws) { + Nimiq.Log.i(JsonRpcServer, 'A websocket client has connected'); + const that = this; + ws.on('message', async (data) => { + let single = false; + let body = []; + try { + body = JSON.parse(data); + single = !(body instanceof Array); + } catch (e) { + body = null; + } + if (!body || body.length > 100) { + ws.send(JSON.stringify({ + 'jsonrpc': '2.0', + 'error': {'code': -32600, 'message': 'Invalid Request'}, + 'id': null + })); + } + if (single) { + body = [body]; + } + const result = await that.handleJson(body); + if (single && result.length === 1) { + ws.send(JSON.stringify(result[0])); + } else if (!single) { + ws.send(JSON.stringify(result)); + } + }); + } + _onRequest(req, res) { let body = []; req.on('data', (chunk) => { @@ -854,40 +951,10 @@ class JsonRpcServer { if (single) { body = [body]; } + res.writeHead(200); - const result = []; - for (const msg of body) { - if (!msg || msg.jsonrpc !== '2.0' || !msg.method) { - result.push({ - 'jsonrpc': '2.0', - 'error': {'code': -32600, 'message': 'Invalid Request'}, - 'id': msg ? msg.id : null - }); - continue; - } - if (!this._methods.has(msg.method)) { - Nimiq.Log.w(JsonRpcServer, 'Unknown method called', msg.method); - result.push({ - 'jsonrpc': '2.0', - 'error': {'code': -32601, 'message': 'Method not found'}, - 'id': msg.id - }); - continue; - } - try { - const methodRes = await this._methods.get(msg.method).apply(null, msg.params instanceof Array ? msg.params : [msg.params]); - if (typeof msg.id === 'string' || Number.isInteger(msg.id)) { - result.push({'jsonrpc': '2.0', 'result': methodRes, 'id': msg.id}); - } - } catch (e) { - Nimiq.Log.d(JsonRpcServer, e.stack); - result.push({ - 'jsonrpc': '2.0', - 'error': {'code': e.code || 1, 'message': e.message || e.toString()}, - 'id': msg.id - }); - } - } + const result = await this.handleJson(body); + if (single && result.length === 1) { res.write(JSON.stringify(result[0])); } else if (!single) { @@ -895,6 +962,45 @@ class JsonRpcServer { } res.end("\r\n"); }); + + + } + + async handleJson(data) { + const result = []; + for (const msg of data) { + if (!msg || msg.jsonrpc !== '2.0' || !msg.method) { + result.push({ + 'jsonrpc': '2.0', + 'error': {'code': -32600, 'message': 'Invalid Request'}, + 'id': msg ? msg.id : null + }); + continue; + } + if (!this._methods.has(msg.method)) { + Nimiq.Log.w(JsonRpcServer, 'Unknown method called', msg.method); + result.push({ + 'jsonrpc': '2.0', + 'error': {'code': -32601, 'message': 'Method not found'}, + 'id': msg.id + }); + continue; + } + try { + const methodRes = await this._methods.get(msg.method).apply(null, msg.params instanceof Array ? msg.params : [msg.params]); + if (typeof msg.id === 'string' || Number.isInteger(msg.id)) { + result.push({'jsonrpc': '2.0', 'result': methodRes, 'id': msg.id}); + } + } catch (e) { + Nimiq.Log.d(JsonRpcServer, e.stack); + result.push({ + 'jsonrpc': '2.0', + 'error': {'code': e.code || 1, 'message': e.message || e.toString()}, + 'id': msg.id + }); + } + } + return result; } }