Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .builder/actions/crt_size_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def run(self, env):
# Maximum package size (for current platform) in bytes
# NOTE: if you increase this, you might also need to increase the
# limit in continuous-delivery/pack.sh
max_size = 9_000_000
max_size = 11_000_000
# size of current folder
folder_size = 0
# total size in bytes
Expand Down
112 changes: 112 additions & 0 deletions lib/browser/heap.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

import * as heap from "./heap";

interface ElementType {
timestamp: number;
id: number;
}

function compareElements(lhs: ElementType, rhs: ElementType) {
if (lhs.timestamp < rhs.timestamp) {
return true;
} else if (lhs.timestamp > rhs.timestamp) {
return false;
} else {
return lhs.id < rhs.id;
}
}

test('empty', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);
expect(testHeap.empty()).toBeTruthy();
expect(testHeap.peek()).toBeUndefined();

testHeap.push({timestamp: 0, id: 1});
expect(testHeap.empty()).toBeFalsy();
expect(testHeap.peek()).toBeDefined();

testHeap.pop();
expect(testHeap.empty()).toBeTruthy();
expect(testHeap.peek()).toBeUndefined();
});

function doSimplePushPopTest(testHeap : heap.MinHeap<ElementType>, timestamps: Array<number>) {
let currentId = 1;
for (let timestamp of timestamps) {
testHeap.push({timestamp: timestamp, id: currentId++});
}

let poppedTimestamps : Array<number> = [];
let peekedTimestamps : Array<number> = [];
while (!testHeap.empty()) {
// @ts-ignore
peekedTimestamps.push(testHeap.peek().timestamp);
poppedTimestamps.push(testHeap.pop().timestamp);
}

let sortedTimestamps = timestamps.sort((lhs, rhs) => {
if (lhs < rhs) {
return -1;
} else if (lhs > rhs) {
return 1;
} else {
return 0;
}
});

expect(poppedTimestamps).toEqual(sortedTimestamps);
expect(peekedTimestamps).toEqual(sortedTimestamps);
}

test('reverseOrderPushPop', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);

let timestamps = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0];
doSimplePushPopTest(testHeap, timestamps);
});

test('alternatingOrderPushPop', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);

let timestamps = [10, -10, 9, -9, 8, -8, 7, -7, 6, -6, 5, -5, 4, -4, 3, -3, 2, -2, 1, -1, 0];
doSimplePushPopTest(testHeap, timestamps);
});

test('randomOrderPushPop', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);

let timestamps = [];

for (let i = 0; i < 100; i++) {
timestamps.push(Math.floor(Math.random() * 1000 + .5));
}

doSimplePushPopTest(testHeap, timestamps);
});

test('peek empty', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);

expect(() => { testHeap.pop(); }).toThrow("empty");
});

test('clear', async () => {
let testHeap : heap.MinHeap<ElementType> = new heap.MinHeap<ElementType>(compareElements);

let timestamps = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0];
let currentId = 1;
for (let timestamp of timestamps) {
testHeap.push({timestamp: timestamp, id: currentId++});
}

expect(testHeap.empty()).toBeFalsy();

testHeap.clear();

expect(testHeap.empty()).toBeTruthy();
expect(() => { testHeap.pop(); }).toThrow("empty");
});
122 changes: 122 additions & 0 deletions lib/browser/heap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

import {CrtError} from "./error";

