Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 35 additions & 63 deletions src/worker/tasks/nonceResyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from "../../db/wallets/walletNonce";
import { getConfig } from "../../utils/cache/getConfig";
import { getChain } from "../../utils/chain";
import { prettifyError } from "../../utils/error";
import { logger } from "../../utils/logger";
import { redis } from "../../utils/redis/redis";
import { thirdwebClient } from "../../utils/sdk";
Expand Down Expand Up @@ -40,78 +41,49 @@ export const initNonceResyncWorker = async () => {
*/
const handler: Processor<any, void, string> = async (job: Job<string>) => {
const sentNoncesKeys = await redis.keys("nonce-sent*");
job.log(`Found ${sentNoncesKeys.length} nonce-sent* keys`);
if (sentNoncesKeys.length === 0) {
job.log("No active wallets.");
return;
}

for (const sentNonceKey of sentNoncesKeys) {
const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey);

const rpcRequest = getRpcClient({
client: thirdwebClient,
chain: await getChain(chainId),
});
try {
const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey);

const [transactionCount, lastUsedNonceDb] = await Promise.all([
eth_getTransactionCount(rpcRequest, {
address: walletAddress,
blockTag: "latest",
}),
inspectNonce(chainId, walletAddress),
]);

if (Number.isNaN(transactionCount)) {
job.log(
`Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
);
logger({
level: "error",
message: `[nonceResyncWorker] Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
service: "worker",
const rpcRequest = getRpcClient({
client: thirdwebClient,
chain: await getChain(chainId),
});
continue;
}
const lastUsedNonceOnchain =
(await eth_getTransactionCount(rpcRequest, {
address: walletAddress,
blockTag: "latest",
})) - 1;
const lastUsedNonceDb = await inspectNonce(chainId, walletAddress);

const lastUsedNonceOnchain = transactionCount - 1;

job.log(
`${walletAddress} last used onchain nonce: ${lastUsedNonceOnchain} and last used db nonce: ${lastUsedNonceDb}`,
);
logger({
level: "debug",
message: `[nonceResyncWorker] last used onchain nonce: ${transactionCount} and last used db nonce: ${lastUsedNonceDb}`,
service: "worker",
});

// If the last used nonce onchain is the same as or ahead of the last used nonce in the db,
// There is no need to resync the nonce.
if (lastUsedNonceOnchain >= lastUsedNonceDb) {
job.log(`No need to resync nonce for ${walletAddress}`);
logger({
level: "debug",
message: `[nonceResyncWorker] No need to resync nonce for ${walletAddress}`,
service: "worker",
});
continue;
}
// Recycle all nonces between (onchain nonce, db nonce] if they aren't in-flight ("sent nonce").
const recycled: number[] = [];
for (
let nonce = lastUsedNonceOnchain + 1;
nonce <= lastUsedNonceDb;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only intended behavioral change: This worker should try to cancel up to lastUsedNonceDb inclusive, in case the most recent nonce is dropped/missing somehow.

nonce++
) {
const exists = await isSentNonce(chainId, walletAddress, nonce);
if (!exists) {
await recycleNonce(chainId, walletAddress, nonce);
recycled.push(nonce);
}
}

// for each nonce between last used db nonce and last used onchain nonce
// check if nonce exists in nonce-sent set
// if it does not exist, recycle it
for (
let _nonce = lastUsedNonceOnchain + 1;
_nonce < lastUsedNonceDb;
_nonce++
) {
const exists = await isSentNonce(chainId, walletAddress, _nonce);
const message = `wallet=${chainId}:${walletAddress} lastUsedNonceOnchain=${lastUsedNonceOnchain} lastUsedNonceDb=${lastUsedNonceDb}, recycled=${recycled.join(",")}`;
job.log(message);
logger({ level: "debug", service: "worker", message });
} catch (error) {
logger({
level: "debug",
message: `[nonceResyncWorker] nonce ${_nonce} exists in nonce-sent set: ${exists}`,
level: "error",
message: `[nonceResyncWorker] ${prettifyError(error)}`,
service: "worker",
});

// If nonce does not exist in nonce-sent set, recycle it
if (!exists) {
await recycleNonce(chainId, walletAddress, _nonce);
}
}
}
};
Loading