From aef08b468a766141d3df05be79a93e8082cbd6b0 Mon Sep 17 00:00:00 2001 From: Swaraj Date: Sat, 27 Mar 2021 23:05:55 +0530 Subject: [PATCH] Optimized incoming bytes queue. Release lock asap to let the stream push the data into the queue. --- src/Renci.SshNet/ShellStream.cs | 77 ++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/src/Renci.SshNet/ShellStream.cs b/src/Renci.SshNet/ShellStream.cs index 3274fe19c..6dfe9bde5 100644 --- a/src/Renci.SshNet/ShellStream.cs +++ b/src/Renci.SshNet/ShellStream.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Text.RegularExpressions; using Renci.SshNet.Abstractions; +using System.Linq; namespace Renci.SshNet { @@ -20,7 +21,7 @@ public class ShellStream : Stream private readonly ISession _session; private readonly Encoding _encoding; private readonly int _bufferSize; - private readonly Queue _incoming; + private readonly Queue _incoming; private readonly Queue _outgoing; private IChannelSession _channel; private AutoResetEvent _dataReceived = new AutoResetEvent(false); @@ -83,7 +84,7 @@ internal ShellStream(ISession session, string terminalName, uint columns, uint r _encoding = session.ConnectionInfo.Encoding; _session = session; _bufferSize = bufferSize; - _incoming = new Queue(); + _incoming = new Queue(); _outgoing = new Queue(); _channel = _session.CreateChannelSession(); @@ -219,9 +220,14 @@ public override int Read(byte[] buffer, int offset, int count) lock (_incoming) { - for (; i < count && _incoming.Count > 0; i++) + List chain = new List(); + foreach (var byteArray in _incoming.ToArray()) + chain = chain.Concat(byteArray).ToList(); + + _incoming.Clear(); + for (; i < count && chain.Count() > 0; i++) { - buffer[offset + i] = _incoming.Dequeue(); + buffer[offset + i] = chain[i]; } } @@ -308,7 +314,7 @@ public void Expect(TimeSpan timeout, params ExpectAction[] expectActions) { if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + text = ReadInternal(_incoming); } if (text.Length > 0) @@ -422,7 +428,7 @@ public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + text = ReadInternal(_incoming); } if (text.Length > 0) @@ -556,7 +562,7 @@ public string Expect(Regex regex, TimeSpan timeout) { if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + text = ReadInternal(_incoming); } var match = regex.Match(text); @@ -617,7 +623,7 @@ public string ReadLine(TimeSpan timeout) { if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + text = ReadInternal(_incoming); } var index = text.IndexOf(CrLf, StringComparison.Ordinal); @@ -662,15 +668,7 @@ public string ReadLine(TimeSpan timeout) /// public string Read() { - string text; - - lock (_incoming) - { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - _incoming.Clear(); - } - - return text; + return ReadInternal(_incoming, true); } /// @@ -743,6 +741,27 @@ protected override void Dispose(bool disposing) } } + /// + /// Reading bytes as string + /// + /// + /// + /// + private string ReadInternal(Queue source, bool clear = false) + { + IEnumerable chain = new List(); + lock (source) + { + foreach (var byteArray in source.ToArray()) + chain = chain.Concat(byteArray); + + if (clear) + source.Clear(); + } + + return _encoding.GetString(chain.ToArray()); + } + /// /// Unsubscribes the current from session events. /// @@ -761,7 +780,8 @@ private void UnsubscribeFromSessionEvents(ISession session) private void Session_ErrorOccured(object sender, ExceptionEventArgs e) { - OnRaiseError(e); + if (ErrorOccurred != null) + ErrorOccurred.Invoke(this, e); } private void Session_Disconnected(object sender, EventArgs e) @@ -780,32 +800,17 @@ private void Channel_DataReceived(object sender, ChannelDataEventArgs e) { lock (_incoming) { - foreach (var b in e.Data) - _incoming.Enqueue(b); + _incoming.Enqueue(e.Data); } if (_dataReceived != null) _dataReceived.Set(); - OnDataReceived(e.Data); - } - - private void OnRaiseError(ExceptionEventArgs e) - { - var handler = ErrorOccurred; - if (handler != null) + if (DataReceived != null) { - handler(this, e); + DataReceived.Invoke(this, new ShellDataEventArgs(e.Data)); } } - private void OnDataReceived(byte[] data) - { - var handler = DataReceived; - if (handler != null) - { - handler(this, new ShellDataEventArgs(data)); - } - } } }