Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions cmd/cel-shed/latency_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package main

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"

libshare "github.com/celestiaorg/go-square/v3/share"

"github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/libs/utils"
)

var runLatencyMonitorCmd = &cobra.Command{
Use: "latency-monitor <ip> <port> <token> <metrics_endpoint>",
Short: "Runs blob module submission + retrieval latency monitor for celestia-node..",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 4 {
return fmt.Errorf("must provide core.ip, core.port, auth token and metrics.endpoint, only "+
"got %d arguments", len(args))
}

cli, err := buildClient(cmd.Context(), args[0], args[1], args[2])
if err != nil {
return err
}

info, err := cli.P2P.Info(cmd.Context())
if err != nil {
return fmt.Errorf("failed to get node info: %w", err)
}
fmt.Printf("\nConnected to node with ID: %s\n", info.ID)

metricsEndpoint := args[3]
shutdown, latencyMetrics, err := initializeMetrics(cmd.Context(), metricsEndpoint, info.ID.String())
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}
defer func() {
if err := shutdown(cmd.Context()); err != nil {
fmt.Printf("Error shutting down metrics: %v\n", err)
}
}()

fmt.Printf("Metrics initialized, exporting to: %s\n", metricsEndpoint)

runLatencyMonitor(cmd.Context(), cli, latencyMetrics)
return nil
},
}

func buildClient(ctx context.Context, ip, port, token string) (*client.Client, error) {
addr := fmt.Sprintf("http://%s:%s", ip, port)

return client.NewClient(ctx, addr, token)
}

// latencyMetrics holds the metrics for tracking submission and retrieval latency
type latencyMetrics struct {
submitLatency metric.Float64Histogram
retrieveLatency metric.Float64Histogram
submitFailedTotal metric.Int64Counter
retrieveFailedTotal metric.Int64Counter
}

// initializeMetrics sets up the OTLP metrics exporter and creates latency metrics
func initializeMetrics(
ctx context.Context,
endpoint,
peerID string,
) (func(context.Context) error, *latencyMetrics, error) {
cfg := utils.MetricProviderConfig{
ServiceNamespace: "latency-monitor",
ServiceName: "cel-shed",
ServiceInstanceID: peerID,
Interval: 10 * time.Second,
OTLPOptions: []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(endpoint)},
}

provider, err := utils.NewMetricProvider(ctx, cfg)
if err != nil {
return nil, nil, err
}

otel.SetMeterProvider(provider)

// Create meter and metrics
meter := otel.Meter("latency-monitor")

submitLatency, err := meter.Float64Histogram(
"blob_submit_latency_seconds",
metric.WithDescription("Latency of blob submission operations in s"),
metric.WithUnit("s"),
)
if err != nil {
return nil, nil, fmt.Errorf("creating submit latency histogram: %w", err)
}

retrieveLatency, err := meter.Float64Histogram(
"blob_retrieve_latency_seconds",
metric.WithDescription("Latency of blob retrieval operations in s"),
metric.WithUnit("s"),
)
if err != nil {
return nil, nil, fmt.Errorf("creating retrieve latency histogram: %w", err)
}

submitTotal, err := meter.Int64Counter(
"blob_failed_submit_total",
metric.WithDescription("Total number of blob failed submission attempts"),
)
if err != nil {
return nil, nil, fmt.Errorf("creating submit total counter: %w", err)
}

retrieveTotal, err := meter.Int64Counter(
"blob_failed_retrieve_total",
metric.WithDescription("Total number of blob failed retrieval attempts"),
)
if err != nil {
return nil, nil, fmt.Errorf("creating retrieve total counter: %w", err)
}

metrics := &latencyMetrics{
submitLatency: submitLatency,
retrieveLatency: retrieveLatency,
submitFailedTotal: submitTotal,
retrieveFailedTotal: retrieveTotal,
}

shutdown := func(ctx context.Context) error {
return provider.Shutdown(ctx)
}

return shutdown, metrics, nil
}

func runLatencyMonitor(ctx context.Context, cli *client.Client, metrics *latencyMetrics) {
ns := libshare.RandomBlobNamespace()
fmt.Println("\nUsing namespace: ", ns.String())

fmt.Println("\nGenerating blobs...")

const numBlobs = 10
libBlobs := make([]*libshare.Blob, numBlobs)
for i := 0; i < numBlobs; i++ {
generated, err := libshare.GenerateV0Blobs([]int{16}, true) // TODO @renaynay: variable size
if err != nil {
panic(fmt.Sprintf("failed to generate blob %d: %v", i, err))
}

libBlobs[i] = generated[0]
fmt.Printf("Generated blob %d, actual data length: %d bytes\n", i, len(generated[0].Data()))
}

blobs, err := blob.ToNodeBlobs(libBlobs...)
if err != nil {
panic(fmt.Sprintf("failed to convert blobs: %s", err.Error()))
}

for {
select {
case <-ctx.Done():
return
default:
randBlob := blobs[rand.Intn(len(blobs))] //nolint:gosec

fmt.Printf("\nSubmitting blob of size %d bytes...", len(randBlob.Data()))

operationCtx, cancel := context.WithTimeout(ctx, time.Minute)
start := time.Now()
height, err := cli.Blob.Submit(operationCtx, []*blob.Blob{randBlob}, &blob.SubmitOptions{})
submitDuration := time.Since(start)
cancel()
if err != nil {
fmt.Println("failed to submit blob: ", err)
metrics.submitFailedTotal.Add(ctx, 1)
continue
}

fmt.Printf("\nSubmitted blob of size %d bytes at height %d\n", len(randBlob.Data()), height)
fmt.Printf("Submission latency: %f s\n", submitDuration.Seconds())
metrics.submitLatency.Record(ctx, submitDuration.Seconds())

operationCtx, cancel = context.WithTimeout(ctx, time.Minute)
start = time.Now()
_, err = cli.Blob.Get(operationCtx, height, randBlob.Namespace(), randBlob.Commitment)
retrieveDuration := time.Since(start)
cancel()

// Record retrieve metrics
if err != nil {
fmt.Println("failed to retrieve blob: ", err)
metrics.retrieveFailedTotal.Add(ctx, 1)
continue
}

fmt.Printf("\nGot blob of size %d bytes from height %d\n", len(randBlob.Data()), height)
fmt.Printf("Retrieval latency: %f s\n", retrieveDuration.Seconds())
metrics.retrieveLatency.Record(ctx, retrieveDuration.Seconds())
}
}
}
2 changes: 1 addition & 1 deletion cmd/cel-shed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func init() {
rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd, shwapCmd, squareCmd)
rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd, shwapCmd, squareCmd, runLatencyMonitorCmd)
}

