From 3a65681ce0b559618d2e05077ddceb1cf0e0d307 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 2 Mar 2026 15:04:43 -0800 Subject: [PATCH 1/4] Preliminary fixes and preparation --- lib/browser/heap.spec.ts | 112 ++++++++++++++++++++++++++++++ lib/browser/heap.ts | 122 +++++++++++++++++++++++++++++++++ lib/browser/mqtt5.ts | 38 +++------- lib/browser/ws.ts | 7 +- lib/common/event.ts | 1 + lib/common/mqtt5_packet.ts | 32 ++++++++- lib/common/mqtt_shared.spec.ts | 73 ++++++++++++++++++++ lib/common/mqtt_shared.ts | 102 ++++++++++++++++++++++----- 8 files changed, 437 insertions(+), 50 deletions(-) create mode 100644 lib/browser/heap.spec.ts create mode 100644 lib/browser/heap.ts create mode 100644 lib/common/mqtt_shared.spec.ts diff --git a/lib/browser/heap.spec.ts b/lib/browser/heap.spec.ts new file mode 100644 index 000000000..686856df7 --- /dev/null +++ b/lib/browser/heap.spec.ts @@ -0,0 +1,112 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import * as heap from "./heap"; + +interface ElementType { + timestamp: number; + id: number; +} + +function compareElements(lhs: ElementType, rhs: ElementType) { + if (lhs.timestamp < rhs.timestamp) { + return true; + } else if (lhs.timestamp > rhs.timestamp) { + return false; + } else { + return lhs.id < rhs.id; + } +} + +test('empty', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + expect(testHeap.empty()).toBeTruthy(); + expect(testHeap.peek()).toBeUndefined(); + + testHeap.push({timestamp: 0, id: 1}); + expect(testHeap.empty()).toBeFalsy(); + expect(testHeap.peek()).toBeDefined(); + + testHeap.pop(); + expect(testHeap.empty()).toBeTruthy(); + expect(testHeap.peek()).toBeUndefined(); +}); + +function doSimplePushPopTest(testHeap : heap.MinHeap, timestamps: Array) { + let currentId = 1; + for (let timestamp of timestamps) { + testHeap.push({timestamp: timestamp, id: currentId++}); + } + + let poppedTimestamps : Array = []; + let peekedTimestamps : Array = []; + while (!testHeap.empty()) { + // @ts-ignore + peekedTimestamps.push(testHeap.peek().timestamp); + poppedTimestamps.push(testHeap.pop().timestamp); + } + + let sortedTimestamps = timestamps.sort((lhs, rhs) => { + if (lhs < rhs) { + return -1; + } else if (lhs > rhs) { + return 1; + } else { + return 0; + } + }); + + expect(poppedTimestamps).toEqual(sortedTimestamps); + expect(peekedTimestamps).toEqual(sortedTimestamps); +} + +test('reverseOrderPushPop', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + + let timestamps = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; + doSimplePushPopTest(testHeap, timestamps); +}); + +test('alternatingOrderPushPop', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + + let timestamps = [10, -10, 9, -9, 8, -8, 7, -7, 6, -6, 5, -5, 4, -4, 3, -3, 2, -2, 1, -1, 0]; + doSimplePushPopTest(testHeap, timestamps); +}); + +test('randomOrderPushPop', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + + let timestamps = []; + + for (let i = 0; i < 100; i++) { + timestamps.push(Math.floor(Math.random() * 1000 + .5)); + } + + doSimplePushPopTest(testHeap, timestamps); +}); + +test('peek empty', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + + expect(() => { testHeap.pop(); }).toThrow("empty"); +}); + +test('clear', async () => { + let testHeap : heap.MinHeap = new heap.MinHeap(compareElements); + + let timestamps = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; + let currentId = 1; + for (let timestamp of timestamps) { + testHeap.push({timestamp: timestamp, id: currentId++}); + } + + expect(testHeap.empty()).toBeFalsy(); + + testHeap.clear(); + + expect(testHeap.empty()).toBeTruthy(); + expect(() => { testHeap.pop(); }).toThrow("empty"); +}); \ No newline at end of file diff --git a/lib/browser/heap.ts b/lib/browser/heap.ts new file mode 100644 index 000000000..5411101c5 --- /dev/null +++ b/lib/browser/heap.ts @@ -0,0 +1,122 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import {CrtError} from "./error"; + +/** + * A generic array-based min-heap implementation for priority queue usage. Used to track operation timeouts. + */ +export class MinHeap { + + private heap : Array = [undefined]; + private currentSize: number = 0; + + constructor(private lessThanOperator: (lhs: T, rhs: T) => boolean) { + } + + /** + * Push a value into the min-heap + * + * @param value value to add + */ + push(value: T) { + this.currentSize++; + this.heap[this.currentSize] = value; + this.heapifyUp(this.currentSize); + } + + /** + * Gets the minimum value of the heap. Returns undefined if the heap is empty. + */ + peek() : T | undefined { + if (this.currentSize == 0) { + return undefined; + } + + // guaranteed to be a T + return this.heap[1]; + } + + /** + * Pops the minimum value from the heap. Throws an exception if the heap is empty. + */ + pop() : T { + if (this.empty()) { + throw new CrtError("Heap is empty"); + } + + let returnValue = this.heap[1]; + let lastElement = this.heap[this.currentSize]; + this.heap[this.currentSize--] = undefined; // erase the reference; unclear if this is overkill rather than simply removing from an array (or an indexed map for that matter) + this.heap[1] = lastElement; + this.heapifyDown(1); + + // @ts-ignore - guaranteed to be a T + return returnValue; + } + + /** + * Checks if the min-heap is empty or not + */ + empty() : boolean { + return this.currentSize == 0; + } + + /** + * Removes all values from the min-heap + */ + clear() { + this.heap = [undefined]; + this.currentSize = 0; + } + + private swapElements(index1: number, index2: number) { + let temp = this.heap[index1]; + this.heap[index1] = this.heap[index2]; + this.heap[index2] = temp; + } + + private heapifyDown(startIndex: number) { + let currentIndex : number = startIndex; + + while (true) { + let leftIndex = currentIndex << 1; + let rightIndex = leftIndex + 1; + let swapIndex = undefined; + + // @ts-ignore - heap elements guaranteed to be a T + if (leftIndex <= this.currentSize && this.lessThanOperator(this.heap[leftIndex], this.heap[currentIndex])) { + swapIndex = leftIndex; + } + + // @ts-ignore - heap elements guaranteed to be a T + if (rightIndex <= this.currentSize && this.lessThanOperator(this.heap[rightIndex], this.heap[currentIndex])) { + // @ts-ignore - heap elements guaranteed to be a T + if (!swapIndex || this.lessThanOperator(this.heap[rightIndex], this.heap[leftIndex])) { + // if current is greater than both left and right children, we must swap with the smallest child + // since it will become the parent of both current and the other child + swapIndex = rightIndex; + } + } + + if (!swapIndex) { + break; + } + + this.swapElements(currentIndex, swapIndex); + currentIndex = swapIndex; + } + } + + private heapifyUp(index: number) { + let parentIndex = index >> 1; + // @ts-ignore - heap element guaranteed to be a T + while (parentIndex > 0 && this.lessThanOperator(this.heap[index], this.heap[parentIndex])) { + this.swapElements(index, parentIndex); + index = parentIndex; + parentIndex = parentIndex >> 1; + } + } +} \ No newline at end of file diff --git a/lib/browser/mqtt5.ts b/lib/browser/mqtt5.ts index 1fbdb2c77..d065be3e3 100644 --- a/lib/browser/mqtt5.ts +++ b/lib/browser/mqtt5.ts @@ -239,7 +239,14 @@ class ReconnectionScheduler { onConnectionFailureOrDisconnection() : void { this.clearTasks(); - let nextDelay : number = this.calculateNextReconnectDelay(); + let reconnectContext = { + retryJitterMode: this.clientConfig.retryJitterMode, + minReconnectDelayMs : this.clientConfig.minReconnectDelayMs, + maxReconnectDelayMs : this.clientConfig.maxReconnectDelayMs, + lastReconnectDelay : this.lastReconnectDelay, + connectionFailureCount : this.connectionFailureCount, + }; + let nextDelay : number = mqtt_shared.calculateNextReconnectDelay(reconnectContext); this.lastReconnectDelay = nextDelay; this.connectionFailureCount += 1; @@ -268,35 +275,6 @@ class ReconnectionScheduler { clearTimeout(this.resetConnectionFailureCountTask); } } - - private randomInRange(min: number, max: number) : number { - return min + (max - min) * Math.random(); - } - - /** - * Computes the next reconnect delay based on the Jitter/Retry configuration. - * Implements jitter calculations in https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ - * @private - */ - private calculateNextReconnectDelay() : number { - const jitterType : mqtt5.RetryJitterType = this.clientConfig.retryJitterMode ?? mqtt5.RetryJitterType.Default; - const [minDelay, maxDelay] : [number, number] = mqtt_utils.getOrderedReconnectDelayBounds(this.clientConfig.minReconnectDelayMs, this.clientConfig.maxReconnectDelayMs); - const clampedFailureCount : number = Math.min(52, this.connectionFailureCount); - let delay : number = 0; - - if (jitterType == mqtt5.RetryJitterType.None) { - delay = minDelay * Math.pow(2, clampedFailureCount); - } else if (jitterType == mqtt5.RetryJitterType.Decorrelated && this.lastReconnectDelay) { - delay = this.randomInRange(minDelay, 3 * this.lastReconnectDelay); - } else { - delay = this.randomInRange(minDelay, Math.min(maxDelay, minDelay * Math.pow(2, clampedFailureCount))); - } - - delay = Math.min(maxDelay, delay); - this.lastReconnectDelay = delay; - - return delay; - } } /** diff --git a/lib/browser/ws.ts b/lib/browser/ws.ts index 50004ac94..91446143f 100644 --- a/lib/browser/ws.ts +++ b/lib/browser/ws.ts @@ -181,8 +181,13 @@ export function create_mqtt5_websocket_url(config: mqtt5.Mqtt5ClientConfig) { throw new URIError(`Invalid url factory requested: ${urlFactory}`); } +import { Duplex } from 'stream'; +import * as WebSocket from 'ws'; + +export type WsStream = Duplex & {socket: WebSocket, on(event: "ws-close", listener: (close: WebSocket.CloseEvent) => void) : WsStream}; + /** @internal */ -export function create_mqtt5_websocket_stream(config: mqtt5.Mqtt5ClientConfig) { +export function create_mqtt5_websocket_stream(config: mqtt5.Mqtt5ClientConfig) : WsStream { const url = create_mqtt5_websocket_url(config); let ws = websocket(url, ['mqtt'], config.websocketOptions?.wsOptions); diff --git a/lib/common/event.ts b/lib/common/event.ts index 1317bb4c1..ed5e7e8f6 100644 --- a/lib/common/event.ts +++ b/lib/common/event.ts @@ -70,6 +70,7 @@ export class BufferedEventEmitter extends EventEmitter { super.emit(event.event, ...event.args); this.eventQueue = this.eventQueue.next; } + this.lastQueuedEvent = undefined; } /** diff --git a/lib/common/mqtt5_packet.ts b/lib/common/mqtt5_packet.ts index 9dcc762c4..bc3365378 100644 --- a/lib/common/mqtt5_packet.ts +++ b/lib/common/mqtt5_packet.ts @@ -30,6 +30,32 @@ export enum ConnectReasonCode { */ Success = 0, + /** + * (MQTT 311 Only) The connection was rejected because the server does not support the requested protocol version. + */ + UnacceptableProtocolVersion311 = 1, + + /** + * (MQTT 311 Only) The connection was rejected because the server does not allow the requested client id. + */ + ClientIdRejected311 = 2, + + /** + * (MQTT 311 Only) The connection was rejected because the server is not accepting MQTT connections. + */ + ServerUnavailable311 = 3, + + /** + * (MQTT 311 Only) The connection was rejected because the server does not accept the data specified in + * the username or password fields. + */ + InvalidUsernameOrPassword311 = 4, + + /** + * (MQTT 311 Only) The connection was rejected because the client is not authorized to connect. + */ + NotAuthorized311 = 5, + /** * Returned when the server has a failure but does not want to specify a reason or none * of the other reason codes apply. @@ -149,7 +175,7 @@ export enum ConnectReasonCode { * @param reasonCode reason code to check success for */ export function isSuccessfulConnectReasonCode(reasonCode: ConnectReasonCode): boolean { - return reasonCode < 128; + return reasonCode == ConnectReasonCode.Success; } /** @@ -409,6 +435,10 @@ export enum SubackReasonCode { */ GrantedQoS2 = 2, + /** + * Generic reason code used in MQTT311 to indicate subscription failure + */ + Failure311 = 128, /** * Returned when the connection was closed but the sender does not want to specify a reason or none diff --git a/lib/common/mqtt_shared.spec.ts b/lib/common/mqtt_shared.spec.ts new file mode 100644 index 000000000..383e7f856 --- /dev/null +++ b/lib/common/mqtt_shared.spec.ts @@ -0,0 +1,73 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import * as mqtt_shared from "./mqtt_shared"; + +test('MQTT topic properties - valid topic filter', async () => { + expect(mqtt_shared.computeTopicProperties("a/b/c", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("#", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("/#", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("sports/basketball/#", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("+", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("/+", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("+/a", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("+/basketball/#", true).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("washington/+/player1", true).isValid).toEqual(true); +}); + +test('MQTT topic properties - invalid topic filter', async () => { + expect(mqtt_shared.computeTopicProperties("", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("derp+", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("derp+/", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("derp#/", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("#/a", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("sport/basketball#", true).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("sport/basketball/#/ranking", true).isValid).toEqual(false); +}); + +test('MQTT topic properties - shared filter', async () => { + expect(mqtt_shared.computeTopicProperties("$share/b//", true).isShared).toEqual(true); + expect(mqtt_shared.computeTopicProperties("$share/a/b", true).isShared).toEqual(true); + expect(mqtt_shared.computeTopicProperties("$share/a/b/c", true).isShared).toEqual(true); +}); + +test('MQTT topic properties - not shared filter', async () => { + expect(mqtt_shared.computeTopicProperties("a/b/c", true).isShared).toEqual(false); + expect(mqtt_shared.computeTopicProperties("$share//c", true).isShared).toEqual(false); + expect(mqtt_shared.computeTopicProperties("$share/a", true).isShared).toEqual(false); + expect(mqtt_shared.computeTopicProperties("$share/+/a", true).isShared).toEqual(false); + expect(mqtt_shared.computeTopicProperties("$share/#/a", true).isShared).toEqual(false); + expect(mqtt_shared.computeTopicProperties("$share/b/", true).isShared).toEqual(false); +}); + +test('MQTT topic properties - has wildcard', async () => { + expect(mqtt_shared.computeTopicProperties("#", true).hasWildcard).toEqual(true); + expect(mqtt_shared.computeTopicProperties("+", true).hasWildcard).toEqual(true); + expect(mqtt_shared.computeTopicProperties("a/+/+", true).hasWildcard).toEqual(true); + expect(mqtt_shared.computeTopicProperties("a/b/#", true).hasWildcard).toEqual(true); +}); + +test('MQTT topic properties - does not have wildcard', async () => { + expect(mqtt_shared.computeTopicProperties("a/b/c", true).hasWildcard).toEqual(false); + expect(mqtt_shared.computeTopicProperties("/", true).hasWildcard).toEqual(false); +}); + +test('MQTT topic properties - valid topic', async () => { + expect(mqtt_shared.computeTopicProperties("a/b/c", false).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("/", false).isValid).toEqual(true); + expect(mqtt_shared.computeTopicProperties("///a", false).isValid).toEqual(true); +}); + +test('MQTT topic properties - invalid topic', async () => { + expect(mqtt_shared.computeTopicProperties("", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("#", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("/#", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("sports/basketball/#", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("+", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("/+", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("+/a", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("+/basketball/#", false).isValid).toEqual(false); + expect(mqtt_shared.computeTopicProperties("washington/+/player1", false).isValid).toEqual(false); +}); \ No newline at end of file diff --git a/lib/common/mqtt_shared.ts b/lib/common/mqtt_shared.ts index 30e995521..34880657d 100644 --- a/lib/common/mqtt_shared.ts +++ b/lib/common/mqtt_shared.ts @@ -8,6 +8,9 @@ */ +import * as mqtt5 from "./mqtt5"; +import * as mqtt_utils from "../browser/mqtt5_utils"; + /** * Converts payload to Buffer or string regardless of the supplied type * @param payload The payload to convert @@ -61,46 +64,75 @@ export function normalize_payload_to_buffer(payload: any): Buffer { /** @internal */ export const DEFAULT_KEEP_ALIVE : number = 1200; +export interface TopicProperties { + isValid: boolean; + isShared: boolean; + hasWildcard: boolean; +} + +export function computeTopicProperties(topic: string, isFilter: boolean) : TopicProperties { + let properties : TopicProperties = { + isValid: false, + isShared: false, + hasWildcard: false + }; -function isValidTopicInternal(topic: string, isFilter: boolean) : boolean { - if (topic.length === 0 || topic.length > 65535) { - return false; + if (topic.length === 0) { + return properties; } + let hasSharePrefix : boolean = false; + let hasShareName : boolean = false; let sawHash : boolean = false; + let index : number = 0; for (let segment of topic.split('/')) { if (sawHash) { - return false; - } - - if (segment.length === 0) { - continue; + return properties; } if (segment.includes("+")) { if (!isFilter) { - return false; + return properties; } if (segment.length > 1) { - return false; + return properties; } + + properties.hasWildcard = true; } if (segment.includes("#")) { if (!isFilter) { - return false; + return properties; } if (segment.length > 1) { - return false; + return properties; } + properties.hasWildcard = true; sawHash = true; } + + if (index == 0 && segment === "$share") { + hasSharePrefix = true; + } + + if (index == 1 && hasSharePrefix && segment.length > 0 && !properties.hasWildcard) { + hasShareName = true; + } + + if (hasShareName && ((index == 2 && segment.length > 0) || index > 2)) { + properties.isShared = true; + } + + index += 1; } - return true; + properties.isValid = true; + + return properties; } export function isValidTopicFilter(topicFilter: any) : boolean { @@ -108,9 +140,8 @@ export function isValidTopicFilter(topicFilter: any) : boolean { return false; } - let topicFilterAsString = topicFilter as string; - - return isValidTopicInternal(topicFilterAsString, true); + let properties = computeTopicProperties(topicFilter as string, true); + return properties.isValid; } export function isValidTopic(topic: any) : boolean { @@ -118,7 +149,42 @@ export function isValidTopic(topic: any) : boolean { return false; } - let topicAsString = topic as string; + let properties = computeTopicProperties(topic as string, false); + return properties.isValid; +} + +function randomInRange(min: number, max: number) : number { + return min + (max - min) * Math.random(); +} + +export interface ReconnectDelayContext { + retryJitterMode?: mqtt5.RetryJitterType, + minReconnectDelayMs? : number, + maxReconnectDelayMs? : number, + lastReconnectDelay? : number, + connectionFailureCount : number, +} + +/** + * Computes the next reconnect delay based on the Jitter/Retry configuration. + * Implements jitter calculations in https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + * @private + */ +export function calculateNextReconnectDelay(context: ReconnectDelayContext) : number { + const jitterType : mqtt5.RetryJitterType = context.retryJitterMode ?? mqtt5.RetryJitterType.Default; + const [minDelay, maxDelay] : [number, number] = mqtt_utils.getOrderedReconnectDelayBounds(context.minReconnectDelayMs, context.maxReconnectDelayMs); + const clampedFailureCount : number = Math.min(52, context.connectionFailureCount); + let delay : number = 0; + + if (jitterType == mqtt5.RetryJitterType.None) { + delay = minDelay * Math.pow(2, clampedFailureCount); + } else if (jitterType == mqtt5.RetryJitterType.Decorrelated && context.lastReconnectDelay) { + delay = randomInRange(minDelay, 3 * context.lastReconnectDelay); + } else { + delay = randomInRange(minDelay, Math.min(maxDelay, minDelay * Math.pow(2, clampedFailureCount))); + } + + delay = Math.min(maxDelay, delay); - return isValidTopicInternal(topicAsString, false); + return delay; } \ No newline at end of file From 8a90ddd94ba17aab8ef071391706b0586510a996 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Tue, 3 Mar 2026 09:10:58 -0800 Subject: [PATCH 2/4] Update size check --- .builder/actions/crt_size_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.builder/actions/crt_size_check.py b/.builder/actions/crt_size_check.py index 8d9cfeb16..646652d85 100644 --- a/.builder/actions/crt_size_check.py +++ b/.builder/actions/crt_size_check.py @@ -11,7 +11,7 @@ def run(self, env): # Maximum package size (for current platform) in bytes # NOTE: if you increase this, you might also need to increase the # limit in continuous-delivery/pack.sh - max_size = 9_000_000 + max_size = 10_000_000 # size of current folder folder_size = 0 # total size in bytes From f8cd025424c8e1d9b70bcf70487372ba9b7da317 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Fri, 6 Mar 2026 12:23:15 -0800 Subject: [PATCH 3/4] Fixes and updates from 311 integration --- lib/browser/ws.ts | 11 +++++------ lib/common/mqtt_shared.ts | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/browser/ws.ts b/lib/browser/ws.ts index 91446143f..766b107ff 100644 --- a/lib/browser/ws.ts +++ b/lib/browser/ws.ts @@ -19,6 +19,10 @@ import { CrtError } from "./error"; var websocket = require('@httptoolkit/websocket-stream') import * as Crypto from "crypto-js"; import * as iot_shared from "../common/aws_iot_shared"; +import { Duplex } from 'stream'; +import * as WebSocket from 'ws'; + +export type WsStream = Duplex & {socket: WebSocket, on(event: "ws-close", listener: (close: WebSocket.CloseEvent) => void) : WsStream}; /** * Options for websocket based connections in browser @@ -126,7 +130,7 @@ export function create_websocket_url(config: MqttConnectionConfig) { } /** @internal */ -export function create_websocket_stream(config: MqttConnectionConfig) { +export function create_websocket_stream(config: MqttConnectionConfig) : WsStream { const url = create_websocket_url(config); return websocket(url, ['mqttv3.1'], config.websocket); } @@ -181,11 +185,6 @@ export function create_mqtt5_websocket_url(config: mqtt5.Mqtt5ClientConfig) { throw new URIError(`Invalid url factory requested: ${urlFactory}`); } -import { Duplex } from 'stream'; -import * as WebSocket from 'ws'; - -export type WsStream = Duplex & {socket: WebSocket, on(event: "ws-close", listener: (close: WebSocket.CloseEvent) => void) : WsStream}; - /** @internal */ export function create_mqtt5_websocket_stream(config: mqtt5.Mqtt5ClientConfig) : WsStream { const url = create_mqtt5_websocket_url(config); diff --git a/lib/common/mqtt_shared.ts b/lib/common/mqtt_shared.ts index 34880657d..2144a4e43 100644 --- a/lib/common/mqtt_shared.ts +++ b/lib/common/mqtt_shared.ts @@ -55,7 +55,7 @@ export function normalize_payload_to_buffer(payload: any): Buffer { let normalized = normalize_payload(payload); if (typeof normalized === 'string') { // pass string through - return Buffer.from(normalized); + return Buffer.from(new TextEncoder().encode(normalized).buffer); } return normalized; From 97e974c3cf830a40d8e81bf6042222972e8182b9 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 9 Mar 2026 12:58:29 -0700 Subject: [PATCH 4/4] Update size check --- .builder/actions/crt_size_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.builder/actions/crt_size_check.py b/.builder/actions/crt_size_check.py index 646652d85..7379dd35b 100644 --- a/.builder/actions/crt_size_check.py +++ b/.builder/actions/crt_size_check.py @@ -11,7 +11,7 @@ def run(self, env): # Maximum package size (for current platform) in bytes # NOTE: if you increase this, you might also need to increase the # limit in continuous-delivery/pack.sh - max_size = 10_000_000 + max_size = 11_000_000 # size of current folder folder_size = 0 # total size in bytes