Skip to content
20 changes: 13 additions & 7 deletions cmd/es-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,24 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load storage config: %w", err)
}

emailConfig, err := email.GetEmailConfig(ctx)
if err != nil {
lg.Warn("Failed to load email config, email notifications will be disabled.", "error", err)
}
dlConfig := NewDownloaderConfig(ctx)
if emailConfig != nil {
dlConfig.EmailConfig = emailConfig
}
minerConfig, err := NewMinerConfig(ctx, client, storageConfig.L1Contract, storageConfig.Miner, lg)
if err != nil {
return nil, fmt.Errorf("failed to load miner config: %w", err)
}
if minerConfig != nil && minerConfig.EmailEnabled {
if emailConfig == nil {
return nil, fmt.Errorf("email config is required by miner but not loaded")
}
minerConfig.EmailConfig = *emailConfig
}
chainId := new(big.Int).SetUint64(ctx.GlobalUint64(flags.ChainId.Name))
lg.Info("Read chain ID of EthStorage network", "chainID", chainId)
if minerConfig != nil {
Expand Down Expand Up @@ -134,13 +147,6 @@ func NewMinerConfig(ctx *cli.Context, client *ethclient.Client, l1Contract, mine
if err != nil {
return nil, err
}
if minerConfig.EmailEnabled {
emailConfig, err := email.GetEmailConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get email config: %w", err)
}
minerConfig.EmailConfig = *emailConfig
}

cctx := context.Background()
cr := newContractReader(cctx, client, l1Contract, lg)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/archiver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (a *APIService) Start(ctx context.Context) error {
return err
}
r := mux.NewRouter()
// Deprecated
// Deprecated by Fusaka but still used by OP Stack
r.HandleFunc("/eth/v1/beacon/blob_sidecars/{id}", a.blobSidecarHandler)
// Fusaka
r.HandleFunc("/eth/v1/beacon/blobs/{id}", a.blobsHandler)
Expand Down
3 changes: 3 additions & 0 deletions ethstorage/downloader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

package downloader

import "github.com/ethstorage/go-ethstorage/ethstorage/email"

// Config groups the settings passed to the downloader.
type Config struct {
	// DownloadStart is the block from which blobs should be downloaded.
	DownloadStart int64
	// DownloadDump is the location where downloaded blobs are dumped.
	DownloadDump string
	// DownloadThreadNum is the number of threads used to download blobs
	// into the storage file.
	DownloadThreadNum int
	// EmailConfig is the optional email configuration; it is a pointer and
	// may be nil.
	EmailConfig *email.EmailConfig
}
117 changes: 85 additions & 32 deletions ethstorage/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math/big"
"os"
"path/filepath"
"slices"
"sync"
"time"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/email"
"github.com/ethstorage/go-ethstorage/ethstorage/eth"
)

Expand Down Expand Up @@ -70,10 +72,11 @@ type Downloader struct {
dlLatestReq chan struct{}
dlFinalizedReq chan struct{}

lg log.Logger
done chan struct{}
wg sync.WaitGroup
mu sync.Mutex
emailConfig *email.EmailConfig
lg log.Logger
done chan struct{}
wg sync.WaitGroup
mu sync.Mutex
}

type blob struct {
Expand Down Expand Up @@ -105,28 +108,27 @@ func NewDownloader(
db ethdb.Database,
sm *ethstorage.StorageManager,
cache BlobCache,
downloadStart int64,
downloadDump string,
minDurationForBlobsRequest uint64,
downloadThreadNum int,
downloadConfig Config,
lg log.Logger,
) *Downloader {
sm.DownloadThreadNum = downloadThreadNum
sm.DownloadThreadNum = downloadConfig.DownloadThreadNum
return &Downloader{
Cache: cache,
l1Source: l1Source,
l1Beacon: l1Beacon,
daClient: daClient,
db: db,
sm: sm,
dumpDir: downloadDump,
dumpDir: downloadConfig.DownloadDump,
minDurationForBlobsRequest: minDurationForBlobsRequest,
dlLatestReq: make(chan struct{}, 1),
dlFinalizedReq: make(chan struct{}, 1),
lg: lg,
done: make(chan struct{}),
lastDownloadBlock: downloadStart,
lastDownloadBlock: downloadConfig.DownloadStart,
downloadedBlobs: 0,
emailConfig: downloadConfig.EmailConfig,
}
}

Expand Down Expand Up @@ -386,33 +388,22 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob
)
}