var rootCmd = &cobra.Command{
Expand Down
5 changes: 1 addition & 4 deletions cmd/flags_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
}

if enableMetrics {
opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
otlpmetrichttp.WithEndpoint(cmd.Flag(metricsEndpointFlag).Value.String()),
}
opts := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(cmd.Flag(metricsEndpointFlag).Value.String())}
if ok, err := cmd.Flags().GetBool(metricsTlS); err != nil {
panic(err)
} else if !ok {
Expand Down
57 changes: 57 additions & 0 deletions libs/utils/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package utils

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
)

// MetricProviderConfig holds configuration for creating a metric provider
type MetricProviderConfig struct {
// ServiceNamespace is the service namespace (e.g., network name, "latency-monitor")
ServiceNamespace string
// ServiceName is the service name (e.g., node type, "cel-shed")
ServiceName string
// ServiceInstanceID is the unique instance identifier (e.g., peer ID)
ServiceInstanceID string
// Interval is the interval at which metrics are collected and exported (defaults to 10s)
Interval time.Duration
// OTLPOptions are OTLP HTTP options (e.g., endpoint, headers, TLS config)
OTLPOptions []otlpmetrichttp.Option
}

// NewMetricProvider creates a new OTLP metric provider with the given configuration
func NewMetricProvider(ctx context.Context, cfg MetricProviderConfig) (*sdk.MeterProvider, error) {
opts := []otlpmetrichttp.Option{otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)}
opts = append(opts, cfg.OTLPOptions...)

exp, err := otlpmetrichttp.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("creating OTLP metric exporter: %w", err)
}

interval := cfg.Interval
if interval == 0 {
interval = 10 * time.Second
}

provider := sdk.NewMeterProvider(
sdk.WithReader(
sdk.NewPeriodicReader(exp,
sdk.WithTimeout(interval),
sdk.WithInterval(interval))),
sdk.WithResource(
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNamespaceKey.String(cfg.ServiceNamespace),
semconv.ServiceNameKey.String(cfg.ServiceName),
semconv.ServiceInstanceIDKey.String(cfg.ServiceInstanceID),
)))

return provider, nil
}
28 changes: 10 additions & 18 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
Expand All @@ -25,6 +24,7 @@ import (

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
modcore "github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
modhead "github.com/celestiaorg/celestia-node/nodebuilder/header"
Expand Down Expand Up @@ -211,27 +211,19 @@ func initializeMetrics(
network p2p.Network,
opts []otlpmetrichttp.Option,
) error {
exp, err := otlpmetrichttp.New(ctx, opts...)
cfg := utils.MetricProviderConfig{
ServiceNamespace: network.String(),
ServiceName: nodeType.String(),
ServiceInstanceID: peerID.String(),
Interval: defaultMetricsCollectInterval,
OTLPOptions: opts,
}

provider, err := utils.NewMetricProvider(ctx, cfg)
if err != nil {
return err
}

provider := sdk.NewMeterProvider(
sdk.WithReader(
sdk.NewPeriodicReader(exp,
sdk.WithTimeout(defaultMetricsCollectInterval),
sdk.WithInterval(defaultMetricsCollectInterval))),
sdk.WithResource(
resource.NewWithAttributes(
semconv.SchemaURL,
// ServiceNamespaceKey and ServiceNameKey will be concatenated into single attribute with key:
// "job" and value: "%service.namespace%/%service.name%"
semconv.ServiceNamespaceKey.String(network.String()),
semconv.ServiceNameKey.String(nodeType.String()),
// ServiceInstanceIDKey will be exported with key: "instance"
semconv.ServiceInstanceIDKey.String(peerID.String()),
)))

err = runtime.Start(
runtime.WithMinimumReadMemStatsInterval(defaultMetricsCollectInterval),
runtime.WithMeterProvider(provider))
Expand Down
3 changes: 3 additions & 0 deletions nodebuilder/tests/tastora/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,10 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
github.com/moby/moby v27.5.1+incompatible // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
nhooyr.io/websocket v1.8.17 // indirect
pgregory.net/rapid v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions nodebuilder/tests/tastora/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 h1:CIHWikMsN3wO+wq1Tp5VGdVRTcON+DmOJSfDjXypKOc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0/go.mod h1:TNupZ6cxqyFEpLXAZW7On+mLFL0/g0TE3unIYL91xWc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 h1:bDMKF3RUSxshZ5OjOTi8rsHGaPKsAt76FaqgvIUySLc=
Expand Down
Loading