Skip to content

nominal-io/nominal-streaming-go

Repository files navigation

⟢ nominal-streaming-go

Go client for streaming time-series data to Nominal.

go get github.com/nominal-io/nominal-streaming-go

Quick Start

Get a typed stream for a channel and write data points:

ds, _, _ := client.NewDatasetStream(
    context.Background(),
    "ri.nominal.main.dataset.your-dataset-id",
)
defer ds.Close()

temperature := ds.FloatStream("temperature")
temperature.Enqueue(time.Now().UnixNano(), 23.5)
temperature.Enqueue(time.Now().UnixNano(), 24.5)
temperature.Enqueue(time.Now().UnixNano(), 25.5)

Or, write data points with the channel name and values, and the library will delegate to the appropriate stream for you:

ds, _, _ := client.NewDatasetStream(
    context.Background(),
    "ri.nominal.main.dataset.your-dataset-id",
)
defer ds.Close()

ds.EnqueueDynamic("temperature", time.Now().UnixNano(), 23.5)
ds.EnqueueDynamic("temperature", time.Now().UnixNano(), 24.5)
ds.EnqueueDynamic("temperature", time.Now().UnixNano(), 25.5)

Complete Example

package main

import (
    "context"
    "time"

    nominal "github.com/nominal-io/nominal-streaming-go"
)

func main() {
    client, _ := nominal.NewClient("your-api-key")
    defer client.Close()

    ds, errCh, _ := client.NewDatasetStream(
        context.Background(),
        "ri.nominal.main.dataset.your-dataset-id",
    )
    // Closing the stream blocks until all requests have been sent to Nominal
    defer ds.Close()

    // Handle errors from the stream asynchronously
    go func() {
        for err := range errCh {
            log.Printf("Stream error: %v", err)
        }
    }()

    // Get typed data streams
    temp := ds.FloatStream("temperature")
    count := ds.IntStream("count")
    // Optionally with tags..
    status := ds.StringStream("status", nominal.WithTags(nominal.Tags{
        "sensor_id": "A1",
        "location":  "lab",
    }))

    // Add data to streams
    temp.Enqueue(time.Now().UnixNano(), 23.5)
    count.Enqueue(time.Now().UnixNano(), 42)
    status.Enqueue(time.Now().UnixNano(), "OK")

    // Add data without retrieving a channel stream up-front
    ds.EnqueueDynamic("temperature", time.Now().UnixNano(), 23.5)
    ds.EnqueueDynamic("status", time.Now().UnixNano(), "OK", nominal.WithTags(nominal.Tags{
        "sensor_id": "A1",
        "location":  "lab",
    }))
}

About

Go library for low-latency streaming to Nominal Core

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published