var clBlobs map[common.Hash]eth.Blob
if s.l1Beacon != nil {
clBlobs, err = s.l1Beacon.DownloadBlobs(s.l1Beacon.Timestamp2Slot(elBlock.timestamp))
if err != nil {
s.lg.Error("L1 beacon download blob error", "err", err)
return nil, err
}
} else if s.daClient != nil {
var hashes []common.Hash
for _, blob := range elBlock.blobs {
hashes = append(hashes, blob.hash)
}

clBlobs, err = s.daClient.DownloadBlobs(hashes)
if err != nil {
s.lg.Error("DA client download blob error", "err", err)
return nil, err
}
} else {
return nil, fmt.Errorf("no beacon client or DA client is available")
clBlobs, err := s.downloadBlobsWithRetry(elBlock, 3)
if err != nil {
s.lg.Error("Failed to download blobs for the block after 3 attempts", "block", elBlock.number, "err", err)
// Empty CL blob will be handled later in the EL blob loop
}

for _, elBlob := range elBlock.blobs {
shard := elBlob.kvIndex.Uint64() >> s.sm.KvEntriesBits()
if !slices.Contains(s.sm.Shards(), shard) {
s.lg.Warn("Shard not initialized locally for the kvIndex, skip this blob", "kvIndex", elBlob.kvIndex.Uint64(), "shard", shard)
continue
}
clBlob, exists := clBlobs[elBlob.hash]
if !exists {
s.lg.Error("Did not find the event specified blob in the CL")

s.notifyBlobMissing(elBlock.number, elBlob.kvIndex.Uint64(), elBlob.hash)
s.lg.Crit("Did not find the event specified blob in the CL", "blockNumber", elBlock.number, "kvIndex", elBlob.kvIndex)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to call lg.Crit here, as it will cause es-node to exit:
func (l *logger) Crit(msg string, ctx ...interface{}) {
l.Write(LevelCrit, msg, ctx...)
os.Exit(1)
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is precisely what is required by the issue. It is pointless to leave the node running in this situation, and the operator should be informed as soon as possible.

}
// encode blobs so that miner can do sampling directly from cache
elBlob.data = s.sm.EncodeBlob(clBlob.Data, elBlob.hash, elBlob.kvIndex.Uint64(), s.sm.MaxKvSize())
Expand All @@ -434,6 +425,49 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob
return blobs, nil
}

// downloadBlobsWithRetry fetches the CL blobs of elBlock through downloadBlobs,
// retrying when an attempt fails.
//
// Up to maxAttempts attempts are made, with a 3 second pause between
// consecutive attempts (no pause follows the final one). The result of the
// first successful attempt is returned; if every attempt fails, the error of
// the last attempt is returned. maxAttempts must be at least 1, otherwise an
// error is returned without attempting any download.
func (s *Downloader) downloadBlobsWithRetry(elBlock *blockBlobs, maxAttempts int) (map[common.Hash]eth.Blob, error) {
	if maxAttempts < 1 {
		// Without this guard the loop below would never run and the caller
		// would get a nil map together with a nil error, which reads as a
		// successful download that simply found no blobs.
		return nil, fmt.Errorf("invalid maxAttempts %d: must be at least 1", maxAttempts)
	}

	const retryInterval = 3 * time.Second

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		clBlobs, err := s.downloadBlobs(elBlock)
		if err == nil {
			return clBlobs, nil
		}
		lastErr = err
		// Only wait when another attempt is still going to be made.
		if attempt < maxAttempts {
			time.Sleep(retryInterval)
		}
	}
	return nil, lastErr
}

// downloadBlobs fetches the blobs for elBlock from whichever blob source is
// configured on the downloader.
//
// The L1 beacon client takes precedence: when it is set, blobs are requested
// for the slot derived from the block timestamp. Otherwise, when a DA client
// is set, blobs are requested by the hashes listed in elBlock.blobs. If
// neither client is set an error is returned. Download failures are logged
// with the block number and returned unchanged to the caller.
func (s *Downloader) downloadBlobs(elBlock *blockBlobs) (map[common.Hash]eth.Blob, error) {
	switch {
	case s.l1Beacon != nil:
		slot := s.l1Beacon.Timestamp2Slot(elBlock.timestamp)
		clBlobs, err := s.l1Beacon.DownloadBlobs(slot)
		if err != nil {
			s.lg.Error("L1 beacon download blob error", "block", elBlock.number, "slot", slot, "err", err)
			return nil, err
		}
		return clBlobs, nil

	case s.daClient != nil:
		hashes := make([]common.Hash, 0, len(elBlock.blobs))
		for _, b := range elBlock.blobs {
			hashes = append(hashes, b.hash)
		}

		clBlobs, err := s.daClient.DownloadBlobs(hashes)
		if err != nil {
			// Include the block number, as the beacon branch does, so a
			// failed attempt can be tied to a specific block.
			s.lg.Error("DA client download blob error", "block", elBlock.number, "err", err)
			return nil, err
		}
		return clBlobs, nil

	default:
		return nil, fmt.Errorf("no beacon client or DA client is available")
	}
}

func (s *Downloader) dumpBlobsIfNeeded(blobs []blob) {
if s.dumpDir != "" {
for _, blob := range blobs {
Expand Down Expand Up @@ -484,3 +518,22 @@ func (s *Downloader) eventsToBlocks(events []types.Log) ([]*blockBlobs, error) {

return blocks, nil
}

// notifyBlobMissing sends an email reporting that a blob referenced by an EL
// event could not be found in the consensus layer. The message carries the
// block number, kvIndex and blob hash of the event. It does nothing when the
// downloader has no email configuration.
func (s *Downloader) notifyBlobMissing(blockNumber uint64, kvIndex uint64, hash common.Hash) {
	cfg := s.emailConfig
	if cfg == nil {
		return
	}

	const subject = "🛑 Fatal Error from es-node: Downloader Failed to Locate Blob in CL"

	// The whole body is rendered in one pass; the text is the same as
	// building it piece by piece.
	body := fmt.Sprintf(
		"The downloader couldn't locate the specified blob in the consensus layer. The node is stopped pending resolution. "+
			"Details from the EL event: \n"+
			" - blockNumber: %d\n"+
			" - kvIndex: %d\n"+
			" - hash: %s\n"+
			"This may indicate a potential issue with blob availability on the consensus layer. \n",
		blockNumber, kvIndex, hash.Hex(),
	)

	email.SendEmail(subject, body, *cfg, s.lg)
}
8 changes: 4 additions & 4 deletions ethstorage/eth/beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,25 @@ func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error)
}
resp, err := http.Get(beaconUrl)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query beacon blobs with url %s: %w", beaconUrl, err)
}
defer resp.Body.Close()

var blobsResp blobs.BeaconBlobs
if err := json.NewDecoder(resp.Body).Decode(&blobsResp); err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode beacon blobs response from url %s: %w", beaconUrl, err)
}

res := map[common.Hash]Blob{}
for _, beaconBlob := range blobsResp.Data {
// decode hex string to bytes
asciiBytes, err := hex.DecodeString(beaconBlob[2:])
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode beacon blob hex string %s: %w", beaconBlob, err)
}
hash, err := blobs.BlobToVersionedHash(asciiBytes)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to compute versioned hash for blob: %w", err)
}
res[hash] = Blob{VersionedHash: hash, Data: asciiBytes}
}
Expand Down
4 changes: 1 addition & 3 deletions ethstorage/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,8 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error {
n.db,
n.storageManager,
n.blobCache,
cfg.Downloader.DownloadStart,
cfg.Downloader.DownloadDump,
cfg.L1.L1MinDurationForBlobsRequest,
cfg.Downloader.DownloadThreadNum,
cfg.Downloader,
n.lg,
)
return nil
Expand Down
Loading