/**
* A generic array-based min-heap implementation for priority queue usage. Used to track operation timeouts.
*/
export class MinHeap<T> {

private heap : Array<T | undefined> = [undefined];
private currentSize: number = 0;

constructor(private lessThanOperator: (lhs: T, rhs: T) => boolean) {
}

/**
* Push a value into the min-heap
*
* @param value value to add
*/
push(value: T) {
this.currentSize++;
this.heap[this.currentSize] = value;
this.heapifyUp(this.currentSize);
}

/**
* Gets the minimum value of the heap. Returns undefined if the heap is empty.
*/
peek() : T | undefined {
if (this.currentSize == 0) {
return undefined;
}

// guaranteed to be a T
return this.heap[1];
}

/**
* Pops the minimum value from the heap. Throws an exception if the heap is empty.
*/
pop() : T {
if (this.empty()) {
throw new CrtError("Heap is empty");
}

let returnValue = this.heap[1];
let lastElement = this.heap[this.currentSize];
this.heap[this.currentSize--] = undefined; // erase the reference; unclear if this is overkill rather than simply removing from an array (or an indexed map for that matter)
this.heap[1] = lastElement;
this.heapifyDown(1);

// @ts-ignore - guaranteed to be a T
return returnValue;
}

/**
* Checks if the min-heap is empty or not
*/
empty() : boolean {
return this.currentSize == 0;
}

/**
* Removes all values from the min-heap
*/
clear() {
this.heap = [undefined];
this.currentSize = 0;
}

private swapElements(index1: number, index2: number) {
let temp = this.heap[index1];
this.heap[index1] = this.heap[index2];
this.heap[index2] = temp;
}

private heapifyDown(startIndex: number) {
let currentIndex : number = startIndex;

while (true) {
let leftIndex = currentIndex << 1;
let rightIndex = leftIndex + 1;
let swapIndex = undefined;

// @ts-ignore - heap elements guaranteed to be a T
if (leftIndex <= this.currentSize && this.lessThanOperator(this.heap[leftIndex], this.heap[currentIndex])) {
swapIndex = leftIndex;
}

// @ts-ignore - heap elements guaranteed to be a T
if (rightIndex <= this.currentSize && this.lessThanOperator(this.heap[rightIndex], this.heap[currentIndex])) {
// @ts-ignore - heap elements guaranteed to be a T
if (!swapIndex || this.lessThanOperator(this.heap[rightIndex], this.heap[leftIndex])) {
// if current is greater than both left and right children, we must swap with the smallest child
// since it will become the parent of both current and the other child
swapIndex = rightIndex;
}
}

if (!swapIndex) {
break;
}

this.swapElements(currentIndex, swapIndex);
currentIndex = swapIndex;
}
}

private heapifyUp(index: number) {
let parentIndex = index >> 1;
// @ts-ignore - heap element guaranteed to be a T
while (parentIndex > 0 && this.lessThanOperator(this.heap[index], this.heap[parentIndex])) {
this.swapElements(index, parentIndex);
index = parentIndex;
parentIndex = parentIndex >> 1;
}
}
}
38 changes: 8 additions & 30 deletions lib/browser/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,14 @@ class ReconnectionScheduler {
onConnectionFailureOrDisconnection() : void {
this.clearTasks();

let nextDelay : number = this.calculateNextReconnectDelay();
let reconnectContext = {
retryJitterMode: this.clientConfig.retryJitterMode,
minReconnectDelayMs : this.clientConfig.minReconnectDelayMs,
maxReconnectDelayMs : this.clientConfig.maxReconnectDelayMs,
lastReconnectDelay : this.lastReconnectDelay,
connectionFailureCount : this.connectionFailureCount,
};
let nextDelay : number = mqtt_shared.calculateNextReconnectDelay(reconnectContext);

this.lastReconnectDelay = nextDelay;
this.connectionFailureCount += 1;
Expand Down Expand Up @@ -268,35 +275,6 @@ class ReconnectionScheduler {
clearTimeout(this.resetConnectionFailureCountTask);
}
}

private randomInRange(min: number, max: number) : number {
return min + (max - min) * Math.random();
}

/**
* Computes the next reconnect delay based on the Jitter/Retry configuration.
* Implements jitter calculations in https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
* @private
*/
private calculateNextReconnectDelay() : number {
const jitterType : mqtt5.RetryJitterType = this.clientConfig.retryJitterMode ?? mqtt5.RetryJitterType.Default;
const [minDelay, maxDelay] : [number, number] = mqtt_utils.getOrderedReconnectDelayBounds(this.clientConfig.minReconnectDelayMs, this.clientConfig.maxReconnectDelayMs);
const clampedFailureCount : number = Math.min(52, this.connectionFailureCount);
let delay : number = 0;

if (jitterType == mqtt5.RetryJitterType.None) {
delay = minDelay * Math.pow(2, clampedFailureCount);
} else if (jitterType == mqtt5.RetryJitterType.Decorrelated && this.lastReconnectDelay) {
delay = this.randomInRange(minDelay, 3 * this.lastReconnectDelay);
} else {
delay = this.randomInRange(minDelay, Math.min(maxDelay, minDelay * Math.pow(2, clampedFailureCount)));
}

delay = Math.min(maxDelay, delay);
this.lastReconnectDelay = delay;

return delay;
}
}

/**
Expand Down
8 changes: 6 additions & 2 deletions lib/browser/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import { CrtError } from "./error";
var websocket = require('@httptoolkit/websocket-stream')
import * as Crypto from "crypto-js";
import * as iot_shared from "../common/aws_iot_shared";
import { Duplex } from 'stream';
import * as WebSocket from 'ws';

export type WsStream = Duplex & {socket: WebSocket, on(event: "ws-close", listener: (close: WebSocket.CloseEvent) => void) : WsStream};

/**
* Options for websocket based connections in browser
Expand Down Expand Up @@ -126,7 +130,7 @@ export function create_websocket_url(config: MqttConnectionConfig) {
}

/** @internal */
export function create_websocket_stream(config: MqttConnectionConfig) {
export function create_websocket_stream(config: MqttConnectionConfig) : WsStream {
const url = create_websocket_url(config);
return websocket(url, ['mqttv3.1'], config.websocket);
}
Expand Down Expand Up @@ -182,7 +186,7 @@ export function create_mqtt5_websocket_url(config: mqtt5.Mqtt5ClientConfig) {
}

/** @internal */
export function create_mqtt5_websocket_stream(config: mqtt5.Mqtt5ClientConfig) {
export function create_mqtt5_websocket_stream(config: mqtt5.Mqtt5ClientConfig) : WsStream {
const url = create_mqtt5_websocket_url(config);
let ws = websocket(url, ['mqtt'], config.websocketOptions?.wsOptions);

Expand Down
1 change: 1 addition & 0 deletions lib/common/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export class BufferedEventEmitter extends EventEmitter {
super.emit(event.event, ...event.args);
this.eventQueue = this.eventQueue.next;
}
this.lastQueuedEvent = undefined;
}

/**
Expand Down
32 changes: 31 additions & 1 deletion lib/common/mqtt5_packet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ export enum ConnectReasonCode {
*/
Success = 0,

/**
* (MQTT 311 Only) The connection was rejected because the server does not support the requested protocol version.
*/
UnacceptableProtocolVersion311 = 1,

/**
* (MQTT 311 Only) The connection was rejected because the server does not allow the requested client id.
*/
ClientIdRejected311 = 2,

/**
* (MQTT 311 Only) The connection was rejected because the server is not accepting MQTT connections.
*/
ServerUnavailable311 = 3,

/**
* (MQTT 311 Only) The connection was rejected because the server does not accept the data specified in
* the username or password fields.
*/
InvalidUsernameOrPassword311 = 4,

/**
* (MQTT 311 Only) The connection was rejected because the client is not authorized to connect.
*/
NotAuthorized311 = 5,

/**
* Returned when the server has a failure but does not want to specify a reason or none
* of the other reason codes apply.
Expand Down Expand Up @@ -149,7 +175,7 @@ export enum ConnectReasonCode {
* @param reasonCode reason code to check success for
*/
export function isSuccessfulConnectReasonCode(reasonCode: ConnectReasonCode): boolean {
return reasonCode < 128;
return reasonCode == ConnectReasonCode.Success;
}

/**
Expand Down Expand Up @@ -409,6 +435,10 @@ export enum SubackReasonCode {
*/
GrantedQoS2 = 2,

/**
* Generic reason code used in MQTT311 to indicate subscription failure
*/
Failure311 = 128,

/**
* Returned when the connection was closed but the sender does not want to specify a reason or none
Expand Down
Loading
Loading