diff --git a/cmd/es-node/config.go b/cmd/es-node/config.go index 9c778939..09ecfc55 100644 --- a/cmd/es-node/config.go +++ b/cmd/es-node/config.go @@ -54,11 +54,24 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) { return nil, fmt.Errorf("failed to load storage config: %w", err) } + emailConfig, err := email.GetEmailConfig(ctx) + if err != nil { + lg.Warn("Failed to load email config, email notifications will be disabled.", "error", err) + } dlConfig := NewDownloaderConfig(ctx) + if emailConfig != nil { + dlConfig.EmailConfig = emailConfig + } minerConfig, err := NewMinerConfig(ctx, client, storageConfig.L1Contract, storageConfig.Miner, lg) if err != nil { return nil, fmt.Errorf("failed to load miner config: %w", err) } + if minerConfig != nil && minerConfig.EmailEnabled { + if emailConfig == nil { + return nil, fmt.Errorf("email config is required by miner but not loaded") + } + minerConfig.EmailConfig = *emailConfig + } chainId := new(big.Int).SetUint64(ctx.GlobalUint64(flags.ChainId.Name)) lg.Info("Read chain ID of EthStorage network", "chainID", chainId) if minerConfig != nil { @@ -134,13 +147,6 @@ func NewMinerConfig(ctx *cli.Context, client *ethclient.Client, l1Contract, mine if err != nil { return nil, err } - if minerConfig.EmailEnabled { - emailConfig, err := email.GetEmailConfig(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get email config: %w", err) - } - minerConfig.EmailConfig = *emailConfig - } cctx := context.Background() cr := newContractReader(cctx, client, l1Contract, lg) diff --git a/ethstorage/archiver/service.go b/ethstorage/archiver/service.go index 6d702ef5..3462d71c 100644 --- a/ethstorage/archiver/service.go +++ b/ethstorage/archiver/service.go @@ -119,7 +119,7 @@ func (a *APIService) Start(ctx context.Context) error { return err } r := mux.NewRouter() - // Deprecated + // Deprecated by Fusaka but still used by OP Stack r.HandleFunc("/eth/v1/beacon/blob_sidecars/{id}", a.blobSidecarHandler) // Fusaka r.HandleFunc("/eth/v1/beacon/blobs/{id}", a.blobsHandler) diff --git a/ethstorage/downloader/config.go b/ethstorage/downloader/config.go index b094f199..4ce62f44 100644 --- a/ethstorage/downloader/config.go +++ b/ethstorage/downloader/config.go @@ -3,8 +3,11 @@ package downloader +import "github.com/ethstorage/go-ethstorage/ethstorage/email" + type Config struct { DownloadStart int64 // which block should we download the blobs from DownloadDump string // where to dump the download blobs DownloadThreadNum int // how many threads that will be used to download the blobs into storage file + EmailConfig *email.EmailConfig } diff --git a/ethstorage/downloader/downloader.go b/ethstorage/downloader/downloader.go index f93ff1ee..39e37808 100644 --- a/ethstorage/downloader/downloader.go +++ b/ethstorage/downloader/downloader.go @@ -12,6 +12,7 @@ import ( "math/big" "os" "path/filepath" + "slices" "sync" "time" @@ -22,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethstorage/go-ethstorage/ethstorage" + "github.com/ethstorage/go-ethstorage/ethstorage/email" "github.com/ethstorage/go-ethstorage/ethstorage/eth" ) @@ -70,10 +72,11 @@ type Downloader struct { dlLatestReq chan struct{} dlFinalizedReq chan struct{} - lg log.Logger - done chan struct{} - wg sync.WaitGroup - mu sync.Mutex + emailConfig *email.EmailConfig + lg log.Logger + done chan struct{} + wg sync.WaitGroup + mu sync.Mutex } type blob struct { @@ -105,13 +108,11 @@ func NewDownloader( db ethdb.Database, sm *ethstorage.StorageManager, cache BlobCache, - downloadStart int64, - downloadDump string, minDurationForBlobsRequest uint64, - downloadThreadNum int, + downloadConfig Config, lg log.Logger, ) *Downloader { - sm.DownloadThreadNum = downloadThreadNum + sm.DownloadThreadNum = downloadConfig.DownloadThreadNum return &Downloader{ Cache: cache, l1Source: l1Source, @@ -119,14 +120,15 @@ func NewDownloader( daClient: daClient, db: db, sm: sm, - dumpDir: downloadDump, + dumpDir: downloadConfig.DownloadDump, minDurationForBlobsRequest: minDurationForBlobsRequest, dlLatestReq: make(chan struct{}, 1), dlFinalizedReq: make(chan struct{}, 1), lg: lg, done: make(chan struct{}), - lastDownloadBlock: downloadStart, + lastDownloadBlock: downloadConfig.DownloadStart, downloadedBlobs: 0, + emailConfig: downloadConfig.EmailConfig, } } @@ -386,33 +388,22 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob ) } - var clBlobs map[common.Hash]eth.Blob - if s.l1Beacon != nil { - clBlobs, err = s.l1Beacon.DownloadBlobs(s.l1Beacon.Timestamp2Slot(elBlock.timestamp)) - if err != nil { - s.lg.Error("L1 beacon download blob error", "err", err) - return nil, err - } - } else if s.daClient != nil { - var hashes []common.Hash - for _, blob := range elBlock.blobs { - hashes = append(hashes, blob.hash) - } - - clBlobs, err = s.daClient.DownloadBlobs(hashes) - if err != nil { - s.lg.Error("DA client download blob error", "err", err) - return nil, err - } - } else { - return nil, fmt.Errorf("no beacon client or DA client is available") + clBlobs, err := s.downloadBlobsWithRetry(elBlock, 3) + if err != nil { + s.lg.Error("Failed to download blobs for the block after 3 attempts", "block", elBlock.number, "err", err) + // Empty CL blob will be handled later in the EL blob loop } for _, elBlob := range elBlock.blobs { + shard := elBlob.kvIndex.Uint64() >> s.sm.KvEntriesBits() + if !slices.Contains(s.sm.Shards(), shard) { + s.lg.Warn("Shard not initialized locally for the kvIndex, skip this blob", "kvIndex", elBlob.kvIndex.Uint64(), "shard", shard) + continue + } clBlob, exists := clBlobs[elBlob.hash] if !exists { - s.lg.Error("Did not find the event specified blob in the CL") - + s.notifyBlobMissing(elBlock.number, elBlob.kvIndex.Uint64(), elBlob.hash) + s.lg.Crit("Did not find the event specified blob in the CL", "blockNumber", elBlock.number, "kvIndex", elBlob.kvIndex) } // encode blobs so that miner can do sampling directly from cache elBlob.data = s.sm.EncodeBlob(clBlob.Data, elBlob.hash, elBlob.kvIndex.Uint64(), s.sm.MaxKvSize()) @@ -434,6 +425,49 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob return blobs, nil } +func (s *Downloader) downloadBlobsWithRetry(elBlock *blockBlobs, maxAttempts int) (map[common.Hash]eth.Blob, error) { + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + clBlobs, err := s.downloadBlobs(elBlock) + if err == nil { + return clBlobs, nil + } + lastErr = err + if attempt < maxAttempts { + time.Sleep(3 * time.Second) + } + } + return nil, lastErr +} + +func (s *Downloader) downloadBlobs(elBlock *blockBlobs) (map[common.Hash]eth.Blob, error) { + if s.l1Beacon != nil { + slot := s.l1Beacon.Timestamp2Slot(elBlock.timestamp) + clBlobs, err := s.l1Beacon.DownloadBlobs(slot) + if err != nil { + s.lg.Error("L1 beacon download blob error", "block", elBlock.number, "slot", slot, "err", err) + return nil, err + } + return clBlobs, nil + } + + if s.daClient != nil { + hashes := make([]common.Hash, 0, len(elBlock.blobs)) + for _, b := range elBlock.blobs { + hashes = append(hashes, b.hash) + } + + clBlobs, err := s.daClient.DownloadBlobs(hashes) + if err != nil { + s.lg.Error("DA client download blob error", "err", err) + return nil, err + } + return clBlobs, nil + } + + return nil, fmt.Errorf("no beacon client or DA client is available") +} + func (s *Downloader) dumpBlobsIfNeeded(blobs []blob) { if s.dumpDir != "" { for _, blob := range blobs { @@ -484,3 +518,22 @@ func (s *Downloader) eventsToBlocks(events []types.Log) ([]*blockBlobs, error) { return blocks, nil } + +func (s *Downloader) notifyBlobMissing(blockNumber uint64, kvIndex uint64, hash common.Hash) { + if s.emailConfig == nil { + return + } + + msg := "The downloader couldn't locate the specified blob in the consensus layer. The node is stopped pending resolution. " + msg += "Details from the EL event: \n" + msg += fmt.Sprintf(" - blockNumber: %d\n", blockNumber) + msg += fmt.Sprintf(" - kvIndex: %d\n", kvIndex) + msg += fmt.Sprintf(" - hash: %s\n", hash.Hex()) + msg += "This may indicate a potential issue with blob availability on the consensus layer. \n" + email.SendEmail( + "🛑 Fatal Error from es-node: Downloader Failed to Locate Blob in CL", + msg, + *s.emailConfig, + s.lg, + ) +} diff --git a/ethstorage/eth/beacon_client.go b/ethstorage/eth/beacon_client.go index 880be075..46f74445 100644 --- a/ethstorage/eth/beacon_client.go +++ b/ethstorage/eth/beacon_client.go @@ -77,13 +77,13 @@ func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error) } resp, err := http.Get(beaconUrl) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query beacon blobs with url %s: %w", beaconUrl, err) } defer resp.Body.Close() var blobsResp blobs.BeaconBlobs if err := json.NewDecoder(resp.Body).Decode(&blobsResp); err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode beacon blobs response from url %s: %w", beaconUrl, err) } res := map[common.Hash]Blob{} @@ -91,11 +91,11 @@ func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error) // decode hex string to bytes asciiBytes, err := hex.DecodeString(beaconBlob[2:]) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode beacon blob hex string %s: %w", beaconBlob, err) } hash, err := blobs.BlobToVersionedHash(asciiBytes) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to compute versioned hash for blob: %w", err) } res[hash] = Blob{VersionedHash: hash, Data: asciiBytes} } diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index 4f20bd49..180ba628 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -139,10 +139,8 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error { n.db, n.storageManager, n.blobCache, - cfg.Downloader.DownloadStart, - cfg.Downloader.DownloadDump, cfg.L1.L1MinDurationForBlobsRequest, - cfg.Downloader.DownloadThreadNum, + cfg.Downloader, n.lg, ) return nil