diff --git a/cmd/main/node.go b/cmd/main/node.go index a1d22a3..e4c81bd 100644 --- a/cmd/main/node.go +++ b/cmd/main/node.go @@ -42,6 +42,7 @@ var ( timeoutCommitMs *uint64 consensusSyncMs *uint64 proposerRepetition *uint64 + maxPeerCount *int ) var NodeCmd = &cobra.Command{ @@ -73,6 +74,7 @@ func init() { consensusSyncMs = NodeCmd.Flags().Uint64("consensusSyncMs", 500, "Consensus sync in ms") proposerRepetition = NodeCmd.Flags().Uint64("proposerRepetition", 8, "proposer repetition") + maxPeerCount = NodeCmd.Flags().Int("maxPeerCount", 10, "The maximum number of peers connected") } func runNode(cmd *cobra.Command, args []string) { @@ -191,7 +193,7 @@ func runNode(cmd *cobra.Command, args []string) { bs := NewDefaultBlockStore(db) executor := consensus.NewDefaultBlockExecutor(db) - p2pserver, err := p2p.NewP2PServer(rootCtx, bs, obsvC, sendC, p2pPriv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel) + p2pserver, err := p2p.NewP2PServer(rootCtx, bs, obsvC, sendC, p2pPriv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel, *maxPeerCount) go func() { p2pserver.Run(rootCtx) diff --git a/cmd/main/run_4v_mpc.sh b/cmd/main/run_4v_mpc.sh new file mode 100644 index 0000000..8bf8d66 --- /dev/null +++ b/cmd/main/run_4v_mpc.sh @@ -0,0 +1,20 @@ +#!/bin/bash +NODE_ID=${1} +MAX_PEER_COUNT=${2} +rm -rf ./datadir_4v_n${NODE_ID} && \ + +port=$((8999 + ${NODE_ID})) + +./main node \ + --datadir ./datadir_4v_n${NODE_ID} \ + --valKey=val${NODE_ID}.key \ + --nodeKey=node${NODE_ID}.key \ + --genesisTimeMs 1 \ + --validatorSet=0x564D965830b6081506c6de0625F089F751Af134a \ + --validatorSet=0x0a700e9B59d92259C68E50a978c851214916BE52 \ + --validatorSet=0x6cB42599986aF998cF4223D7C839E2dbDeC5f40f \ + --validatorSet=0x817959ec9f31a998Aa7614bCa158f22a9Cf5AE48 \ + --bootstrap /ip4/127.0.0.1/udp/8999/quic/p2p/12D3KooWEZ94qZgJgUNYiLwXahknkniYgozxw5eocijZJkew6Mj5,/ip4/127.0.0.1/udp/9000/quic/p2p/12D3KooWRAPv94qoUn8dAa3NQpZGKjaBcdiaqCETrcuyo2rT2ZvV \ + --port ${port} \ + --maxPeerCount ${MAX_PEER_COUNT} + $@ diff --git a/p2p/conngater.go b/p2p/conngater.go new file mode 100644 index 0000000..7670b4a --- /dev/null +++ b/p2p/conngater.go @@ -0,0 +1,64 @@ +package p2p + +import ( + "fmt" + "github.com/ethereum/go-ethereum/log" + "github.com/libp2p/go-libp2p-core/control" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + "sync" +) + +type connGater struct { + sync.RWMutex + h host.Host + MaxPeerCount int +} + +func (cg *connGater) getConnectedPeerCount() int { + c := 0 + if cg.h != nil { + for index, p := range cg.h.Network().Peers() { + if cg.h.Network().Connectedness(p) == network.Connected { + log.Debug("Connected Peer List", "index", index, "peer id", p.String()) + c++ + } + } + } + return c +} + +func (cg *connGater) isPeerAtLimit() bool { + cg.RLock() + defer cg.RUnlock() + if cg.getConnectedPeerCount() >= cg.MaxPeerCount { + log.Info(fmt.Sprintf("PeerCount %d exceeds the MaxPeerCount %d.\r\n", len(cg.h.Network().Peers()), cg.MaxPeerCount)) + return true + } + return false +} + +func (cg *connGater) InterceptPeerDial(p peer.ID) (allow bool) { + log.Debug("InterceptPeerDial", "peer id", p) + return cg.h.Network().Connectedness(p) == network.Connected || !cg.isPeerAtLimit() +} + +func (cg *connGater) InterceptAddrDial(p peer.ID, a ma.Multiaddr) (allow bool) { + log.Debug("InterceptAddrDial", "peer id", p) + return cg.h.Network().Connectedness(p) == network.Connected || !cg.isPeerAtLimit() +} + +func (cg *connGater) InterceptAccept(cma network.ConnMultiaddrs) (allow bool) { + return true +} + +func (cg *connGater) InterceptSecured(dir network.Direction, p peer.ID, cma network.ConnMultiaddrs) (allow bool) { + log.Debug("InterceptSecured", "peer id", p) + return cg.h.Network().Connectedness(p) == network.Connected || !cg.isPeerAtLimit() +} + +func (cg *connGater) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { + return true, 0 +} diff --git a/p2p/p2p.go b/p2p/p2p.go index d70ea00..d1ecf8a 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -10,25 +10,23 @@ import ( "time" "github.com/QuarkChain/go-minimal-pbft/consensus" - "github.com/prometheus/client_golang/prometheus" - - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-quic-transport/integrationtests/stream" - "github.com/multiformats/go-multiaddr" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pquic "github.com/libp2p/go-libp2p-quic-transport" + "github.com/libp2p/go-libp2p-quic-transport/integrationtests/stream" libp2ptls "github.com/libp2p/go-libp2p-tls" + "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -318,7 +316,10 @@ func NewP2PServer( bootstrapPeers string, nodeName string, rootCtxCancel context.CancelFunc, + maxPeerCount int, ) (*Server, error) { + cg := &connGater{h: nil, MaxPeerCount: maxPeerCount} + h, err := libp2p.New(ctx, // Use the keypair we generated libp2p.Identity(priv), @@ -331,6 +332,8 @@ func NewP2PServer( fmt.Sprintf("/ip6/::/udp/%d/quic", port), ), + libp2p.ConnectionGater(cg), + // Enable TLS security as the only security protocol. libp2p.Security(libp2ptls.ID, libp2ptls.New), @@ -340,9 +343,9 @@ func NewP2PServer( // Let's prevent our peer from having too many // connections by attaching a connection manager. libp2p.ConnectionManager(connmgr.NewConnManager( - 100, // Lowwater - 400, // HighWater, - time.Minute, // GracePeriod + int(maxPeerCount*3/4), // Lowwater + maxPeerCount, // HighWater, + time.Minute, // GracePeriod )), // Let this host use the DHT to find other hosts @@ -362,6 +365,7 @@ func NewP2PServer( log.Info("Connecting to bootstrap peers", "bootstrap_peers", bootstrapPeers) + cg.h = h // Add our own bootstrap nodes // Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill