From 06ea7de2f84620450eb0ef6f5516049f39e0f4a9 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Fri, 28 Nov 2025 09:55:04 -0600
Subject: [PATCH 1/5] latency monitor

---
 cmd/cel-shed/latency_monitor.go | 223 ++++++++++++++++++++++++++++++++
 cmd/cel-shed/main.go            |   2 +-
 2 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 cmd/cel-shed/latency_monitor.go

diff --git a/cmd/cel-shed/latency_monitor.go b/cmd/cel-shed/latency_monitor.go
new file mode 100644
index 000000000..772e73dab
--- /dev/null
+++ b/cmd/cel-shed/latency_monitor.go
@@ -0,0 +1,223 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+
+	libshare "github.com/celestiaorg/go-square/v3/share"
+	"github.com/spf13/cobra"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
+	"go.opentelemetry.io/otel/metric"
+	sdk "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/resource"
+	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
+
+	"github.com/celestiaorg/celestia-node/api/rpc/client"
+	"github.com/celestiaorg/celestia-node/blob"
+)
+
+var runLatencyMonitorCmd = &cobra.Command{
+	Use:   "latency-monitor <core.ip> <core.port> <auth.token> <metrics.endpoint>",
+	Short: "Runs blob module submission + retrieval latency monitor for celestia-node.",
+	RunE: func(cmd *cobra.Command, args []string) error {
+		if len(args) != 4 {
+			return fmt.Errorf("must provide core.ip, core.port, auth token and metrics.endpoint, only "+
+				"got %d arguments", len(args))
+		}
+
+		cli, err := buildClient(cmd.Context(), args[0], args[1], args[2])
+		if err != nil {
+			return err
+		}
+
+		info, err := cli.P2P.Info(cmd.Context())
+		if err != nil {
+			return fmt.Errorf("failed to get node info: %w", err)
+		}
+		fmt.Printf("\nConnected to node with ID: %s\n", info.ID)
+
+		// Initialize metrics
+		metricsEndpoint := args[3]
+		shutdown, latencyMetrics, err := initializeMetrics(cmd.Context(), metricsEndpoint, info.ID.String())
+		if err != nil {
+			return fmt.Errorf("failed to initialize metrics: %w", err)
+		}
+		defer func() {
+			if err := shutdown(cmd.Context()); err != nil {
+				fmt.Printf("Error shutting down metrics: %v\n", err)
+			}
+		}()
+
+		fmt.Printf("Metrics initialized, exporting to: %s\n", metricsEndpoint)
+
+		runLatencyMonitor(cmd.Context(), cli, latencyMetrics)
+		return nil
+	},
+}
+
+func buildClient(ctx context.Context, ip, port, token string) (*client.Client, error) {
+	addr := fmt.Sprintf("http://%s:%s", ip, port)
+
+	return client.NewClient(ctx, addr, token)
+}
+
+// latencyMetrics holds the metrics for tracking submission and retrieval latency
+type latencyMetrics struct {
+	submitLatency       metric.Float64Histogram
+	retrieveLatency     metric.Float64Histogram
+	submitFailedTotal   metric.Int64Counter
+	retrieveFailedTotal metric.Int64Counter
+}
+
+// initializeMetrics sets up the OTLP metrics exporter and creates latency metrics
+func initializeMetrics(
+	ctx context.Context,
+	endpoint,
+	peerID string,
+) (func(context.Context) error, *latencyMetrics, error) {
+	opts := []otlpmetrichttp.Option{
+		otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
+		otlpmetrichttp.WithEndpoint(endpoint),
+		// Using secure HTTPS connection by default (omit WithInsecure)
+	}
+
+	exp, err := otlpmetrichttp.New(ctx, opts...)
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating OTLP metric exporter: %w", err)
+	}
+
+	provider := sdk.NewMeterProvider(
+		sdk.WithReader(
+			sdk.NewPeriodicReader(exp,
+				sdk.WithTimeout(10*time.Second),
+				sdk.WithInterval(10*time.Second))),
+		sdk.WithResource(
+			resource.NewWithAttributes(
+				semconv.SchemaURL,
+				semconv.ServiceNamespaceKey.String("latency-monitor"),
+				semconv.ServiceNameKey.String("cel-shed"),
+				semconv.ServiceInstanceIDKey.String(peerID),
+			)))
+
+	otel.SetMeterProvider(provider)
+
+	// Create meter and metrics
+	meter := otel.Meter("latency-monitor")
+
+	submitLatency, err := meter.Float64Histogram(
+		"blob_submit_latency_seconds",
+		metric.WithDescription("Latency of blob submission operations in s"),
+		metric.WithUnit("s"),
+	)
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating submit latency histogram: %w", err)
+	}
+
+	retrieveLatency, err := meter.Float64Histogram(
+		"blob_retrieve_latency_seconds",
+		metric.WithDescription("Latency of blob retrieval operations in s"),
+		metric.WithUnit("s"),
+	)
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating retrieve latency histogram: %w", err)
+	}
+
+	submitTotal, err := meter.Int64Counter(
+		"blob_failed_submit_total",
+		metric.WithDescription("Total number of failed blob submission attempts"),
+	)
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating submit total counter: %w", err)
+	}
+
+	retrieveTotal, err := meter.Int64Counter(
+		"blob_failed_retrieve_total",
+		metric.WithDescription("Total number of failed blob retrieval attempts"),
+	)
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating retrieve total counter: %w", err)
+	}
+
+	metrics := &latencyMetrics{
+		submitLatency:       submitLatency,
+		retrieveLatency:     retrieveLatency,
+		submitFailedTotal:   submitTotal,
+		retrieveFailedTotal: retrieveTotal,
+	}
+
+	shutdown := func(ctx context.Context) error {
+		return provider.Shutdown(ctx)
+	}
+
+	return shutdown, metrics, nil
+}
+
+func runLatencyMonitor(ctx context.Context, cli *client.Client, metrics *latencyMetrics) {
+	ns := libshare.RandomBlobNamespace()
+	fmt.Println("\nUsing namespace: ", ns.String())
+
+	fmt.Println("\nGenerating blobs...")
+
+	const numBlobs = 10
+	libBlobs := make([]*libshare.Blob, numBlobs)
+	for i := 0; i < numBlobs; i++ {
+		generated, err := libshare.GenerateV0Blobs([]int{16}, true) // TODO @renaynay: variable size
+		if err != nil {
+			panic(fmt.Sprintf("failed to generate blob %d: %v", i, err))
+		}
+
+		libBlobs[i] = generated[0]
+		fmt.Printf("Generated blob %d, actual data length: %d bytes\n", i, len(generated[0].Data()))
+	}
+
+	blobs, err := blob.ToNodeBlobs(libBlobs...)
+	if err != nil {
+		panic(fmt.Sprintf("failed to convert blobs: %s", err.Error()))
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			randBlob := blobs[rand.Intn(len(blobs))] //nolint:gosec
+
+			fmt.Printf("\nSubmitting blob of size %d bytes...", len(randBlob.Data()))
+
+			operationCtx, cancel := context.WithTimeout(ctx, time.Minute)
+			start := time.Now()
+			height, err := cli.Blob.Submit(operationCtx, []*blob.Blob{randBlob}, &blob.SubmitOptions{})
+			submitDuration := time.Since(start)
+			cancel()
+			if err != nil {
+				fmt.Println("failed to submit blob: ", err)
+				metrics.submitFailedTotal.Add(ctx, 1)
+				continue
+			}
+
+			fmt.Printf("\nSubmitted blob of size %d bytes at height %d\n", len(randBlob.Data()), height)
+			fmt.Printf("Submission latency: %f s\n", submitDuration.Seconds())
+			metrics.submitLatency.Record(ctx, submitDuration.Seconds())
+
+			operationCtx, cancel = context.WithTimeout(ctx, time.Minute)
+			start = time.Now()
+			_, err = cli.Blob.Get(operationCtx, height, randBlob.Namespace(), randBlob.Commitment)
+			retrieveDuration := time.Since(start)
+			cancel()
+
+			// Record retrieve metrics
+			if err != nil {
+				fmt.Println("failed to retrieve blob: ", err)
+				metrics.retrieveFailedTotal.Add(ctx, 1)
+				continue
+			}
+
+			fmt.Printf("\nGot blob of size %d bytes from height %d\n", len(randBlob.Data()), height)
+			fmt.Printf("Retrieval latency: %f s\n", retrieveDuration.Seconds())
+			metrics.retrieveLatency.Record(ctx, retrieveDuration.Seconds())
+		}
+	}
+}
diff --git a/cmd/cel-shed/main.go b/cmd/cel-shed/main.go
index 7e1c3c816..5cc24153a 100644
--- a/cmd/cel-shed/main.go
+++ b/cmd/cel-shed/main.go
@@ -10,7 +10,7 @@ import (
 )
 
 func init() {
-	rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd, shwapCmd, squareCmd)
+	rootCmd.AddCommand(p2pCmd, headerCmd, edsStoreCmd, shwapCmd, squareCmd, runLatencyMonitorCmd)
 }
 
 var rootCmd = &cobra.Command{

From 311acdcdd0f4b2664f98152d0e646e9d9406bec2 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Mon, 1 Dec 2025 07:23:50 -0600
Subject: [PATCH 2/5] DRY metrics

---
 cmd/cel-shed/latency_monitor.go | 31 +++++------------
 libs/utils/telemetry.go         | 59 +++++++++++++++++++++++++++++++++
 nodebuilder/settings.go         | 28 ++++++----------
 3 files changed, 78 insertions(+), 40 deletions(-)
 create mode 100644 libs/utils/telemetry.go

diff --git a/cmd/cel-shed/latency_monitor.go b/cmd/cel-shed/latency_monitor.go
index 772e73dab..4f70ec58f 100644
--- a/cmd/cel-shed/latency_monitor.go
+++ b/cmd/cel-shed/latency_monitor.go
@@ -11,12 +11,10 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
 	"go.opentelemetry.io/otel/metric"
-	sdk "go.opentelemetry.io/otel/sdk/metric"
-	"go.opentelemetry.io/otel/sdk/resource"
-	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
 
 	"github.com/celestiaorg/celestia-node/api/rpc/client"
 	"github.com/celestiaorg/celestia-node/blob"
+	"github.com/celestiaorg/celestia-node/libs/utils"
 )
 
 var runLatencyMonitorCmd = &cobra.Command{
@@ -78,30 +76,19 @@ func initializeMetrics(
 	endpoint,
 	peerID string,
 ) (func(context.Context) error, *latencyMetrics, error) {
-	opts := []otlpmetrichttp.Option{
-		otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
-		otlpmetrichttp.WithEndpoint(endpoint),
-		// Using secure HTTPS connection by default (omit WithInsecure)
+	cfg := utils.MetricProviderConfig{
+		ServiceNamespace:  "latency-monitor",
+		ServiceName:       "cel-shed",
+		ServiceInstanceID: peerID,
+		Interval:          10 * time.Second,
+		OTLPOptions:       []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(endpoint)},
 	}
 
-	exp, err := otlpmetrichttp.New(ctx, opts...)
+	provider, err := utils.NewMetricProvider(ctx, cfg)
 	if err != nil {
-		return nil, nil, fmt.Errorf("creating OTLP metric exporter: %w", err)
+		return nil, nil, err
 	}
 
-	provider := sdk.NewMeterProvider(
-		sdk.WithReader(
-			sdk.NewPeriodicReader(exp,
-				sdk.WithTimeout(10*time.Second),
-				sdk.WithInterval(10*time.Second))),
-		sdk.WithResource(
-			resource.NewWithAttributes(
-				semconv.SchemaURL,
-				semconv.ServiceNamespaceKey.String("latency-monitor"),
-				semconv.ServiceNameKey.String("cel-shed"),
-				semconv.ServiceInstanceIDKey.String(peerID),
-			)))
-
 	otel.SetMeterProvider(provider)
 
 	// Create meter and metrics
diff --git a/libs/utils/telemetry.go b/libs/utils/telemetry.go
new file mode 100644
index 000000000..11dccfec6
--- /dev/null
+++ b/libs/utils/telemetry.go
@@ -0,0 +1,59 @@
+package utils
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
+	sdk "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/resource"
+	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
+)
+
+// MetricProviderConfig holds configuration for creating a metric provider
+type MetricProviderConfig struct {
+	// ServiceNamespace is the service namespace (e.g., network name, "latency-monitor")
+	ServiceNamespace string
+	// ServiceName is the service name (e.g., node type, "cel-shed")
+	ServiceName string
+	// ServiceInstanceID is the unique instance identifier (e.g., peer ID)
+	ServiceInstanceID string
+	// Interval is the interval at which metrics are collected and exported (defaults to 10s)
+	Interval time.Duration
+	// OTLPOptions are OTLP HTTP options (e.g., endpoint, headers, TLS config)
+	OTLPOptions []otlpmetrichttp.Option
+}
+
+// NewMetricProvider creates a new OTLP metric provider with the given configuration
+func NewMetricProvider(ctx context.Context, cfg MetricProviderConfig) (*sdk.MeterProvider, error) {
+	// Build OTLP options with compression enabled
+	opts := []otlpmetrichttp.Option{otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)}
+	opts = append(opts, cfg.OTLPOptions...)
+
+	exp, err := otlpmetrichttp.New(ctx, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("creating OTLP metric exporter: %w", err)
+	}
+
+	// Use default interval if not specified
+	interval := cfg.Interval
+	if interval == 0 {
+		interval = 10 * time.Second
+	}
+
+	provider := sdk.NewMeterProvider(
+		sdk.WithReader(
+			sdk.NewPeriodicReader(exp,
+				sdk.WithTimeout(interval),
+				sdk.WithInterval(interval))),
+		sdk.WithResource(
+			resource.NewWithAttributes(
+				semconv.SchemaURL,
+				semconv.ServiceNamespaceKey.String(cfg.ServiceNamespace),
+				semconv.ServiceNameKey.String(cfg.ServiceName),
+				semconv.ServiceInstanceIDKey.String(cfg.ServiceInstanceID),
+			)))
+
+	return provider, nil
+}
diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go
index cdfc0feda..174bacf65 100644
--- a/nodebuilder/settings.go
+++ b/nodebuilder/settings.go
@@ -14,7 +14,6 @@ import (
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
-	sdk "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/resource"
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
@@ -25,6 +24,7 @@ import (
 
 	"github.com/celestiaorg/celestia-node/blob"
 	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/libs/utils"
 	modcore "github.com/celestiaorg/celestia-node/nodebuilder/core"
 	"github.com/celestiaorg/celestia-node/nodebuilder/das"
 	modhead "github.com/celestiaorg/celestia-node/nodebuilder/header"
@@ -211,27 +211,19 @@ func initializeMetrics(
 	network p2p.Network,
 	opts []otlpmetrichttp.Option,
 ) error {
-	exp, err := otlpmetrichttp.New(ctx, opts...)
+	cfg := utils.MetricProviderConfig{
+		ServiceNamespace:  network.String(),
+		ServiceName:       nodeType.String(),
+		ServiceInstanceID: peerID.String(),
+		Interval:          defaultMetricsCollectInterval,
+		OTLPOptions:       opts,
+	}
+
+	provider, err := utils.NewMetricProvider(ctx, cfg)
 	if err != nil {
 		return err
 	}
 
-	provider := sdk.NewMeterProvider(
-		sdk.WithReader(
-			sdk.NewPeriodicReader(exp,
-				sdk.WithTimeout(defaultMetricsCollectInterval),
-				sdk.WithInterval(defaultMetricsCollectInterval))),
-		sdk.WithResource(
-			resource.NewWithAttributes(
-				semconv.SchemaURL,
-				// ServiceNamespaceKey and ServiceNameKey will be concatenated into single attribute with key:
-				// "job" and value: "%service.namespace%/%service.name%"
-				semconv.ServiceNamespaceKey.String(network.String()),
-				semconv.ServiceNameKey.String(nodeType.String()),
-				// ServiceInstanceIDKey will be exported with key: "instance"
-				semconv.ServiceInstanceIDKey.String(peerID.String()),
-			)))
-
 	err = runtime.Start(
 		runtime.WithMinimumReadMemStatsInterval(defaultMetricsCollectInterval),
 		runtime.WithMeterProvider(provider))

From 8a8762e4cce742234d3ace04ec92d9556601dd82 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Mon, 1 Dec 2025 09:59:57 -0600
Subject: [PATCH 3/5] cleanup

---
 cmd/cel-shed/latency_monitor.go | 1 -
 cmd/flags_misc.go               | 5 +----
 libs/utils/telemetry.go         | 2 --
 3 files changed, 1 insertion(+), 7 deletions(-)

diff --git a/cmd/cel-shed/latency_monitor.go b/cmd/cel-shed/latency_monitor.go
index 4f70ec58f..59270e4fb 100644
--- a/cmd/cel-shed/latency_monitor.go
+++ b/cmd/cel-shed/latency_monitor.go
@@ -37,7 +37,6 @@ var runLatencyMonitorCmd = &cobra.Command{
 		}
 		fmt.Printf("\nConnected to node with ID: %s\n", info.ID)
 
-		// Initialize metrics
 		metricsEndpoint := args[3]
 		shutdown, latencyMetrics, err := initializeMetrics(cmd.Context(), metricsEndpoint, info.ID.String())
 		if err != nil {
diff --git a/cmd/flags_misc.go b/cmd/flags_misc.go
index 3d5959376..4470c2c7a 100644
--- a/cmd/flags_misc.go
+++ b/cmd/flags_misc.go
@@ -228,10 +228,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e
 	}
 
 	if enableMetrics {
-		opts := []otlpmetrichttp.Option{
-			otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
-			otlpmetrichttp.WithEndpoint(cmd.Flag(metricsEndpointFlag).Value.String()),
-		}
+		opts := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(cmd.Flag(metricsEndpointFlag).Value.String())}
 		if ok, err := cmd.Flags().GetBool(metricsTlS); err != nil {
 			panic(err)
 		} else if !ok {
diff --git a/libs/utils/telemetry.go b/libs/utils/telemetry.go
index 11dccfec6..058df70ef 100644
--- a/libs/utils/telemetry.go
+++ b/libs/utils/telemetry.go
@@ -27,7 +27,6 @@ type MetricProviderConfig struct {
 
 // NewMetricProvider creates a new OTLP metric provider with the given configuration
 func NewMetricProvider(ctx context.Context, cfg MetricProviderConfig) (*sdk.MeterProvider, error) {
-	// Build OTLP options with compression enabled
 	opts := []otlpmetrichttp.Option{otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)}
 	opts = append(opts, cfg.OTLPOptions...)
 
@@ -36,7 +35,6 @@ func NewMetricProvider(ctx context.Context, cfg MetricProviderConfig) (*sdk.Mete
 		return nil, fmt.Errorf("creating OTLP metric exporter: %w", err)
 	}
 
-	// Use default interval if not specified
 	interval := cfg.Interval
 	if interval == 0 {
 		interval = 10 * time.Second

From d1733fe48a84039602624f0a4042c4558064c9bc Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Mon, 1 Dec 2025 11:58:26 -0600
Subject: [PATCH 4/5] mod tidy

---
 nodebuilder/tests/tastora/go.mod | 3 +++
 nodebuilder/tests/tastora/go.sum | 2 ++
 2 files changed, 5 insertions(+)

diff --git a/nodebuilder/tests/tastora/go.mod b/nodebuilder/tests/tastora/go.mod
index 12f05ae45..fb9e74b7c 100644
--- a/nodebuilder/tests/tastora/go.mod
+++ b/nodebuilder/tests/tastora/go.mod
@@ -368,7 +368,10 @@ require (
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
 	github.com/Jorropo/jsync v1.0.1 // indirect
 	github.com/Microsoft/go-winio v0.6.2 // indirect
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
 	github.com/moby/moby v27.5.1+incompatible // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 // indirect
+	go.opentelemetry.io/proto/otlp v1.7.0 // indirect
 	go.yaml.in/yaml/v2 v2.4.3 // indirect
 	nhooyr.io/websocket v1.8.17 // indirect
 	pgregory.net/rapid v1.2.0 // indirect
diff --git a/nodebuilder/tests/tastora/go.sum b/nodebuilder/tests/tastora/go.sum
index 8c865bceb..96be2ff92 100644
--- a/nodebuilder/tests/tastora/go.sum
+++ b/nodebuilder/tests/tastora/go.sum
@@ -1992,6 +1992,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:
 go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
 go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
 go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
+go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 h1:CIHWikMsN3wO+wq1Tp5VGdVRTcON+DmOJSfDjXypKOc=
+go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0/go.mod h1:TNupZ6cxqyFEpLXAZW7On+mLFL0/g0TE3unIYL91xWc=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 h1:bDMKF3RUSxshZ5OjOTi8rsHGaPKsAt76FaqgvIUySLc=

From 43cd009646a1dd336bbe89484a761933ff77fbbc Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Mon, 1 Dec 2025 12:33:30 -0600
Subject: [PATCH 5/5] lint

---
 cmd/cel-shed/latency_monitor.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cmd/cel-shed/latency_monitor.go b/cmd/cel-shed/latency_monitor.go
index 59270e4fb..07f7bb87c 100644
--- a/cmd/cel-shed/latency_monitor.go
+++ b/cmd/cel-shed/latency_monitor.go
@@ -6,12 +6,13 @@ import (
 	"math/rand"
 	"time"
 
-	libshare "github.com/celestiaorg/go-square/v3/share"
 	"github.com/spf13/cobra"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
 	"go.opentelemetry.io/otel/metric"
 
+	libshare "github.com/celestiaorg/go-square/v3/share"
+
 	"github.com/celestiaorg/celestia-node/api/rpc/client"
 	"github.com/celestiaorg/celestia-node/blob"
 	"github.com/celestiaorg/celestia-node/libs/utils"
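
A minimal usage sketch of the utils.NewMetricProvider helper introduced in libs/utils/telemetry.go, assuming a local OTLP collector; the endpoint, service identifiers, and the WithInsecure option below are illustrative assumptions, not values taken from this change.

package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"

	"github.com/celestiaorg/celestia-node/libs/utils"
)

func main() {
	ctx := context.Background()

	// Assumed values for a local test setup; replace with real ones.
	cfg := utils.MetricProviderConfig{
		ServiceNamespace:  "example-namespace",
		ServiceName:       "example-service",
		ServiceInstanceID: "example-instance",
		Interval:          10 * time.Second,
		OTLPOptions: []otlpmetrichttp.Option{
			otlpmetrichttp.WithEndpoint("localhost:4318"),
			otlpmetrichttp.WithInsecure(), // assumption: plain HTTP for local testing
		},
	}

	provider, err := utils.NewMetricProvider(ctx, cfg)
	if err != nil {
		fmt.Println("failed to create metric provider:", err)
		return
	}
	defer func() {
		if err := provider.Shutdown(ctx); err != nil {
			fmt.Println("failed to shut down metric provider:", err)
		}
	}()

	// Register globally so instrumented code picks up the provider, mirroring the patch.
	otel.SetMeterProvider(provider)
}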