From 129d14dca7f5441d0e07efbe9e9933d93f7b02b9 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Thu, 16 Oct 2025 10:38:01 -0500 Subject: [PATCH] feat: templetize write stream timestamp type --- nominal/core/_stream/write_stream.py | 10 ++++++---- nominal/core/_stream/write_stream_base.py | 12 +++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/nominal/core/_stream/write_stream.py b/nominal/core/_stream/write_stream.py index e9b40828..c37b9b63 100644 --- a/nominal/core/_stream/write_stream.py +++ b/nominal/core/_stream/write_stream.py @@ -32,13 +32,15 @@ def sort_key(cls, item: Self) -> tuple[str, Sequence[tuple[str, str]], str]: return item._to_api_batch_key() -DataStream: TypeAlias = WriteStreamBase[str | float | int] +FlexibleTimestampType = str | datetime | IntegralNanosecondsUTC + +DataStream: TypeAlias = WriteStreamBase[str | float | int, FlexibleTimestampType] """Stream type for asynchronously sending timeseries data to the Nominal backend.""" DataItem: TypeAlias = BatchItem[str | float | int] """Individual item of timeseries data to stream to Nominal.""" -LogStream: TypeAlias = WriteStreamBase[str] +LogStream: TypeAlias = WriteStreamBase[str, str | datetime | IntegralNanosecondsUTC] """Stream type for asynchronously sending log data to the Nominal backend.""" LogItem: TypeAlias = BatchItem[str] @@ -46,7 +48,7 @@ def sort_key(cls, item: Self) -> tuple[str, Sequence[tuple[str, str]], str]: @dataclass(frozen=True) -class WriteStream(WriteStreamBase[StreamType]): +class WriteStream(WriteStreamBase[StreamType, FlexibleTimestampType]): batch_size: int max_wait: timedelta _process_batch: Callable[[Sequence[BatchItem[StreamType]]], None] @@ -92,7 +94,7 @@ def __exit__( def enqueue( self, channel_name: str, - timestamp: str | datetime | IntegralNanosecondsUTC, + timestamp: FlexibleTimestampType, value: StreamType, tags: Mapping[str, str] | None = None, ) -> None: diff --git a/nominal/core/_stream/write_stream_base.py b/nominal/core/_stream/write_stream_base.py index 143b25e0..6ef473b6 100644 --- a/nominal/core/_stream/write_stream_base.py +++ b/nominal/core/_stream/write_stream_base.py @@ -1,18 +1,16 @@ from __future__ import annotations import abc -from datetime import datetime from types import TracebackType from typing import Generic, Mapping, Sequence, Type, TypeVar from typing_extensions import Self -from nominal.ts import IntegralNanosecondsUTC - StreamType = TypeVar("StreamType") +TimestampType = TypeVar("TimestampType") -class WriteStreamBase(abc.ABC, Generic[StreamType]): +class WriteStreamBase(abc.ABC, Generic[StreamType, TimestampType]): @abc.abstractmethod def __enter__(self) -> Self: """Create the stream as a context manager.""" @@ -27,7 +25,7 @@ def __exit__( def enqueue( self, channel_name: str, - timestamp: str | datetime | IntegralNanosecondsUTC, + timestamp: TimestampType, value: StreamType, tags: Mapping[str, str] | None = None, ) -> None: @@ -44,7 +42,7 @@ def enqueue( def enqueue_batch( self, channel_name: str, - timestamps: Sequence[str | datetime | IntegralNanosecondsUTC], + timestamps: Sequence[TimestampType], values: Sequence[StreamType], tags: Mapping[str, str] | None = None, ) -> None: @@ -71,7 +69,7 @@ def enqueue_batch( def enqueue_from_dict( self, - timestamp: str | datetime | IntegralNanosecondsUTC, + timestamp: TimestampType, channel_values: Mapping[str, StreamType], tags: Mapping[str, str] | None = None, ) -> None: