diff --git a/hive-interface.js b/hive-interface.js index 6984bbe..fda1c7f 100644 --- a/hive-interface.js +++ b/hive-interface.js @@ -40,7 +40,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.rc.getRCMana(account_name), 'get_rc_mana', [account_name])); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [rc_api.get_rc_mana].`, 1, 'Red'); @@ -53,7 +53,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.call(api, method_name, params), method_name, params)); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [${api}.${method_name}].`, 1, 'Red'); @@ -66,7 +66,7 @@ class Hive { return new Promise(async (resolve, reject) => { try { resolve(await this.rpcCall(client => client.database.call(method_name, params), method_name, params)); - } catch(err) { + } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed making API call [${method_name}].`, 1, 'Red'); @@ -82,7 +82,7 @@ class Hive { } catch(err) { if(!utils.isTxError(err)) utils.log(`All nodes failed broadcasting operation [${method_name}].`, 1, 'Red'); - + reject(err); } }); @@ -149,7 +149,7 @@ class Hive { try { return resolve(await call(client)); - } catch(err) { + } catch(err) { err.is_tx_error = utils.isTxError(err); if(err.is_tx_error) return reject(err); @@ -168,7 +168,7 @@ class Hive { updateClientErrors(client) { // Check if the client has had errors within the last 10 minutes if(client.last_error_date && client.last_error_date > Date.now() - 10 * 60 * 1000) - client.errors++; + client.errors++; else client.errors = 1; @@ -193,7 +193,7 @@ class Hive { // Left for backwards compatibility async custom_json(id, json, account, key, use_active) { var data = { - id, + id, json: JSON.stringify(json), required_auths: use_active ? [account] : [], required_posting_auths: use_active ? [] : [account] @@ -216,12 +216,12 @@ class Hive { // Left for backwards compatibility async customJsonNoQueue(id, json, account, key, use_active) { - + if(this.checkAccountUsageLimit(account)) return this.custom_json(id, json, account, key, use_active); var data = { - id: id, + id: id, json: JSON.stringify(json), required_auths: use_active ? [account] : [], required_posting_auths: use_active ? [] : [account] @@ -418,21 +418,27 @@ class Hive { }); } + getAccountUsage(account) { + return this.accounts_used.has(account) + ? this.accounts_used.get(account) + : 0 + } + + setAccountUsage(account,count) { + this.accounts_used.set(account,count) + } + // Check if you have already broadcast 4 transactions from an account, in which case return true. // Otherwise, increment the count and return false. checkAccountUsageLimit(account) { - if(this.accounts_used.has(account)) { - let count = this.accounts_used.get(account); - if(4 <= count) - return true; - else - this.accounts_used.set(account, count++); + const count = this.getAccountUsage(account) + + if(count <= 4) { + this.setAccountUsage(account,count+1) + return true } - else - { - this.accounts_used.set(account, 1); - } - return false; + + return false; } async queueTx(data, key, tx_call) { @@ -440,32 +446,35 @@ class Hive { } async processTxQueue() { - this.clear_ctr++; - // Every 3 seconds, or one block, clear the transaction counts - if(this.clear_ctr >= 3) - { - for (const key of this.accounts_used.keys()) { - this.accounts_used.set(key, 0); - } - this.clear_ctr = 0; + if( ++this.clear_ctr >= 3 ) { + // Every 3 seconds, or one block, clear the transaction usage map + this.accounts_used = new Map() + this.clear_ctr = 0 } - // If the queue is empty, exit here - if(this.tx_queue.length <= 0) return; - let exit = false; - let next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0]; - exit = this.checkAccountUsageLimit(next_account); - - while(exit == false) - { - const item = this.tx_queue.shift(); - utils.log(`Processing queue item ${item.data.id}`, 3); - - item.tx_call(item.data, item.key); - if(this.tx_queue.length <= 0) exit = true; - else { - next_account = this.tx_queue[0].data.required_auths.length > 0 ? this.tx_queue[0].data.required_auths[0] : this.tx_queue[0].data.required_posting_auths[0]; - exit = this.checkAccountUsageLimit(next_account); + + const requeuedItems = [] + + // If the queue contains items, then keep processing them + while( this.tx_queue.length > 0 ) { + const item = this.tx_queue.shift(); + + // Use required_auth if specified, fall back to required_posting_auths + const account = item.data.required_auths[0] || item.data.required_posting_auths[0]; + + if( !this.checkAccountUsageLimit(account) ) { + // if this account has reached its limit, then + // requeue item and wait for next tick + utils.log(`Account usage limit reached for ${account}`, 3); + requeuedItems.push(item) } + + utils.log(`Processing queue item ${item.data.id}`, 3); + item.tx_call(item.data, item.key); + } + + if( requeuedItems.length > 0 ) { + // add requeued items back to the front of the queue for next run + this.tx_queue.unshift(...requeuedItems) } } }