From 06f8423b5ac4661189cd9877889b9c4c7a403477 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 30 Aug 2024 16:39:49 -0500 Subject: [PATCH] Mqtt311Decoder: use ImmutableList.Builder Fewer object allocations during packet parsing this way --- src/TurboMqtt/Protocol/Mqtt311Decoder.cs | 41 +++++++++++++++--------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/TurboMqtt/Protocol/Mqtt311Decoder.cs b/src/TurboMqtt/Protocol/Mqtt311Decoder.cs index 27ed79a7..24c8f38d 100644 --- a/src/TurboMqtt/Protocol/Mqtt311Decoder.cs +++ b/src/TurboMqtt/Protocol/Mqtt311Decoder.cs @@ -19,7 +19,7 @@ public class Mqtt311Decoder private ReadOnlyMemory _remainder = ReadOnlyMemory.Empty; public bool TryDecode(in ReadOnlyMemory additionalData, out ImmutableList packets) - { + { packets = ImmutableList.Empty; var rValue = false; @@ -43,6 +43,8 @@ public bool TryDecode(in ReadOnlyMemory additionalData, out ImmutableList< _remainder = workingBuffer; return false; } + + var packetBuilder = ImmutableList.CreateBuilder(); while (workingBuffer.Length > 0) { @@ -59,13 +61,18 @@ public bool TryDecode(in ReadOnlyMemory additionalData, out ImmutableList< // extract packet size (the packet span will automatically advance past the size header) if (!TryGetPacketLength(ref currentPacket, out packetSize)) - return rValue; // we need more data to decode the packet size + { + packets = packetBuilder.ToImmutable(); + return rValue; + } + // check to see if we have enough data to decode the packet if (currentPacket.Length < packetSize) { // save the remainder _remainder = workingBuffer; + packets = packetBuilder.ToImmutable(); return rValue; } @@ -89,67 +96,67 @@ public bool TryDecode(in ReadOnlyMemory additionalData, out ImmutableList< { case MqttPacketType.Publish: { - packets = packets.Add(DecodePublish(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodePublish(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.PubAck: { - packets = packets.Add(DecodePubAck(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodePubAck(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.PubRec: { - packets = packets.Add(DecodePubRec(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodePubRec(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.PubRel: { - packets = packets.Add(DecodePubRel(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodePubRel(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.PubComp: { - packets = packets.Add(DecodePubComp(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodePubComp(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.PingReq: - packets = packets.Add(PingReqPacket.Instance); + packetBuilder.Add(PingReqPacket.Instance); break; case MqttPacketType.PingResp: - packets = packets.Add(PingRespPacket.Instance); + packetBuilder.Add(PingRespPacket.Instance); break; case MqttPacketType.Connect: { - packets = packets.Add(DecodeConnect(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeConnect(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.ConnAck: { - packets = packets.Add(DecodeConnAck(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeConnAck(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.SubAck: { - packets = packets.Add(DecodeSubAck(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeSubAck(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.Subscribe: { - packets = packets.Add(DecodeSubscribe(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeSubscribe(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.Unsubscribe: { - packets = packets.Add(DecodeUnsubscribe(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeUnsubscribe(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.UnsubAck: { - packets = packets.Add(DecodeUnsubAck(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeUnsubAck(ref bufferForMsg, packetSize, headerLength)); break; } case MqttPacketType.Disconnect: - packets = packets.Add(DecodeDisconnect(ref bufferForMsg, packetSize, headerLength)); + packetBuilder.Add(DecodeDisconnect(ref bufferForMsg, packetSize, headerLength)); break; case MqttPacketType.Auth: // MQTT 5.0 only - should throw an exception if we see this throw new NotSupportedException("MQTT 5.0 packets are not supported."); @@ -170,6 +177,8 @@ public bool TryDecode(in ReadOnlyMemory additionalData, out ImmutableList< throw new MqttDecoderException($"Error decoding packet of predicted size [{headerLength + packetSize}]", ex, MqttProtocolVersion.V3_1_1, packetType); } } + + packets = packetBuilder.ToImmutable(); return rValue; }