diff --git a/src/active/context.cpp b/src/active/context.cpp index dcc00b2f7ca2..831a1b4ccc67 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -35,7 +35,7 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman m_isman{isman}, m_qman{qman}, nodeman{std::make_unique(connman, dmnman, operator_sk)}, - dkgdbgman{std::make_unique()}, + dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, quorums_watch)}, shareman{std::make_unique(connman, chainman, sigman, *nodeman, qman, sporkman)}, gov_signer{std::make_unique(connman, dmnman, govman, *nodeman, chainman, mn_sync)}, @@ -63,9 +63,9 @@ ActiveContext::~ActiveContext() m_isman.DisconnectSigner(); } -void ActiveContext::Start(CConnman& connman, PeerManager& peerman) +void ActiveContext::Start(CConnman& connman, PeerManager& peerman, int16_t worker_count) { - qman_handler->Start(); + qman_handler->Start(worker_count); qdkgsman->StartThreads(connman, peerman); cl_signer->Start(); cl_signer->RegisterRecoveryInterface(); diff --git a/src/active/context.h b/src/active/context.h index e6b90f4b3532..c93690c4bc4a 100644 --- a/src/active/context.h +++ b/src/active/context.h @@ -72,7 +72,7 @@ struct ActiveContext final : public CValidationInterface { bool quorums_recovery, bool quorums_watch); ~ActiveContext(); - void Start(CConnman& connman, PeerManager& peerman); + void Start(CConnman& connman, PeerManager& peerman, int16_t worker_count); void Stop(); CCoinJoinServer& GetCJServer() const; diff --git a/src/active/dkgsession.cpp b/src/active/dkgsession.cpp index 15eb449ece7f..9cdcc3981c81 100644 --- a/src/active/dkgsession.cpp +++ b/src/active/dkgsession.cpp @@ -37,14 +37,14 @@ ActiveDKGSession::~ActiveDKGSession() = default; void ActiveDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { - CDKGLogger logger(*this, __func__, __LINE__); - if (!AreWeMember()) { return; } assert(params.threshold > 1); // we should not get there with single-node-quorums + CDKGLogger logger(*this, __func__, __LINE__); + cxxtimer::Timer t1(true); logger.Batch("generating contributions"); if (!blsWorker.GenerateContributions(params.threshold, memberIds, vvecContribution, m_sk_contributions)) { @@ -304,7 +304,7 @@ void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, Pe CDKGLogger logger(*this, __func__, __LINE__); - std::set justifyFor; + Uint256HashSet justifyFor; for (const auto& m : members) { if (m->bad) { @@ -338,7 +338,7 @@ void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, Pe } void ActiveDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const std::set& forMembers) + const Uint256HashSet& forMembers) { CDKGLogger logger(*this, __func__, __LINE__); diff --git a/src/active/dkgsession.h b/src/active/dkgsession.h index 412d69e7d84b..3350a06c6b2d 100644 --- a/src/active/dkgsession.h +++ b/src/active/dkgsession.h @@ -43,7 +43,7 @@ class ActiveDKGSession final : public llmq::CDKGSession void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override EXCLUSIVE_LOCKS_REQUIRED(!invCs); void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const std::set& forMembers) override; + const Uint256HashSet& forMembers) override; // Phase 4: commit void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; diff --git a/src/active/dkgsessionhandler.cpp b/src/active/dkgsessionhandler.cpp index 1c74095ad7d2..6cafa09979d9 100644 --- a/src/active/dkgsessionhandler.cpp +++ b/src/active/dkgsessionhandler.cpp @@ -251,19 +251,19 @@ void ActiveDKGSessionHandler::HandlePhase(QuorumPhase curPhase, QuorumPhase next // returns a set of NodeIds which sent invalid messages template -std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) +std::unordered_set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) { if (messages.empty()) { return {}; } - std::set ret; + std::unordered_set ret; bool revertToSingleVerification = false; CBLSSignature aggSig; std::vector pubKeys; std::vector messageHashes; - std::set messageHashesSet; + Uint256HashSet messageHashesSet; pubKeys.reserve(messages.size()); messageHashes.reserve(messages.size()); bool first = true; diff --git a/src/bench/bls.cpp b/src/bench/bls.cpp index fef358d4d4b4..3384b710a577 100644 --- a/src/bench/bls.cpp +++ b/src/bench/bls.cpp @@ -3,7 +3,10 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include + #include +#include + #include #include @@ -321,7 +324,7 @@ static void BLS_Verify_BatchedParallel(benchmark::Bench& bench) }; CBLSWorker blsWorker; - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); // Benchmark. bench.minEpochIterations(bench.output() ? 1000 : 1).run([&] { diff --git a/src/bench/bls_dkg.cpp b/src/bench/bls_dkg.cpp index 574658dbfda5..cd9367b086d4 100644 --- a/src/bench/bls_dkg.cpp +++ b/src/bench/bls_dkg.cpp @@ -3,10 +3,13 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include + #include -#include +#include #include +#include + struct Member { CBLSId id; @@ -67,7 +70,7 @@ class DKG ids.emplace_back(id); } - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); for (auto& member : members) { blsWorker.GenerateContributions(quorumSize / 2 + 1, ids, member.vvec, member.skShares); } @@ -110,7 +113,7 @@ class DKG static void BLSDKG_GenerateContributions(benchmark::Bench& bench, uint32_t epoch_iters, int quorumSize) { CBLSWorker blsWorker; - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); std::vector ids; std::vector members; if (!bench.output()) { diff --git a/src/bls/bls_worker.cpp b/src/bls/bls_worker.cpp index 3191e95338e4..f8d5f607e418 100644 --- a/src/bls/bls_worker.cpp +++ b/src/bls/bls_worker.cpp @@ -56,11 +56,10 @@ CBLSWorker::~CBLSWorker() Stop(); } -void CBLSWorker::Start() +void CBLSWorker::Start(int16_t worker_count) { - int workerCount = std::thread::hardware_concurrency() / 2; - workerCount = std::clamp(workerCount, 1, 4); - workerPool.resize(workerCount); + assert(worker_count > 0); + workerPool.resize(worker_count); RenameThreadPool(workerPool, "bls-work"); } diff --git a/src/bls/bls_worker.h b/src/bls/bls_worker.h index a5ba5d37bbb4..529c9427baad 100644 --- a/src/bls/bls_worker.h +++ b/src/bls/bls_worker.h @@ -54,7 +54,7 @@ class CBLSWorker CBLSWorker(); ~CBLSWorker(); - void Start(); + void Start(int16_t worker_count); void Stop(); bool GenerateContributions(int threshold, Span ids, BLSVerificationVectorPtr& vvecRet, std::vector& skSharesRet); diff --git a/src/init.cpp b/src/init.cpp index fdb3e5af3998..5c2a0bcf0ec2 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2018,6 +2018,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Subtract 1 because the main thread counts towards the par threads return std::clamp(threads - 1, 0, llmq::MAX_BLSCHECK_THREADS); }(), + llmq::DEFAULT_WORKER_COUNT, args.GetIntArg("-maxrecsigsage", llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE), /*shutdown_requested=*/ShutdownRequested, /*coins_error_cb=*/[]() { @@ -2321,7 +2322,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->StartHandlers(); node.clhandler->Start(); - if (node.observer_ctx) node.observer_ctx->Start(); + if (node.observer_ctx) node.observer_ctx->Start(llmq::DEFAULT_WORKER_COUNT); node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), node.cj_walletman.get()), std::chrono::minutes{1}); @@ -2329,7 +2330,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->ScheduleHandlers(*node.scheduler); if (node.active_ctx) { - node.active_ctx->Start(*node.connman, *node.peerman); + node.active_ctx->Start(*node.connman, *node.peerman, llmq::DEFAULT_WORKER_COUNT); node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.active_ctx->qdkgsman)), std::chrono::hours{1}); } diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index cc43675a29ef..f617c2f77342 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -15,7 +15,7 @@ LLMQContext::LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, ChainstateManager& chainman, const CMasternodeSync& mn_sync, const util::DbWrapperParams& db_params, int8_t bls_threads, - int64_t max_recsigs_age) : + int16_t worker_count, int64_t max_recsigs_age) : bls_worker{std::make_shared()}, qsnapman{std::make_unique(evo_db)}, quorum_block_processor{std::make_unique(chainman.ActiveChainstate(), dmnman, evo_db, @@ -27,7 +27,7 @@ LLMQContext::LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSpork mempool, mn_sync, db_params)} { // Have to start it early to let VerifyDB check ChainLock signatures in coinbase - bls_worker->Start(); + bls_worker->Start(worker_count); } LLMQContext::~LLMQContext() diff --git a/src/llmq/context.h b/src/llmq/context.h index 677d5b6a76c9..5f8a4ea65378 100644 --- a/src/llmq/context.h +++ b/src/llmq/context.h @@ -41,7 +41,7 @@ struct LLMQContext { explicit LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, ChainstateManager& chainman, const CMasternodeSync& mn_sync, const util::DbWrapperParams& db_params, int8_t bls_threads, - int64_t max_recsigs_age); + int16_t worker_count, int64_t max_recsigs_age); ~LLMQContext(); /** Guaranteed if LLMQContext is initialized then all members are valid too diff --git a/src/llmq/core_write.cpp b/src/llmq/core_write.cpp index 89add66af10b..a5846169a78a 100644 --- a/src/llmq/core_write.cpp +++ b/src/llmq/core_write.cpp @@ -77,8 +77,8 @@ RPCResult CDKGDebugSessionStatus::GetJsonHelp(const std::string& key, bool optio }}; } -// CDKGDebugStatus::ToJson() defined in llmq/debug.cpp -RPCResult CDKGDebugStatus::GetJsonHelp(const std::string& key, bool optional, bool inner_optional) +// CDKGDebugManager::ToJson() defined in llmq/debug.cpp +RPCResult CDKGDebugManager::GetJsonHelp(const std::string& key, bool optional, bool inner_optional) { return {RPCResult::Type::OBJ, key, optional, key.empty() ? "" : "The state of the node's DKG sessions", { diff --git a/src/llmq/debug.cpp b/src/llmq/debug.cpp index 5d48d36682d4..5bd4887d22d4 100644 --- a/src/llmq/debug.cpp +++ b/src/llmq/debug.cpp @@ -107,21 +107,32 @@ UniValue CDKGDebugSessionStatus::ToJson(CDeterministicMNManager& dmnman, CQuorum return ret; } -CDKGDebugManager::CDKGDebugManager() = default; +CDKGDebugManager::CDKGDebugManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, + const ChainstateManager& chainman) : + m_dmnman{dmnman}, + m_qsnapman{qsnapman}, + m_chainman{chainman} +{ +} CDKGDebugManager::~CDKGDebugManager() = default; -UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, - const ChainstateManager& chainman, int detailLevel) const +size_t CDKGDebugManager::GetSessionCount() const { - UniValue ret(UniValue::VOBJ); + return WITH_LOCK(cs_lockStatus, return localStatus.sessions.size()); +} + +UniValue CDKGDebugManager::ToJson(int detailLevel) const +{ + LOCK(cs_lockStatus); - ret.pushKV("time", nTime); - ret.pushKV("timeStr", FormatISO8601DateTime(nTime)); + UniValue ret(UniValue::VOBJ); + ret.pushKV("time", localStatus.nTime); + ret.pushKV("timeStr", FormatISO8601DateTime(localStatus.nTime)); // TODO Support array of sessions UniValue sessionsArrJson(UniValue::VARR); - for (const auto& p : sessions) { + for (const auto& p : localStatus.sessions) { const auto& llmq_params_opt = Params().GetLLMQ(p.first.first); if (!llmq_params_opt.has_value()) { continue; @@ -129,7 +140,7 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapsho UniValue s(UniValue::VOBJ); s.pushKV("llmqType", std::string(llmq_params_opt->name)); s.pushKV("quorumIndex", p.first.second); - s.pushKV("status", p.second.ToJson(dmnman, qsnapman, chainman, p.first.second, detailLevel)); + s.pushKV("status", p.second.ToJson(m_dmnman, m_qsnapman, m_chainman, p.first.second, detailLevel)); sessionsArrJson.push_back(s); } @@ -138,12 +149,6 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapsho return ret; } -void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret) const -{ - LOCK(cs_lockStatus); - ret = localStatus; -} - void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex) { LOCK(cs_lockStatus); diff --git a/src/llmq/debug.h b/src/llmq/debug.h index f6923abbe9a4..5fef407fa1b1 100644 --- a/src/llmq/debug.h +++ b/src/llmq/debug.h @@ -10,7 +10,7 @@ #include #include -#include +#include class CDataStream; class CDeterministicMNManager; @@ -45,7 +45,7 @@ class CDKGDebugMemberStatus uint8_t statusBitset; }; - std::set complaintsFromMembers; + std::unordered_set complaintsFromMembers; public: CDKGDebugMemberStatus() : statusBitset(0) {} @@ -83,22 +83,18 @@ class CDKGDebugSessionStatus const ChainstateManager& chainman, int quorumIndex, int detailLevel) const; }; -class CDKGDebugStatus -{ -public: +struct CDKGDebugStatus { int64_t nTime{0}; - std::map, CDKGDebugSessionStatus> sessions; - //std::map sessions; - -public: - [[nodiscard]] static RPCResult GetJsonHelp(const std::string& key, bool optional, bool inner_optional = false); - [[nodiscard]] UniValue ToJson(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, - const ChainstateManager& chainman, int detailLevel) const; }; class CDKGDebugManager { +private: + CDeterministicMNManager& m_dmnman; + CQuorumSnapshotManager& m_qsnapman; + const ChainstateManager& m_chainman; + private: mutable Mutex cs_lockStatus; CDKGDebugStatus localStatus GUARDED_BY(cs_lockStatus); @@ -106,11 +102,9 @@ class CDKGDebugManager public: CDKGDebugManager(const CDKGDebugManager&) = delete; CDKGDebugManager& operator=(const CDKGDebugManager&) = delete; - CDKGDebugManager(); + CDKGDebugManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman); ~CDKGDebugManager(); - void GetLocalDebugStatus(CDKGDebugStatus& ret) const EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); - void ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); void InitLocalSessionStatus(const Consensus::LLMQParams& llmqParams, int quorumIndex, const uint256& quorumHash, int quorumHeight) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); @@ -121,8 +115,13 @@ class CDKGDebugManager void UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int quorumIndex, size_t memberIdx, std::function&& func) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); -}; + size_t GetSessionCount() const + EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); + [[nodiscard]] static RPCResult GetJsonHelp(const std::string& key, bool optional, bool inner_optional = false); + [[nodiscard]] UniValue ToJson(int detailLevel) const + EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); +}; } // namespace llmq #endif // BITCOIN_LLMQ_DEBUG_H diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index db1cb05311c4..f2b91862ba86 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace llmq { CDKGLogger::CDKGLogger(const CDKGSession& _quorumDkg, std::string_view _func, int source_line) : @@ -197,42 +198,15 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con return true; } -// TODO: remove duplicated code between all ReceiveMessage: CDKGContribution, CDKGComplaint, CDKGJustification, CDKGPrematureCommitment std::optional CDKGSession::ReceiveMessage(const CDKGContribution& qc) { CDKGLogger logger(*this, __func__, __LINE__); - - auto* member = GetMember(qc.proTxHash); - cxxtimer::Timer t1(true); - logger.Batch("received contribution from %s", qc.proTxHash.ToString()); - - // relay, no matter if further verification fails - // This ensures the whole quorum sees the bad behavior - - if (member->contributions.size() >= 2) { - // only relay up to 2 contributions, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qc); - WITH_LOCK(invCs, contributions.emplace(hash, qc)); - member->contributions.emplace(hash); - - CInv inv(MSG_QUORUM_CONTRIB, hash); - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedContribution = true; - return true; - }); - - if (member->contributions.size() > 1) { - // don't do any further processing if we got more than 1 contribution. we already relayed it, - // so others know about his bad behavior - MarkBadMember(member->idx); - logger.Batch("%s did send multiple contributions", member->dmn->proTxHash.ToString()); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qc, MsgPhase::Contribution, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; receivedVvecs[member->idx] = qc.vvec; @@ -240,13 +214,11 @@ std::optional CDKGSession::ReceiveMessage(const CDKGContribution& qc) logger.Batch("received and relayed contribution. received=%d/%d, time=%d", receivedCount, members.size(), t1.count()); - cxxtimer::Timer t2(true); - if (!AreWeMember()) { - // can't further validate return inv; } + cxxtimer::Timer t2(true); dkgManager.WriteVerifiedVvecContribution(params.type, m_quorum_base_block_index, qc.proTxHash, qc.vvec); bool complain = false; @@ -327,33 +299,10 @@ std::optional CDKGSession::ReceiveMessage(const CDKGComplaint& qc) { CDKGLogger logger(*this, __func__, __LINE__); - logger.Batch("received complaint from %s", qc.proTxHash.ToString()); - - auto* member = GetMember(qc.proTxHash); - - if (member->complaints.size() >= 2) { - // only relay up to 2 complaints, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qc); - WITH_LOCK(invCs, complaints.emplace(hash, qc)); - member->complaints.emplace(hash); - - CInv inv(MSG_QUORUM_COMPLAINT, hash); - - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedComplaint = true; - return true; - }); - - if (member->complaints.size() > 1) { - // don't do any further processing if we got more than 1 complaint. we already relayed it, - // so others know about his bad behavior - MarkBadMember(member->idx); - logger.Batch("%s did send multiple complaints", member->dmn->proTxHash.ToString()); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qc, MsgPhase::Complaint, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; int receivedCount = 0; for (const auto i : irange::range(members.size())) { @@ -409,7 +358,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGJustification& qj, bool& retBan) co return false; } - std::set contributionsSet; + std::unordered_set contributionsSet; for (const auto& [index, skContribution] : qj.contributions) { if (GetMemberAtIndex(index) == nullptr) { logger.Batch("invalid contribution index"); @@ -446,34 +395,10 @@ std::optional CDKGSession::ReceiveMessage(const CDKGJustification& qj) { CDKGLogger logger(*this, __func__, __LINE__); - logger.Batch("received justification from %s", qj.proTxHash.ToString()); - - auto* member = GetMember(qj.proTxHash); - - if (member->justifications.size() >= 2) { - // only relay up to 2 justifications, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qj); - WITH_LOCK(invCs, justifications.emplace(hash, qj)); - member->justifications.emplace(hash); - - // we always relay, even if further verification fails - CInv inv(MSG_QUORUM_JUSTIFICATION, hash); - - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedJustification = true; - return true; - }); - - if (member->justifications.size() > 1) { - // don't do any further processing if we got more than 1 justification. we already relayed it, - // so others know about his bad behavior - logger.Batch("%s did send multiple justifications", member->dmn->proTxHash.ToString()); - MarkBadMember(member->idx); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qj, MsgPhase::Justification, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; if (member->bad) { // we locally determined him to be bad (sent none or more then one contributions) @@ -691,6 +616,80 @@ CDKGMember* CDKGSession::GetMemberAtIndex(size_t index) const return members[index].get(); } +template +std::optional CDKGSession::ReceiveMessagePreamble(const MsgType& msg, MsgPhase phase, CDKGLogger& logger) +{ + auto* member = GetMember(msg.proTxHash); + if (member == nullptr) { + logger.Batch("message from non-member %s", msg.proTxHash.ToString()); + return std::nullopt; + } + + GetDataMsg inv_type{0}; + std::string msg_name; + + // Select member set, inv type, and name based on phase + auto& member_set = [&]() -> Uint256HashSet& { + switch (phase) { + case MsgPhase::Contribution: + inv_type = MSG_QUORUM_CONTRIB; + msg_name = "contribution"; + return member->contributions; + case MsgPhase::Complaint: + inv_type = MSG_QUORUM_COMPLAINT; + msg_name = "complaint"; + return member->complaints; + case MsgPhase::Justification: + inv_type = MSG_QUORUM_JUSTIFICATION; + msg_name = "justification"; + return member->justifications; + } + assert(false); + }(); + + logger.Batch("received %s from %s", msg_name, msg.proTxHash.ToString()); + + if (member_set.size() >= 2) { + // only relay up to 2 messages, that's enough to let the other members know about his bad behavior + return std::nullopt; + } + + const uint256 hash = ::SerializeHash(msg); + member_set.emplace(hash); + if constexpr (std::is_same_v) { + contributions.emplace(hash, msg); + } else if constexpr (std::is_same_v) { + complaints.emplace(hash, msg); + } else if constexpr (std::is_same_v) { + justifications.emplace(hash, msg); + } + + dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [phase](CDKGDebugMemberStatus& status) { + switch (phase) { + case MsgPhase::Contribution: status.statusBits.receivedContribution = true; break; + case MsgPhase::Complaint: status.statusBits.receivedComplaint = true; break; + case MsgPhase::Justification: status.statusBits.receivedJustification = true; break; + } + return true; + }); + + bool should_process{true}; + if (member_set.size() > 1) { + // don't do any further processing if we got more than 1 justification. we already relayed it, + // so others know about his bad behavior + MarkBadMember(member->idx); + logger.Batch("%s did send multiple %ss", member->dmn->proTxHash.ToString(), msg_name); + should_process = false; + } + + // we always relay, even if further verification fails + return ReceiveMessageState{member, hash, CInv{inv_type, hash}, should_process}; +} + +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGContribution&, MsgPhase, CDKGLogger&); +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGComplaint&, MsgPhase, CDKGLogger&); +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGJustification&, MsgPhase, CDKGLogger&); + void CDKGSession::MarkBadMember(size_t idx) { auto* member = members.at(idx).get(); diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index 95529b2bdde6..a24273dcd522 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -6,14 +6,15 @@ #define BITCOIN_LLMQ_DKGSESSION_H #include +#include #include #include #include #include #include - #include + #include #include @@ -21,7 +22,6 @@ #include class CActiveMasternodeManager; -class CInv; class CConnman; class CDeterministicMN; class CMasternodeMetaMan; @@ -215,13 +215,13 @@ class CDKGMember size_t idx; CBLSId id; - std::set contributions; - std::set complaints; - std::set justifications; - std::set prematureCommitments; + Uint256HashSet contributions; + Uint256HashSet complaints; + Uint256HashSet justifications; + Uint256HashSet prematureCommitments; - std::set badMemberVotes; - std::set complaintsFromOthers; + Uint256HashSet badMemberVotes; + Uint256HashSet complaintsFromOthers; bool bad{false}; bool badConnection{false}; @@ -281,6 +281,20 @@ class CDKGSession friend class CDKGSessionManager; friend class CDKGLogger; +private: + enum class MsgPhase : uint8_t { + Contribution, + Complaint, + Justification + }; + + struct ReceiveMessageState { + CDKGMember* member{nullptr}; + uint256 hash{}; + CInv inv{}; + bool should_process{true}; + }; + private: CBLSWorker& blsWorker; CBLSWorkerCache cache; @@ -325,7 +339,7 @@ class CDKGSession std::vector pendingContributionVerifications GUARDED_BY(cs_pending); // filled by ReceivePrematureCommitment and used by FinalizeCommitments - std::set validCommitments GUARDED_BY(invCs); + Uint256HashSet validCommitments GUARDED_BY(invCs); public: CDKGSession(CBLSWorker& _blsWorker, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, @@ -366,7 +380,7 @@ class CDKGSession // Phase 3: justification virtual void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!invCs) {} - virtual void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const std::set& forMembers) {} + virtual void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const Uint256HashSet& forMembers) {} bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const; std::optional ReceiveMessage(const CDKGJustification& qj) EXCLUSIVE_LOCKS_REQUIRED(!invCs); @@ -401,12 +415,16 @@ class CDKGSession private: [[nodiscard]] bool ShouldSimulateError(DKGError::type type) const; + + template + [[nodiscard]] std::optional ReceiveMessagePreamble(const MsgType& msg, MsgPhase phase, CDKGLogger& logger) + EXCLUSIVE_LOCKS_REQUIRED(invCs); + void MarkBadMember(size_t idx); }; void SetSimulatedDKGErrorRate(DKGError::type type, double rate); double GetSimulatedErrorRate(DKGError::type type); - } // namespace llmq #endif // BITCOIN_LLMQ_DKGSESSION_H diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 19ea22cdd6c7..98b1cf7ac45e 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -18,8 +18,8 @@ #include #include #include -#include #include +#include #include class CBlockIndex; @@ -67,7 +67,7 @@ class CDKGPendingMessages mutable Mutex cs_messages; std::list pendingMessages GUARDED_BY(cs_messages); std::map messagesPerNode GUARDED_BY(cs_messages); - std::set seenMessages GUARDED_BY(cs_messages); + Uint256HashSet seenMessages GUARDED_BY(cs_messages); public: explicit CDKGPendingMessages(size_t _maxMessagesPerNode, uint32_t _invType) : diff --git a/src/llmq/ehf_signals.h b/src/llmq/ehf_signals.h index 7f6688fd4eb9..022a1d48bed4 100644 --- a/src/llmq/ehf_signals.h +++ b/src/llmq/ehf_signals.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -30,7 +31,8 @@ class CEHFSignalsHandler : public CRecoveredSigsListener * keep freshly generated IDs for easier filter sigs in HandleNewRecoveredSig */ mutable Mutex cs; - std::set ids GUARDED_BY(cs); + Uint256HashSet ids GUARDED_BY(cs); + public: explicit CEHFSignalsHandler(ChainstateManager& chainman, CSigningManager& sigman, CSigSharesManager& shareman, const CQuorumManager& qman); diff --git a/src/llmq/observer/context.cpp b/src/llmq/observer/context.cpp index ac8a3258c52e..717cd58f143b 100644 --- a/src/llmq/observer/context.cpp +++ b/src/llmq/observer/context.cpp @@ -19,7 +19,7 @@ ObserverContext::ObserverContext(CBLSWorker& bls_worker, CConnman& connman, CDet const CSporkManager& sporkman, const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, bool quorums_recovery) : m_qman{qman}, - dkgdbgman{std::make_unique()}, + dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, /*quorums_watch=*/true)}, qman_handler{std::make_unique(connman, dmnman, qman, qsnapman, chainman, mn_sync, sporkman, @@ -37,9 +37,9 @@ ObserverContext::~ObserverContext() m_qman.DisconnectManagers(); } -void ObserverContext::Start() +void ObserverContext::Start(int16_t worker_count) { - qman_handler->Start(); + qman_handler->Start(worker_count); } void ObserverContext::Stop() diff --git a/src/llmq/observer/context.h b/src/llmq/observer/context.h index ed2f015d9703..0f6c2b5501cf 100644 --- a/src/llmq/observer/context.h +++ b/src/llmq/observer/context.h @@ -47,7 +47,7 @@ struct ObserverContext final : public CValidationInterface { const util::DbWrapperParams& db_params, bool quorums_recovery); ~ObserverContext(); - void Start(); + void Start(int16_t worker_count); void Stop(); protected: diff --git a/src/llmq/observer/quorums.cpp b/src/llmq/observer/quorums.cpp index 7f31d58b4c68..2c089d559016 100644 --- a/src/llmq/observer/quorums.cpp +++ b/src/llmq/observer/quorums.cpp @@ -44,11 +44,10 @@ QuorumObserver::~QuorumObserver() Stop(); } -void QuorumObserver::Start() +void QuorumObserver::Start(int16_t worker_count) { - int workerCount = std::thread::hardware_concurrency() / 2; - workerCount = std::clamp(workerCount, 1, 4); - workerPool.resize(workerCount); + assert(worker_count > 0); + workerPool.resize(worker_count); RenameThreadPool(workerPool, "q-mngr"); } @@ -334,7 +333,7 @@ void QuorumObserver::StartCleanupOldQuorumDataThread(gsl::not_null dbKeysToSkip; + Uint256HashSet dbKeysToSkip; if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) { utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false); @@ -346,7 +345,7 @@ void QuorumObserver::StartCleanupOldQuorumDataThread(gsl::not_null quorum_keys; + Uint256HashSet quorum_keys; while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) { uint256 quorum_key; if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) { diff --git a/src/llmq/observer/quorums.h b/src/llmq/observer/quorums.h index 20d580466ed9..7b7bb68f79d4 100644 --- a/src/llmq/observer/quorums.h +++ b/src/llmq/observer/quorums.h @@ -9,9 +9,9 @@ #include #include #include +#include #include -#include #include #include #include @@ -59,7 +59,7 @@ class QuorumObserverParent gsl::not_null pindexStart, size_t nCountRequested) const = 0; virtual void CleanupExpiredDataRequests() const = 0; - virtual void CleanupOldQuorumData(const std::set& dbKeysToSkip) const = 0; + virtual void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const = 0; }; class QuorumObserver @@ -92,7 +92,7 @@ class QuorumObserver const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery); virtual ~QuorumObserver(); - void Start(); + void Start(int16_t worker_count); void Stop(); void UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const; diff --git a/src/llmq/options.cpp b/src/llmq/options.cpp index f0074373a5d1..ed6a35306dd7 100644 --- a/src/llmq/options.cpp +++ b/src/llmq/options.cpp @@ -14,11 +14,13 @@ #include #include +#include #include #include +#include -namespace llmq -{ +namespace llmq { +int16_t DEFAULT_WORKER_COUNT{std::clamp(std::thread::hardware_concurrency() / 2, 1, 4)}; static bool EvalSpork(const Consensus::LLMQType llmqType, const int64_t spork_value) { diff --git a/src/llmq/options.h b/src/llmq/options.h index 3f6459ad6595..e546190301d2 100644 --- a/src/llmq/options.h +++ b/src/llmq/options.h @@ -27,14 +27,16 @@ enum class QvvecSyncMode : int8_t { OnlyIfTypeMember = 1, }; -/** Maximum number of dedicated script-checking threads allowed */ -static const int8_t MAX_BLSCHECK_THREADS{33}; -/** -parbls default (number of bls-checking threads, 0 = auto) */ -static const int8_t DEFAULT_BLSCHECK_THREADS{0}; /** -llmq-data-recovery default */ static constexpr bool DEFAULT_ENABLE_QUORUM_DATA_RECOVERY{true}; /** -watchquorums default, if true, we will connect to all new quorums and watch their communication */ static constexpr bool DEFAULT_WATCH_QUORUMS{false}; +/** -parbls default (number of bls-checking threads, 0 = auto) */ +static constexpr int8_t DEFAULT_BLSCHECK_THREADS{0}; +/** Number of workers allocated per worker pool */ +extern int16_t DEFAULT_WORKER_COUNT; +/** Maximum number of dedicated script-checking threads allowed */ +static constexpr int8_t MAX_BLSCHECK_THREADS{33}; bool IsAllMembersConnectedEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); bool IsQuorumPoseEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 54e0bf5c74c4..330576e4413c 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -26,7 +26,7 @@ uint256 MakeQuorumKey(const CQuorum& q) return hw.GetHash(); } -void DataCleanupHelper(CDBWrapper& db, const std::set& skip_list, bool compact) +void DataCleanupHelper(CDBWrapper& db, const Uint256HashSet& skip_list, bool compact) { const auto prefixes = {DB_QUORUM_QUORUM_VVEC, DB_QUORUM_SK_SHARE}; diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index e7e4b1c5d6fd..5bd10eac2b9b 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -35,7 +35,7 @@ extern const std::string DB_QUORUM_SK_SHARE; extern const std::string DB_QUORUM_QUORUM_VVEC; uint256 MakeQuorumKey(const CQuorum& q); -void DataCleanupHelper(CDBWrapper& db, const std::set& skip_list, bool compact = false); +void DataCleanupHelper(CDBWrapper& db, const Uint256HashSet& skip_list, bool compact = false); /** * Object used as a key to store CQuorumDataRequest diff --git a/src/llmq/quorumsman.cpp b/src/llmq/quorumsman.cpp index 55d1d8a26209..b471e5e433a6 100644 --- a/src/llmq/quorumsman.cpp +++ b/src/llmq/quorumsman.cpp @@ -82,7 +82,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l quorum->Init(std::make_unique(std::move(qc)), pQuorumBaseBlockIndex, minedBlockHash, members); if (populate_cache && llmq_params_opt->size == 1) { - WITH_LOCK(cs_map_quorums, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); + WITH_LOCK(m_cs_maps, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); return quorum; } @@ -106,7 +106,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l QueueQuorumForWarming(quorum); } - WITH_LOCK(cs_map_quorums, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); + WITH_LOCK(m_cs_maps, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); return quorum; } @@ -230,7 +230,7 @@ std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp std::vector vecResultQuorums; { - LOCK(cs_scan_quorums); + LOCK(m_cs_maps); if (scanQuorumsCache.empty()) { for (const auto& llmq : Params().GetConsensus().llmqs) { // NOTE: We store it for each block hash in the DKG mining phase here @@ -293,7 +293,7 @@ std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp const size_t nCountResult{vecResultQuorums.size()}; if (nCountResult > 0) { - LOCK(cs_scan_quorums); + LOCK(m_cs_maps); // Don't cache more than keepOldConnections elements // because signing by old quorums requires the exact quorum hash // to be specified and quorum scanning isn't needed there. @@ -359,7 +359,7 @@ void CQuorumManager::CleanupExpiredDataRequests() const } } -void CQuorumManager::CleanupOldQuorumData(const std::set& dbKeysToSkip) const +void CQuorumManager::CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const { LOCK(cs_db); DataCleanupHelper(*db, dbKeysToSkip); @@ -399,7 +399,7 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, gsl::not_nul } CQuorumPtr pQuorum; - if (LOCK(cs_map_quorums); mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { + if (LOCK(m_cs_maps); mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { return pQuorum; } @@ -530,7 +530,7 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c CQuorumPtr pQuorum; { - if (LOCK(cs_map_quorums); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { + if (LOCK(m_cs_maps); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { // Don't bump score because we asked for it LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId()); return {}; diff --git a/src/llmq/quorumsman.h b/src/llmq/quorumsman.h index 811ef1f3ad08..352ec128dc7f 100644 --- a/src/llmq/quorumsman.h +++ b/src/llmq/quorumsman.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -83,12 +84,11 @@ class CQuorumManager final : public QuorumObserverParent mutable std::unordered_map mapQuorumDataRequests GUARDED_BY(cs_data_requests); - mutable Mutex cs_map_quorums; - mutable std::map> mapQuorumsCache GUARDED_BY(cs_map_quorums); - - mutable Mutex cs_scan_quorums; // TODO: merge cs_map_quorums, cs_scan_quorums mutexes + mutable Mutex m_cs_maps; + mutable std::map> mapQuorumsCache + GUARDED_BY(m_cs_maps); mutable std::map>> scanQuorumsCache - GUARDED_BY(cs_scan_quorums); + GUARDED_BY(m_cs_maps); // On mainnet, we have around 62 quorums active at any point; let's cache a little more than double that to be safe. // it maps `quorum_hash` to `pindex` @@ -130,7 +130,7 @@ class CQuorumManager final : public QuorumObserverParent [[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type, CDataStream& vRecv) - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !m_cs_maps, !m_cache_cs); static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash); @@ -140,14 +140,14 @@ class CQuorumManager final : public QuorumObserverParent // all these methods will lock cs_main for a short period of time CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); std::vector ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !cs_scan_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); // this one is cs_main-free std::vector ScanQuorums(Consensus::LLMQType llmqType, gsl::not_null pindexStart, size_t nCountRequested) const override - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !cs_scan_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); bool IsMasternode() const; bool IsWatching() const; @@ -158,7 +158,7 @@ class CQuorumManager final : public QuorumObserverParent Consensus::LLMQType llmqType) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); void CleanupExpiredDataRequests() const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); - void CleanupOldQuorumData(const std::set& dbKeysToSkip) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_db); + void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_db); private: // all private methods here are cs_main-free @@ -167,11 +167,11 @@ class CQuorumManager final : public QuorumObserverParent CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, gsl::not_null pQuorumBaseBlockIndex, bool populate_cache) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, gsl::not_null pindex, bool populate_cache = true) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); void QueueQuorumForWarming(CQuorumCPtr pQuorum) const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); void CacheWarmingThreadMain() const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 1a52d0a327cb..d0512ebb248b 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -181,8 +181,7 @@ void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType l if (deleteTimeKey) { CDataStream writeTimeDs(SER_DISK, CLIENT_VERSION); - // TODO remove the size() == sizeof(uint32_t) in a future version (when we stop supporting upgrades from < 0.14.1) - if (db->ReadDataStream(k2, writeTimeDs) && writeTimeDs.size() == sizeof(uint32_t)) { + if (db->ReadDataStream(k2, writeTimeDs)) { uint32_t writeTime; writeTimeDs >> writeTime; auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t) htobe32_internal(writeTime), recSig.getLlmqType(), recSig.getId()); diff --git a/src/llmq/utils.cpp b/src/llmq/utils.cpp index 8106d893fbeb..818a900fdc2b 100644 --- a/src/llmq/utils.cpp +++ b/src/llmq/utils.cpp @@ -755,8 +755,9 @@ Uint256HashSet GetQuorumRelayMembers(const Consensus::LLMQParams& llmqParams, co return result; } -std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, gsl::not_null pQuorumBaseBlockIndex, - size_t memberCount, size_t connectionCount) +std::unordered_set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, + gsl::not_null pQuorumBaseBlockIndex, + size_t memberCount, size_t connectionCount) { static uint256 qwatchConnectionSeed; static std::atomic qwatchConnectionSeedGenerated{false}; @@ -767,7 +768,7 @@ std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, qwatchConnectionSeedGenerated = true; } - std::set result; + std::unordered_set result; uint256 rnd = qwatchConnectionSeed; for ([[maybe_unused]] const auto _ : irange::range(connectionCount)) { rnd = ::SerializeHash(std::make_pair(rnd, std::make_pair(llmqType, pQuorumBaseBlockIndex->GetBlockHash()))); @@ -842,7 +843,7 @@ void AddQuorumProbeConnections(const Consensus::LLMQParams& llmqParams, CConnman auto members = GetAllQuorumMembers(llmqParams.type, util_params); auto curTime = GetTime().count(); - std::set probeConnections; + Uint256HashSet probeConnections; for (const auto& dmn : members) { if (dmn->proTxHash == myProTxHash) { continue; diff --git a/src/llmq/utils.h b/src/llmq/utils.h index 5279baf8cf4c..2f839dc16810 100644 --- a/src/llmq/utils.h +++ b/src/llmq/utils.h @@ -16,7 +16,7 @@ #include -#include +#include #include class CBlockIndex; @@ -62,9 +62,9 @@ struct BlsCheck { uint256 DeterministicOutboundConnection(const uint256& proTxHash1, const uint256& proTxHash2); -std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, - gsl::not_null pQuorumBaseBlockIndex, - size_t memberCount, size_t connectionCount); +std::unordered_set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, + gsl::not_null pQuorumBaseBlockIndex, + size_t memberCount, size_t connectionCount); // includes members which failed DKG std::vector GetAllQuorumMembers(Consensus::LLMQType llmqType, const UtilParameters& util_params, diff --git a/src/net.cpp b/src/net.cpp index 3b70ee45fe00..a408e519624f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -4456,7 +4456,7 @@ bool CConnman::IsMasternodeQuorumRelayMember(const uint256& protxHash) return false; } -void CConnman::AddPendingProbeConnections(const std::set &proTxHashes) +void CConnman::AddPendingProbeConnections(const Uint256HashSet& proTxHashes) { LOCK(cs_vPendingMasternodes); masternodePendingProbes.insert(proTxHashes.begin(), proTxHashes.end()); diff --git a/src/net.h b/src/net.h index 8bdda43e9e75..b58738e9d6bf 100644 --- a/src/net.h +++ b/src/net.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,6 @@ #include #include #include -#include #include #include @@ -1465,7 +1465,7 @@ friend class CNode; void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const; bool IsMasternodeQuorumRelayMember(const uint256& protxHash); - void AddPendingProbeConnections(const std::set& proTxHashes); + void AddPendingProbeConnections(const Uint256HashSet& proTxHashes); size_t GetNodeCount(ConnectionDirection) const EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex); std::map getNetLocalAddresses() const; @@ -1783,7 +1783,7 @@ friend class CNode; mutable RecursiveMutex cs_vPendingMasternodes; std::map, Uint256HashSet> masternodeQuorumNodes GUARDED_BY(cs_vPendingMasternodes); std::map, Uint256HashSet> masternodeQuorumRelayMembers GUARDED_BY(cs_vPendingMasternodes); - std::set masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); + Uint256HashSet masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); mutable Mutex cs_mapSocketToNode; std::unordered_map mapSocketToNode GUARDED_BY(cs_mapSocketToNode); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3d428992565e..0adbc77a685e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -971,6 +971,9 @@ class PeerManagerImpl final : public PeerManager bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); + bool DKGSessionAlreadyHave(const CInv& inv); + MessageProcessingResult DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv); + /** * Filter for transactions that were recently rejected by the mempool. * These are not rerequested until the chain tip changes, at which point @@ -2363,8 +2366,7 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) case MSG_QUORUM_COMPLAINT: case MSG_QUORUM_JUSTIFICATION: case MSG_QUORUM_PREMATURE_COMMITMENT: - return (m_observer_ctx && m_observer_ctx->qdkgsman->AlreadyHave(inv)) - || (m_active_ctx && m_active_ctx->qdkgsman->AlreadyHave(inv)); + return DKGSessionAlreadyHave(inv); case MSG_QUORUM_RECOVERED_SIG: // TODO: move it to NetSigning return m_llmq_ctx->sigman->AlreadyHave(inv); @@ -2389,6 +2391,16 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) return true; } +bool PeerManagerImpl::DKGSessionAlreadyHave(const CInv& inv) +{ + if (m_observer_ctx) { + return m_observer_ctx->qdkgsman->AlreadyHave(inv); + } else if (m_active_ctx) { + return m_active_ctx->qdkgsman->AlreadyHave(inv); + } + return false; +} + bool PeerManagerImpl::AlreadyHaveBlock(const uint256& block_hash) { return m_chainman.m_blockman.LookupBlockIndex(block_hash) != nullptr; @@ -5468,24 +5480,7 @@ void PeerManagerImpl::ProcessMessage( if (m_cj_walletman) { PostProcessMessage(m_cj_walletman->processMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv), pfrom.GetId()); } - if (m_active_ctx) { - assert(is_masternode); - PostProcessMessage(m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); - } - if (m_observer_ctx) { - assert(!is_masternode); - PostProcessMessage(m_observer_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); - } - if (!m_active_ctx && !m_observer_ctx) { - assert(!is_masternode); - if (msg_type == NetMsgType::QCONTRIB - || msg_type == NetMsgType::QCOMPLAINT - || msg_type == NetMsgType::QJUSTIFICATION - || msg_type == NetMsgType::QPCOMMITMENT - || msg_type == NetMsgType::QWATCH) { - Misbehaving(pfrom.GetId(), /*howmuch=*/10); - } - } + PostProcessMessage(DKGSessionProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_sporkman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, (m_active_ctx ? m_active_ctx->nodeman.get() : nullptr), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); @@ -5515,6 +5510,26 @@ void PeerManagerImpl::ProcessMessage( return; } +MessageProcessingResult PeerManagerImpl::DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv) +{ + if (m_active_ctx) { + assert(is_masternode); + return m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); + } else if (m_observer_ctx) { + assert(!is_masternode); + return m_observer_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); + } + assert(!is_masternode); + if (msg_type == NetMsgType::QCONTRIB + || msg_type == NetMsgType::QCOMPLAINT + || msg_type == NetMsgType::QJUSTIFICATION + || msg_type == NetMsgType::QPCOMMITMENT + || msg_type == NetMsgType::QWATCH) { + return MisbehavingError{10}; + } + return {}; +} + bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) { { diff --git a/src/node/chainstate.cpp b/src/node/chainstate.cpp index aee964546334..98ac8e3d4f04 100644 --- a/src/node/chainstate.cpp +++ b/src/node/chainstate.cpp @@ -59,6 +59,7 @@ std::optional LoadChainstate(bool fReset, bool coins_db_in_memory, bool dash_dbs_in_memory, int8_t bls_threads, + int16_t worker_count, int64_t max_recsigs_age, std::function shutdown_requested, std::function coins_error_cb) @@ -84,7 +85,8 @@ std::optional LoadChainstate(bool fReset, DashChainstateSetup(chainman, govman, mn_metaman, mn_sync, sporkman, chainlocks, chain_helper, dmnman, *evodb, llmq_ctx, mempool, data_dir, dash_dbs_in_memory, - /*llmq_dbs_wipe=*/fReset || fReindexChainState, bls_threads, max_recsigs_age, consensus_params); + /*llmq_dbs_wipe=*/fReset || fReindexChainState, bls_threads, worker_count, + max_recsigs_age, consensus_params); if (fReset) { pblocktree->WriteReindexing(true); @@ -219,6 +221,7 @@ void DashChainstateSetup(ChainstateManager& chainman, bool llmq_dbs_in_memory, bool llmq_dbs_wipe, int8_t bls_threads, + int16_t worker_count, int64_t max_recsigs_age, const Consensus::Params& consensus_params) { @@ -229,7 +232,7 @@ void DashChainstateSetup(ChainstateManager& chainman, llmq_ctx.reset(); llmq_ctx = std::make_unique(*dmnman, evodb, sporkman, chainlocks, *mempool, chainman, mn_sync, util::DbWrapperParams{.path = data_dir, .memory = llmq_dbs_in_memory, .wipe = llmq_dbs_wipe}, - bls_threads, max_recsigs_age); + bls_threads, worker_count, max_recsigs_age); mempool->ConnectManagers(dmnman.get(), llmq_ctx->isman.get()); chain_helper.reset(); chain_helper = std::make_unique(evodb, *dmnman, govman, *(llmq_ctx->isman), *(llmq_ctx->quorum_block_processor), diff --git a/src/node/chainstate.h b/src/node/chainstate.h index 66b78067cc1d..09adc7cdf214 100644 --- a/src/node/chainstate.h +++ b/src/node/chainstate.h @@ -102,6 +102,7 @@ std::optional LoadChainstate(bool fReset, bool coins_db_in_memory, bool dash_dbs_in_memory, int8_t bls_threads, + int16_t worker_count, int64_t max_recsigs_age, std::function shutdown_requested = nullptr, std::function coins_error_cb = nullptr); @@ -122,6 +123,7 @@ void DashChainstateSetup(ChainstateManager& chainman, bool llmq_dbs_in_memory, bool llmq_dbs_wipe, int8_t bls_threads, + int16_t worker_count, int64_t max_recsigs_age, const Consensus::Params& consensus_params); diff --git a/src/rpc/quorums.cpp b/src/rpc/quorums.cpp index f35bd6952f4b..b65223e0baab 100644 --- a/src/rpc/quorums.cpp +++ b/src/rpc/quorums.cpp @@ -298,7 +298,7 @@ static RPCHelpMan quorum_info() static RPCResult quorum_dkgstatus_help() { - auto ret = llmq::CDKGDebugStatus::GetJsonHelp(/*key=*/"", /*optional=*/false, /*inner_optional=*/true); + auto ret = llmq::CDKGDebugManager::GetJsonHelp(/*key=*/"", /*optional=*/false, /*inner_optional=*/true); auto mod_inner = ret.m_inner; mod_inner.push_back({RPCResult::Type::ARR, "quorumConnections", "Array of objects containing quorum connection information", { {RPCResult::Type::OBJ, "", "", { @@ -347,18 +347,16 @@ static RPCHelpMan quorum_dkgstatus() UniValue quorumArrConnections(UniValue::VARR); const NodeContext& node = EnsureAnyNodeContext(request.context); - const ChainstateManager& chainman = EnsureChainman(node); - const LLMQContext& llmq_ctx = EnsureLLMQContext(node); if (const auto* debugman = node.active_ctx ? node.active_ctx->dkgdbgman.get() : node.observer_ctx ? node.observer_ctx->dkgdbgman.get() : nullptr; debugman) { - llmq::CDKGDebugStatus status; - debugman->GetLocalDebugStatus(status); - ret = status.ToJson(*CHECK_NONFATAL(node.dmnman), *llmq_ctx.qsnapman, chainman, detailLevel); + ret = debugman->ToJson(detailLevel); } const CConnman& connman = EnsureConnman(node); + const ChainstateManager& chainman = EnsureChainman(node); const CBlockIndex* const pindexTip = WITH_LOCK(cs_main, return chainman.ActiveChain().Tip()); + const LLMQContext& llmq_ctx = EnsureLLMQContext(node); const int tipHeight = pindexTip->nHeight; const uint256 proTxHash = node.active_ctx ? node.active_ctx->nodeman->GetProTxHash() : uint256{}; for (const auto& type : llmq::GetEnabledQuorumTypes(chainman, pindexTip)) { @@ -1005,10 +1003,8 @@ static RPCHelpMan quorum_dkginfo() } const auto& dkgdbgman = *(node.active_ctx ? node.active_ctx->dkgdbgman.get() : node.observer_ctx->dkgdbgman.get()); - llmq::CDKGDebugStatus status; - dkgdbgman.GetLocalDebugStatus(status); UniValue ret(UniValue::VOBJ); - ret.pushKV("active_dkgs", status.sessions.size()); + ret.pushKV("active_dkgs", dkgdbgman.GetSessionCount()); const ChainstateManager& chainman = EnsureChainman(node); const int nTipHeight{WITH_LOCK(cs_main, return chainman.ActiveChain().Height())}; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 77fd1840203f..9847e4b2f913 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -146,7 +146,8 @@ void DashChainstateSetup(ChainstateManager& chainman, DashChainstateSetup(chainman, *Assert(node.govman.get()), *Assert(node.mn_metaman.get()), *Assert(node.mn_sync.get()), *Assert(node.sporkman.get()), *Assert(node.chainlocks), node.chain_helper, node.dmnman, *node.evodb, node.llmq_ctx, Assert(node.mempool.get()), node.args->GetDataDirNet(), llmq_dbs_in_memory, llmq_dbs_wipe, - llmq::DEFAULT_BLSCHECK_THREADS, llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, consensus_params); + llmq::DEFAULT_BLSCHECK_THREADS, llmq::DEFAULT_WORKER_COUNT, llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, + consensus_params); } void DashChainstateSetupClose(NodeContext& node) @@ -344,6 +345,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector