diff --git a/contrib/nodejs/dynamic-peers/lib/cjdns b/contrib/nodejs/dynamic-peers/lib/cjdns new file mode 120000 index 000000000..4e9c59a58 --- /dev/null +++ b/contrib/nodejs/dynamic-peers/lib/cjdns @@ -0,0 +1 @@ +../../../../tools/lib \ No newline at end of file diff --git a/contrib/nodejs/dynamic-peers/lib/dynamic-peers.js b/contrib/nodejs/dynamic-peers/lib/dynamic-peers.js new file mode 100644 index 000000000..ecf76f19f --- /dev/null +++ b/contrib/nodejs/dynamic-peers/lib/dynamic-peers.js @@ -0,0 +1,92 @@ +var path = require('path'); +var os = require('os'); +var fs = require('fs'); +var dns = require('dns'); +var Promise = require('./promise'); + +var Cjdns = require('./cjdns/cjdnsadmin/cjdnsadmin'); + +var readFile = Promise.wrap(fs.readFile); +var resolve = Promise.wrap(dns.resolve); + +var conf_name = process.env.conf; +if(!conf_name) conf_name = "cjdns_dynamic.conf"; + +readFile +(path.join(os.homedir(),".config",conf_name)) +.then(JSON.parse) +.then(function (nodes) { + console.log("got nodes"); + Cjdns.connectWithAdminInfo(function(cjdns) { + console.log("connected"); + const peerStats = + Promise.wrap(cjdns.InterfaceController_peerStats); + + function link_up(node) { + // can't attach port until the DNS lookup + var address = node.address + ":" + node.port; + console.log("link up", address, node.publicKey); + cjdns.UDPInterface_beginConnection(node.publicKey, + address, + node.password, + function() { + } + ); + } + function lookup(node) { + if(node.name) { + resolve(node.name) + .then(function (addresses) { + node.address = addresses[0]; + link_up(node); + // no need to wait for callback? + },function(err) { + print(err); + link_up(node) + }); + } else { + link_up(node); + } + } + + function collectPeers(then) { + var peers = {}; + var count = 0; + function again(i) { + peerStats(i).then(function(ret) { + ret.peers.forEach(function (peer) { + if(peer.state != 'UNRESPONSIVE') { + if(!(peer.publicKey in peers)) + ++count; + peers[peer.publicKey] = peer; + } + }); + if (typeof(ret.more) !== 'undefined') { + again(i+1); + } else { + then(peers,count); + setTimeout(function() { + collectPeers(then); + },10000); + } + }); + } + again(0); + } + + for(var key in nodes) { + var node = nodes[key]; + node.publicKey = key; + } + collectPeers(function(peers,npeers) { + console.log("checking",npeers,"peers") + for(var key in nodes) { + if(key in peers) continue; + console.log("Peer not found",key,"poking."); + lookup(nodes[key]); + } + }); + }); +}); + + diff --git a/contrib/nodejs/dynamic-peers/lib/promise.js b/contrib/nodejs/dynamic-peers/lib/promise.js new file mode 100644 index 000000000..e0858719c --- /dev/null +++ b/contrib/nodejs/dynamic-peers/lib/promise.js @@ -0,0 +1,120 @@ +// because depending on acorn is dumb + +var sentinel = "unset"; + +const ACCEPT = true; +const REJECT = false; + +var Promise = function(callback) { + var result = sentinel; + var err = sentinel; + var stages = []; + function doit(res,mode) { + if(result !== sentinel) { + throw new Error("Don't call this twice"); + } + // don't you love tail recursion? + if(mode === ACCEPT) { + result = res; + } else { + err = res; + } + while(stages.length > 0) { + var stage = stages[0]; + try { + if(mode === ACCEPT) { + res = stage.ok(res); + stages.shift(); + } else { + while(!stage.err) { + if(stages.length == 0) + throw err; + stage = stages.shift(); + } + res = stage.err(err,res); + mode = ACCEPT; + stages.shift(); + } + } catch(e) { + mode = REJECT; + err = e; + stages.shift(); + if(!stages.length) + throw e; + } + } + } + function accept(res) { + return doit(res,ACCEPT); + } + function reject(e) { + return doit(e,REJECT); + } + callback(accept,reject); + return { + then: function(ok,notok) { + // if already set, don't bother with stages + if(err !== sentinel && notok) { + notok(err,result); + return this; + } + if(result !== sentinel) { + ok(result); + return this; + } + var stage = { + ok: ok, + err: notok + }; + stages.push(stage); + return this; + }, + catch: function(notok) { + return this.then(undefined,notok); + } + } +} + +module.exports = Promise; + +Promise.wrap = function(fn) { + return function() { + var that = this; + var args = Array.prototype.slice.call(arguments); + return new Promise(function(accept,reject) { + args.push(function(err, res) { + if(err) reject(err); + else accept(res); + }); + try { fn.apply(that, args); } + catch(e) { + reject(e); + } + }); + } +} + +Promise.all = function(promises) { + return new Promise(function(accept,reject) { + var results = new Array(promises.length); + var i = 0; + var count = 0; + for(var promise of promises) { + ++count; + promise.then(function(res) { + results[i] = res; + if(count == 0) { + accept(results); + } else { + --count; + } + },function(err) { + reject(err); + }); + } + }); +} + +Promise.success = new Promise(function(accept,reject) { + accept(true); +}); diff --git a/contrib/nodejs/dynamic-peers/lib/test-promise.js b/contrib/nodejs/dynamic-peers/lib/test-promise.js new file mode 100644 index 000000000..170bfe6f6 --- /dev/null +++ b/contrib/nodejs/dynamic-peers/lib/test-promise.js @@ -0,0 +1,42 @@ +var Promise = require('./promise'); +var fs = require('fs'); + +function dummyPromise() { + return new Promise(function(accept, reject) { + setTimeout(function() { + accept(0); + },1); + }); +} +var p = dummyPromise(); + +p.then(function herp(i) { + return i + 19; +}).then(function derp(i) { + return i + 23; +}).then(function done(i) { + if(i != 42) { + throw new Error("Fail"); + } + dummyPromise() + .then(function(i) { + throw new Error("failure"); + },function(e) { + console.log("should never get here!"); + }) + .then(function(i) { + console.log("or here!"); + },function(e) { + //console.log("got error",e); + throw new Error("something else went wrong"); + }) + .catch(function(e) { + //console.log("we caught e",e); + return 42; + }) + .then(function(res) { + if(42 != res) throw new Error("fail..."); + }); +}); + + diff --git a/contrib/nodejs/dynamic-peers/package.json b/contrib/nodejs/dynamic-peers/package.json new file mode 100644 index 000000000..13d065726 --- /dev/null +++ b/contrib/nodejs/dynamic-peers/package.json @@ -0,0 +1,7 @@ +{ + "name": "cjdns-dynamic-peers", + "version": "0.1.1", + "dependencies": { + "cjdns": "~0.17.2" + } +} diff --git a/contrib/python/cjdnsadmin/cjdnsadmin.py b/contrib/python/cjdnsadmin/cjdnsadmin.py index baa758917..494bbb7c2 100644 --- a/contrib/python/cjdnsadmin/cjdnsadmin.py +++ b/contrib/python/cjdnsadmin/cjdnsadmin.py @@ -28,276 +28,284 @@ class Session(): - """Current cjdns admin session""" + """Current cjdns admin session""" - def __init__(self, socket): - self.socket = socket - self.queue = Queue.Queue() - self.messages = {} + def __init__(self, socket): + self.socket = socket + self.queue = Queue.Queue() + self.messages = {} - def disconnect(self): - self.socket.close() + def disconnect(self): + self.socket.close() - def getMessage(self, txid): - # print self, txid - return _getMessage(self, txid) + def getMessage(self, txid): + # print self, txid + return _getMessage(self, txid) - def functions(self): - print(self._functions) + def functions(self): + print(self._functions) def _randomString(): - """Random string for message signing""" + """Random string for message signing""" - return ''.join( - random.choice(string.ascii_uppercase + string.digits) - for x in range(10)) + return ''.join( + random.choice(string.ascii_uppercase + string.digits) + for x in range(10)) def _callFunc(session, funcName, password, args): - """Call custom cjdns admin function""" - - txid = _randomString() - sock = session.socket - sock.send('d1:q6:cookie4:txid10:' + txid + 'e') - msg = _getMessage(session, txid) - cookie = msg['cookie'] - txid = _randomString() - req = { - 'q': funcName, - 'hash': hashlib.sha256(password + cookie).hexdigest(), - 'cookie': cookie, - 'args': args, - 'txid': txid - } - - if password: - req['aq'] = req['q'] - req['q'] = 'auth' - reqBenc = bencode(req) - req['hash'] = hashlib.sha256(reqBenc).hexdigest() - - reqBenc = bencode(req) - sock.send(reqBenc) - return _getMessage(session, txid) + """Call custom cjdns admin function""" + + txid = _randomString() + sock = session.socket + sock.send('d1:q6:cookie4:txid10:' + txid + 'e') + msg = _getMessage(session, txid) + cookie = msg['cookie'] + txid = _randomString() + req = { + 'q': funcName, + 'hash': hashlib.sha256(password + cookie).hexdigest(), + 'cookie': cookie, + 'args': args, + 'txid': txid + } + + if password: + req['aq'] = req['q'] + req['q'] = 'auth' + reqBenc = bencode(req) + req['hash'] = hashlib.sha256(reqBenc).hexdigest() + + reqBenc = bencode(req) + sock.send(reqBenc) + return _getMessage(session, txid) def _receiverThread(session): - """Receiving messages from cjdns admin server""" - - timeOfLastSend = time.time() - timeOfLastRecv = time.time() - try: - while True: - if (timeOfLastSend + KEEPALIVE_INTERVAL_SECONDS < time.time()): - if (timeOfLastRecv + 10 < time.time()): - raise Exception("ping timeout") - session.socket.send( - 'd1:q18:Admin_asyncEnabled4:txid8:keepalive') - timeOfLastSend = time.time() - - # Did we get data from the socket? - got_data = False - - while True: - # This can be interrupted and we need to loop it. - - try: - data = session.socket.recv(BUFFER_SIZE) - except (socket.timeout): - # Stop retrying, but note we have no data - break - except socket.error as e: - if e.errno != errno.EINTR: - # Forward errors that aren't being interrupted - raise - # Otherwise it was interrupted so we try again. - else: - # Don't try again, we got data - got_data = True - break - - if not got_data: - # Try asking again. - continue - - - try: - benc = bdecode(data) - except (KeyError, ValueError): - print("error decoding [" + data + "]") - continue - - if benc['txid'] == 'keepaliv': - if benc['asyncEnabled'] == 0: - raise Exception("lost session") - timeOfLastRecv = time.time() - else: - # print "putting to queue " + str(benc) - session.queue.put(benc) - - except KeyboardInterrupt: - print("interrupted") - import thread - thread.interrupt_main() - except Exception as e: - # Forward along any errors, before killing the thread. - session.queue.put(e) + """Receiving messages from cjdns admin server""" + + timeOfLastSend = time.time() + timeOfLastRecv = time.time() + try: + while True: + if (timeOfLastSend + KEEPALIVE_INTERVAL_SECONDS < time.time()): + if (timeOfLastRecv + 10 < time.time()): + raise Exception("ping timeout") + session.socket.send( + 'd1:q18:Admin_asyncEnabled4:txid8:keepalive') + timeOfLastSend = time.time() + + # Did we get data from the socket? + got_data = False + + while True: + # This can be interrupted and we need to loop it. + + try: + data = session.socket.recv(BUFFER_SIZE) + except (socket.timeout): + # Stop retrying, but note we have no data + break + except socket.error as e: + if e.errno != errno.EINTR: + # Forward errors that aren't being interrupted + raise + # Otherwise it was interrupted so we try again. + else: + # Don't try again, we got data + got_data = True + break + + if not got_data: + # Try asking again. + continue + + + try: + benc = bdecode(data) + except (KeyError, ValueError): + print("error decoding [" + data + "]") + continue + + if benc['txid'] == 'keepaliv': + if benc['asyncEnabled'] == 0: + raise Exception("lost session") + timeOfLastRecv = time.time() + else: + # print "putting to queue " + str(benc) + session.queue.put(benc) + + except KeyboardInterrupt: + print("interrupted") + import thread + thread.interrupt_main() + except Exception as e: + # Forward along any errors, before killing the thread. + session.queue.put(e) def _getMessage(session, txid): - """Getting message associated with txid""" - - while True: - if txid in session.messages: - msg = session.messages[txid] - del session.messages[txid] - return msg - else: - # print "getting from queue" - try: - # apparently any timeout at all allows the thread to be - # stopped but none make it unstoppable with ctrl+c - next = session.queue.get(timeout=100) - except Queue.Empty: - continue - - if isinstance(next, Exception): - # If the receiveing thread had an error, throw one here too. - raise next - - if 'txid' in next: - session.messages[next['txid']] = next - # print "adding message [" + str(next) + "]" - else: - print "message with no txid: " + str(next) - - -def _functionFabric(func_name, argList, oargList, password): - """Function fabric for Session class""" - - def functionHandler(self, *args, **kwargs): - call_args = {} - - for (key, value) in oargList.items(): - call_args[key] = value - - for i, arg in enumerate(argList): - if (i < len(args)): - call_args[arg] = args[i] - - for (key, value) in kwargs.items(): - call_args[key] = value - - return _callFunc(self, func_name, password, call_args) - - functionHandler.__name__ = func_name - return functionHandler + """Getting message associated with txid""" + + while True: + if txid in session.messages: + msg = session.messages[txid] + del session.messages[txid] + return msg + else: + # print "getting from queue" + try: + # apparently any timeout at all allows the thread to be + # stopped but none make it unstoppable with ctrl+c + next = session.queue.get(timeout=100) + except Queue.Empty: + continue + + if isinstance(next, Exception): + # If the receiveing thread had an error, throw one here too. + raise next + + if 'txid' in next: + session.messages[next['txid']] = next + # print "adding message [" + str(next) + "]" + else: + print "message with no txid: " + str(next) + + +def _functionFabric(func_name, argList, oargs, password): + """Function fabric for Session class""" + + def functionHandler(self, *args, **kwargs): + call_args = {} + + pos = 0 + for value in args: + if (pos < len(argList)): + call_args[argList[pos]] = value + pos += 1 + + for (key, value) in kwargs.items(): + if not key in oargs: + # probably a positional argument, given a keyword name + # that happens in python + if pos < len(argList) and argList[pos] == key: + call_args[argList[pos]] = value + pos += 1 + continue + else: + print("warning: not an argument to this function",func_name,key) + print(oargs) + # TODO: check oargs[key] type matches value + # warn, if doesn't + call_args[key] = value + + return _callFunc(self, func_name, password, call_args) + + functionHandler.__name__ = func_name + return functionHandler def connect(ipAddr, port, password): - """Connect to cjdns admin with this attributes""" - - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.connect((ipAddr, port)) - sock.settimeout(2) - - # Make sure it pongs. - sock.send('d1:q4:pinge') - data = sock.recv(BUFFER_SIZE) - if (not data.endswith('1:q4:ponge')): - raise Exception( - "Looks like " + ipAddr + ":" + str(port) + - " is to a non-cjdns socket.") - - # Get the functions and make the object - page = 0 - availableFunctions = {} - while True: - sock.send( - 'd1:q24:Admin_availableFunctions4:argsd4:pagei' + - str(page) + 'eee') - data = sock.recv(BUFFER_SIZE) - benc = bdecode(data) - for func in benc['availableFunctions']: - availableFunctions[func] = benc['availableFunctions'][func] - if (not 'more' in benc): - break - page = page+1 - - funcArgs = {} - funcOargs = {} - - for (i, func) in availableFunctions.items(): - items = func.items() - - # grab all the required args first - # append all the optional args - rargList = [arg for arg,atts in items if atts['required']] - argList = rargList + [arg for arg,atts in items if not atts['required']] - - # for each optional arg setup a default value with - # a type which will be ignored by the core. - oargList = {} - for (arg,atts) in items: - if not atts['required']: - oargList[arg] = ( - "''" if (func[arg]['type'] == 'Int') - else "0") - - setattr(Session, i, _functionFabric( - i, argList, oargList, password)) - - funcArgs[i] = rargList - funcOargs[i] = oargList - - session = Session(sock) - - kat = threading.Thread(target=_receiverThread, args=[session]) - kat.setDaemon(True) - kat.start() - - # Check our password. - ret = _callFunc(session, "ping", password, {}) - if ('error' in ret): - raise Exception( - "Connect failed, incorrect admin password?\n" + str(ret)) - - session._functions = "" - - funcOargs_c = {} - for func in funcOargs: - funcOargs_c[func] = list( - [key + "=" + str(value) - for (key, value) in funcOargs[func].items()]) - - for func in availableFunctions: - session._functions += ( - func + "(" + ', '.join(funcArgs[func] + funcOargs_c[func]) + ")\n") - - # print session.functions - return session + """Connect to cjdns admin with this attributes""" + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.connect((ipAddr, port)) + sock.settimeout(2) + + # Make sure it pongs. + sock.send('d1:q4:pinge') + data = sock.recv(BUFFER_SIZE) + if (not data.endswith('1:q4:ponge')): + raise Exception( + "Looks like " + ipAddr + ":" + str(port) + + " is to a non-cjdns socket.") + + # Get the functions and make the object + page = 0 + availableFunctions = {} + while True: + sock.send( + 'd1:q24:Admin_availableFunctions4:argsd4:pagei' + + str(page) + 'eee') + data = sock.recv(BUFFER_SIZE) + benc = bdecode(data) + for func in benc['availableFunctions']: + availableFunctions[func] = benc['availableFunctions'][func] + if (not 'more' in benc): + break + page = page+1 + + funcArgs = {} + funcOargs = {} + + for (i, func) in availableFunctions.items(): + items = func.items() + + # required args + argList = [] + # optional args + oargs = {} + + for (arg,atts) in items: + if atts['required']: + argList.append(arg) + else: + oargs[arg] = atts['type'] + + setattr(Session, i, _functionFabric( + i, argList, oargs, password)) + + funcArgs[i] = argList + funcOargs[i] = oargs + + session = Session(sock) + + kat = threading.Thread(target=_receiverThread, args=[session]) + kat.setDaemon(True) + kat.start() + + # Check our password. + ret = _callFunc(session, "ping", password, {}) + if ('error' in ret): + raise Exception( + "Connect failed, incorrect admin password?\n" + str(ret)) + + session._functions = "" + + funcOargs_c = {} + for func in funcOargs: + funcOargs_c[func] = list( + [key + "=" + str(value) + for (key, value) in funcOargs[func].items()]) + + for func in availableFunctions: + session._functions += ( + func + "(" + ', '.join(funcArgs[func] + funcOargs_c[func]) + ")\n") + + # print session.functions + return session def connectWithAdminInfo(path = None): - """Connect to cjdns admin with data from user file""" - - if path is None: - path = os.path.expanduser('~/.cjdnsadmin') - try: - with open(path, 'r') as adminInfo: - data = json.load(adminInfo) - except IOError: - sys.stderr.write("""Please create a file named .cjdnsadmin in your + """Connect to cjdns admin with data from user file""" + + if path is None: + path = os.path.expanduser('~/.cjdnsadmin') + try: + with open(path, 'r') as adminInfo: + data = json.load(adminInfo) + except IOError: + sys.stderr.write("""Please create a file named .cjdnsadmin in your home directory with ip, port, and password of your cjdns engine in json. for example: { - "addr": "127.0.0.1", - "port": 11234, - "password": "You tell me! (Search in ~/cjdroute.conf)" + "addr": "127.0.0.1", + "port": 11234, + "password": "You tell me! (Search in ~/cjdroute.conf)" } """) - raise + raise - return connect(data['addr'], data['port'], data['password']) + return connect(data['addr'], data['port'], data['password']) diff --git a/contrib/python/dynamicEndpoints.py b/contrib/python/dynamicEndpoints.py index 46f8eebb7..a54faf2b6 100755 --- a/contrib/python/dynamicEndpoints.py +++ b/contrib/python/dynamicEndpoints.py @@ -114,11 +114,6 @@ def __init__(self, cjdns, configuration): # unresponsive. self.unresponsive = dict() - # Holds a cjdns log message subscription to messages about unresponsive - # nodes. - self.sub = self.cjdns.AdminLog_subscribe(MESSAGE_LINE, MESSAGE_FILE, - 'DEBUG') - # Add nodes from the given ConfigParser parser. for section in configuration.sections(): # Each section is named with a node key, and contains a @@ -130,16 +125,21 @@ def __init__(self, cjdns, configuration): # Add the node self.addNode(peerHostname, peerPort, peerPassword, section) + # Add all the nodes we're supposed to watch. + for node in self.nodes.values(): + self.lookup(node) + logging.info("{} peers added!".format(len(self.nodes))) + # Holds a cjdns log message subscription to messages about unresponsive + # nodes. + self.sub = self.cjdns.AdminLog_subscribe(MESSAGE_LINE, MESSAGE_FILE, + 'DEBUG') + if self.sub['error'] == 'none': # We successfully subscribed to messages. # When we die, try to unsubscribe atexit.register(self.stop) - # Add all the nodes we're supposed to watch. - for node in self.nodes.values(): - self.lookup(node) - logging.info("{} peers added!".format(len(self.nodes))) else: logging.error(self.sub) @@ -346,7 +346,7 @@ def main(argv): # Announce we dropped privs logging.info("Dropped privileges: running as {}:{}".format( pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0])) - except OSError: + except (OSError,KeyError): # Complain we couldn't drop privs right logging.warning("Could not drop privileges: running as {}:{}".format( pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0]))