From 297be7b7a135f8bf2f54c96afa2b4a71961717fb Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Thu, 31 Oct 2024 06:38:01 -0500 Subject: [PATCH 1/4] CompactMeasurement: simplify, optimize, and add DataPublisher support --- sttp/transport/CompactMeasurement.go | 219 +++++---------------------- sttp/transport/DataSubscriber.go | 8 +- 2 files changed, 44 insertions(+), 183 deletions(-) diff --git a/sttp/transport/CompactMeasurement.go b/sttp/transport/CompactMeasurement.go index ab48e2a..6815c34 100644 --- a/sttp/transport/CompactMeasurement.go +++ b/sttp/transport/CompactMeasurement.go @@ -62,8 +62,6 @@ const ( systemIssueMask StateFlagsEnum = 0xE0000000 calculatedValueMask StateFlagsEnum = 0x00001000 discardedValueMask StateFlagsEnum = 0x00400000 - - fixedLength = 9 ) func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum { @@ -96,158 +94,20 @@ func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum { return fullFlags } -func (fullFlags StateFlagsEnum) mapToCompactFlags() compactStateFlagsEnum { - var compactFlags compactStateFlagsEnum - - if (fullFlags & dataRangeMask) > 0 { - compactFlags |= compactStateFlags.DataRange - } - - if (fullFlags & dataQualityMask) > 0 { - compactFlags |= compactStateFlags.DataQuality - } - - if (fullFlags & timeQualityMask) > 0 { - compactFlags |= compactStateFlags.TimeQuality - } - - if (fullFlags & systemIssueMask) > 0 { - compactFlags |= compactStateFlags.SystemIssue - } - - if (fullFlags & calculatedValueMask) > 0 { - compactFlags |= compactStateFlags.CalculatedValue - } - - if (fullFlags & discardedValueMask) > 0 { - compactFlags |= compactStateFlags.DiscardedValue - } - - return compactFlags -} - // CompactMeasurement defines a measured value, in simple compact format, for transmission or reception in STTP. type CompactMeasurement struct { - Measurement - signalIndexCache *SignalIndexCache - baseTimeOffsets *[2]int64 - includeTime bool - useMillisecondResolution bool - timeIndex int32 - usingBaseTimeOffset bool -} - -// NewCompactMeasurement creates a new CompactMeasurement -func NewCompactMeasurement(signalIndexCache *SignalIndexCache, includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64) CompactMeasurement { - return CompactMeasurement{ - Measurement: Measurement{}, - signalIndexCache: signalIndexCache, - baseTimeOffsets: baseTimeOffsets, - includeTime: includeTime, - useMillisecondResolution: useMillisecondResolution, - timeIndex: 0, - usingBaseTimeOffset: false, - } -} - -// GetBinaryLength gets the binary byte length of a CompactMeasurement -func (cm *CompactMeasurement) GetBinaryLength() uint32 { - var length uint32 = fixedLength - - if !cm.includeTime { - return length - } - - baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex] - - if baseTimeOffset > 0 { - // See if timestamp will fit within space allowed for active base offset. We cache result so that post call - // to GetBinaryLength, result will speed other subsequent parsing operations by not having to reevaluate. - difference := cm.TimestampValue() - baseTimeOffset - - if difference > 0 { - if cm.useMillisecondResolution { - cm.usingBaseTimeOffset = difference/int64(ticks.PerMillisecond) < math.MaxUint16 - } else { - cm.usingBaseTimeOffset = difference < math.MaxUint32 - } - } else { - cm.usingBaseTimeOffset = false - } - - if cm.usingBaseTimeOffset { - if cm.useMillisecondResolution { - length += 2 // Use two bytes for millisecond resolution timestamp with valid offset - } else { - length += 4 // Use four bytes for tick resolution timestamp with valid offset - } - } else { - length += 8 // Use eight bytes for full fidelity time - } - } else { - // Use eight bytes for full fidelity time - length += 8 - } - - return length -} - -// GetTimestampC2 gets offset compressed millisecond-resolution 2-byte timestamp. -func (cm *CompactMeasurement) GetTimestampC2() uint16 { - return uint16((cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex]) / int64(ticks.PerMillisecond)) -} - -// GetTimestampC4 gets offset compressed tick-resolution 4-byte timestamp. -func (cm *CompactMeasurement) GetTimestampC4() uint32 { - return uint32(cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex]) -} - -// GetCompactStateFlags gets byte level compact state flags with encoded time index and base time offset bits. -func (cm *CompactMeasurement) GetCompactStateFlags() byte { - // Encode compact state flags - flags := cm.Flags.mapToCompactFlags() - - if cm.timeIndex != 0 { - flags |= compactStateFlags.TimeIndex - } - - if cm.usingBaseTimeOffset { - flags |= compactStateFlags.BaseTimeOffset - } - - return byte(flags) -} - -// SetCompactStateFlags sets byte level compact state flags with encoded time index and base time offset bits. -func (cm *CompactMeasurement) SetCompactStateFlags(value byte) { - // Decode compact state flags - flags := compactStateFlagsEnum(value) - - cm.Flags = flags.mapToFullFlags() - - if (flags & compactStateFlags.TimeIndex) > 0 { - cm.timeIndex = 1 - } else { - cm.timeIndex = 0 - } - - cm.usingBaseTimeOffset = (flags & compactStateFlags.BaseTimeOffset) > 0 -} - -// GetRuntimeID gets the 4-byte run-time signal index for this measurement. -func (cm *CompactMeasurement) GetRuntimeID() int32 { - return cm.signalIndexCache.SignalIndex(cm.SignalID) + Value float32 + Timestamp ticks.Ticks + signalIndex uint32 + flags compactStateFlagsEnum } -// SetRuntimeID assigns CompactMeasurement SignalID (UUID) from the specified signalIndex. -func (cm *CompactMeasurement) SetRuntimeID(signalIndex int32) { - cm.SignalID = cm.signalIndexCache.SignalID(signalIndex) -} +// Constructs a CompactMeasurement from the specified byte buffer; returns the measurement and the number of bytes occupied by this measurement. +func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64, buffer []byte) (CompactMeasurement, int, error) { + var cm CompactMeasurement -// Decode parses a CompactMeasurement from the specified byte buffer. -func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) { - if len(buffer) < fixedLength { - return 0, errors.New("not enough buffer available to deserialize compact measurement") + if len(buffer) < 9 { + return cm, 0, errors.New("not enough buffer available to deserialize compact measurement") } // Basic Compact Measurement Format: @@ -257,51 +117,54 @@ func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) { // ID 4 // Value 4 // [Time] 0/2/4/8 - var index int - - // Decode state flags - cm.SetCompactStateFlags(buffer[0]) - index++ - // Decode runtime ID - cm.SetRuntimeID(int32(binary.BigEndian.Uint32(buffer[index:]))) - index += 4 + cm.flags = compactStateFlagsEnum(buffer[0]) + cm.signalIndex = binary.BigEndian.Uint32(buffer[1:5]) + cm.Value = math.Float32frombits(binary.BigEndian.Uint32(buffer[5:9])) - // Decode value - cm.Value = float64(math.Float32frombits(binary.BigEndian.Uint32(buffer[index:]))) - index += 4 - - if !cm.includeTime { - return index, nil + if !includeTime { + return cm, 9, nil } - if cm.usingBaseTimeOffset { - baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex] - - if cm.useMillisecondResolution { + if (cm.flags & compactStateFlags.BaseTimeOffset) != 0 { + timeIndex := (cm.flags & compactStateFlags.TimeIndex) >> 7 + baseTimeOffset := baseTimeOffsets[timeIndex] + if useMillisecondResolution { // Decode 2-byte millisecond offset timestamp if baseTimeOffset > 0 { - cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[index:]))*int64(ticks.PerMillisecond)) + cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[9:11]))*int64(ticks.PerMillisecond)) } - index += 2 + return cm, 11, nil } else { // Decode 4-byte tick offset timestamp if baseTimeOffset > 0 { - cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[index:]))) + cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[9:13]))) } - index += 4 + return cm, 13, nil } } else { // Decode 8-byte full fidelity timestamp // Note that only a full fidelity timestamp can carry leap second flags - cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[index:])) - index += 8 + cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[9:17])) + return cm, 17, nil } +} + +// Compute the full measurement from the compact representation +func (cm *CompactMeasurement) Expand(signalIndexCache *SignalIndexCache) Measurement { + return Measurement{ + SignalID: signalIndexCache.SignalID(int32(cm.signalIndex)), + Timestamp: cm.Timestamp, + Value: float64(cm.Value), + Flags: cm.flags.mapToFullFlags(), + } +} - return index, nil +//// Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. +func (cm *CompactMeasurement) Marshal(b []byte) { + b[0] = byte(cm.flags) + binary.BigEndian.PutUint32(b[1:], cm.signalIndex) + binary.BigEndian.PutUint32(b[5:], math.Float32bits(float32(cm.Value))) + binary.BigEndian.PutUint64(b[9:], uint64(cm.Timestamp)) } -//// Encode serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. -//func (cm *CompactMeasurement) Encode(buffer []byte) { -// // TODO: This will be needed by DataPublisher implementation -//} diff --git a/sttp/transport/DataSubscriber.go b/sttp/transport/DataSubscriber.go index 89e94f5..f0691b7 100644 --- a/sttp/transport/DataSubscriber.go +++ b/sttp/transport/DataSubscriber.go @@ -1353,12 +1353,10 @@ func (ds *DataSubscriber) parseCompactMeasurements(signalIndexCache *SignalIndex useMillisecondResolution := ds.subscription.UseMillisecondResolution includeTime := ds.subscription.IncludeTime - index := 0 for i := 0; i < len(measurements); i++ { // Deserialize compact measurement format - compactMeasurement := NewCompactMeasurement(signalIndexCache, includeTime, useMillisecondResolution, &ds.baseTimeOffsets) - bytesDecoded, err := compactMeasurement.Decode(data[index:]) + cm, n, err := NewCompactMeasurement(includeTime, useMillisecondResolution, &ds.baseTimeOffsets, data) if err != nil { ds.dispatchErrorMessage("Failed to parse compact measurements - disconnecting: " + err.Error()) @@ -1366,8 +1364,8 @@ func (ds *DataSubscriber) parseCompactMeasurements(signalIndexCache *SignalIndex return } - index += bytesDecoded - measurements[i] = compactMeasurement.Measurement + data = data[n:] + measurements[i] = cm.Expand(signalIndexCache) } } From 99637c889377f5455706346a57f6b7fa917a3a55 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Thu, 31 Oct 2024 06:42:21 -0500 Subject: [PATCH 2/4] CompactMeasurement: expose SignalIndex and Flags for DataPublisher use --- sttp/transport/CompactMeasurement.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sttp/transport/CompactMeasurement.go b/sttp/transport/CompactMeasurement.go index 6815c34..38efcc4 100644 --- a/sttp/transport/CompactMeasurement.go +++ b/sttp/transport/CompactMeasurement.go @@ -98,8 +98,8 @@ func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum { type CompactMeasurement struct { Value float32 Timestamp ticks.Ticks - signalIndex uint32 - flags compactStateFlagsEnum + SignalIndex uint32 + Flags compactStateFlagsEnum } // Constructs a CompactMeasurement from the specified byte buffer; returns the measurement and the number of bytes occupied by this measurement. @@ -118,16 +118,16 @@ func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeO // Value 4 // [Time] 0/2/4/8 - cm.flags = compactStateFlagsEnum(buffer[0]) - cm.signalIndex = binary.BigEndian.Uint32(buffer[1:5]) + cm.Flags = compactStateFlagsEnum(buffer[0]) + cm.SignalIndex = binary.BigEndian.Uint32(buffer[1:5]) cm.Value = math.Float32frombits(binary.BigEndian.Uint32(buffer[5:9])) if !includeTime { return cm, 9, nil } - if (cm.flags & compactStateFlags.BaseTimeOffset) != 0 { - timeIndex := (cm.flags & compactStateFlags.TimeIndex) >> 7 + if (cm.Flags & compactStateFlags.BaseTimeOffset) != 0 { + timeIndex := (cm.Flags & compactStateFlags.TimeIndex) >> 7 baseTimeOffset := baseTimeOffsets[timeIndex] if useMillisecondResolution { // Decode 2-byte millisecond offset timestamp @@ -153,17 +153,17 @@ func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeO // Compute the full measurement from the compact representation func (cm *CompactMeasurement) Expand(signalIndexCache *SignalIndexCache) Measurement { return Measurement{ - SignalID: signalIndexCache.SignalID(int32(cm.signalIndex)), + SignalID: signalIndexCache.SignalID(int32(cm.SignalIndex)), Timestamp: cm.Timestamp, Value: float64(cm.Value), - Flags: cm.flags.mapToFullFlags(), + Flags: cm.Flags.mapToFullFlags(), } } //// Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. func (cm *CompactMeasurement) Marshal(b []byte) { - b[0] = byte(cm.flags) - binary.BigEndian.PutUint32(b[1:], cm.signalIndex) + b[0] = byte(cm.Flags) + binary.BigEndian.PutUint32(b[1:], cm.SignalIndex) binary.BigEndian.PutUint32(b[5:], math.Float32bits(float32(cm.Value))) binary.BigEndian.PutUint64(b[9:], uint64(cm.Timestamp)) } From bc90d5e0c965d90098c7b89ee933dfbbf6b209e5 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Mon, 25 Nov 2024 16:32:38 -0600 Subject: [PATCH 3/4] restore --- sttp/transport/CompactMeasurement.go | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/sttp/transport/CompactMeasurement.go b/sttp/transport/CompactMeasurement.go index 38efcc4..9a32ad1 100644 --- a/sttp/transport/CompactMeasurement.go +++ b/sttp/transport/CompactMeasurement.go @@ -94,6 +94,36 @@ func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum { return fullFlags } +func (fullFlags StateFlagsEnum) mapToCompactFlags() compactStateFlagsEnum { + var compactFlags compactStateFlagsEnum + + if (fullFlags & dataRangeMask) > 0 { + compactFlags |= compactStateFlags.DataRange + } + + if (fullFlags & dataQualityMask) > 0 { + compactFlags |= compactStateFlags.DataQuality + } + + if (fullFlags & timeQualityMask) > 0 { + compactFlags |= compactStateFlags.TimeQuality + } + + if (fullFlags & systemIssueMask) > 0 { + compactFlags |= compactStateFlags.SystemIssue + } + + if (fullFlags & calculatedValueMask) > 0 { + compactFlags |= compactStateFlags.CalculatedValue + } + + if (fullFlags & discardedValueMask) > 0 { + compactFlags |= compactStateFlags.DiscardedValue + } + + return compactFlags +} + // CompactMeasurement defines a measured value, in simple compact format, for transmission or reception in STTP. type CompactMeasurement struct { Value float32 From d05f0e8657bbe78416be97b74d9f5ec8bf1000e4 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Mon, 25 Nov 2024 16:32:42 -0600 Subject: [PATCH 4/4] go fmt --- sttp/transport/CompactMeasurement.go | 17 ++++++++--------- sttp/transport/SignalIndexCache.go | 14 +++++++------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/sttp/transport/CompactMeasurement.go b/sttp/transport/CompactMeasurement.go index 9a32ad1..c8fc592 100644 --- a/sttp/transport/CompactMeasurement.go +++ b/sttp/transport/CompactMeasurement.go @@ -126,10 +126,10 @@ func (fullFlags StateFlagsEnum) mapToCompactFlags() compactStateFlagsEnum { // CompactMeasurement defines a measured value, in simple compact format, for transmission or reception in STTP. type CompactMeasurement struct { - Value float32 - Timestamp ticks.Ticks - SignalIndex uint32 - Flags compactStateFlagsEnum + Value float32 + Timestamp ticks.Ticks + SignalIndex uint32 + Flags compactStateFlagsEnum } // Constructs a CompactMeasurement from the specified byte buffer; returns the measurement and the number of bytes occupied by this measurement. @@ -183,18 +183,17 @@ func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeO // Compute the full measurement from the compact representation func (cm *CompactMeasurement) Expand(signalIndexCache *SignalIndexCache) Measurement { return Measurement{ - SignalID: signalIndexCache.SignalID(int32(cm.SignalIndex)), + SignalID: signalIndexCache.SignalID(int32(cm.SignalIndex)), Timestamp: cm.Timestamp, - Value: float64(cm.Value), - Flags: cm.Flags.mapToFullFlags(), + Value: float64(cm.Value), + Flags: cm.Flags.mapToFullFlags(), } } -//// Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. +// // Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. func (cm *CompactMeasurement) Marshal(b []byte) { b[0] = byte(cm.Flags) binary.BigEndian.PutUint32(b[1:], cm.SignalIndex) binary.BigEndian.PutUint32(b[5:], math.Float32bits(float32(cm.Value))) binary.BigEndian.PutUint64(b[9:], uint64(cm.Timestamp)) } - diff --git a/sttp/transport/SignalIndexCache.go b/sttp/transport/SignalIndexCache.go index 305d584..6d5d824 100644 --- a/sttp/transport/SignalIndexCache.go +++ b/sttp/transport/SignalIndexCache.go @@ -36,13 +36,13 @@ import ( // SignalIndexCache maps 32-bit runtime IDs to 128-bit globally unique Measurement IDs. The structure // additionally provides reverse lookup and an extra mapping to human-readable measurement keys. type SignalIndexCache struct { - reference map[int32]uint32 - signalIDList []guid.Guid - sourceList []string - idList []uint64 - signalIDCache map[guid.Guid]int32 - binaryLength uint32 - tsscDecoder *tssc.Decoder + reference map[int32]uint32 + signalIDList []guid.Guid + sourceList []string + idList []uint64 + signalIDCache map[guid.Guid]int32 + binaryLength uint32 + tsscDecoder *tssc.Decoder } // NewSignalIndexCache makes a new SignalIndexCache