Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 41 additions & 36 deletions src/Renci.SshNet/ShellStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Text.RegularExpressions;
using Renci.SshNet.Abstractions;
using System.Linq;

namespace Renci.SshNet
{
Expand All @@ -20,7 +21,7 @@ public class ShellStream : Stream
private readonly ISession _session;
private readonly Encoding _encoding;
private readonly int _bufferSize;
private readonly Queue<byte> _incoming;
private readonly Queue<byte[]> _incoming;
private readonly Queue<byte> _outgoing;
private IChannelSession _channel;
private AutoResetEvent _dataReceived = new AutoResetEvent(false);
Expand Down Expand Up @@ -83,7 +84,7 @@ internal ShellStream(ISession session, string terminalName, uint columns, uint r
_encoding = session.ConnectionInfo.Encoding;
_session = session;
_bufferSize = bufferSize;
_incoming = new Queue<byte>();
_incoming = new Queue<byte[]>();
_outgoing = new Queue<byte>();

_channel = _session.CreateChannelSession();
Expand Down Expand Up @@ -219,9 +220,14 @@ public override int Read(byte[] buffer, int offset, int count)

lock (_incoming)
{
for (; i < count && _incoming.Count > 0; i++)
List<byte> chain = new List<byte>();
foreach (var byteArray in _incoming.ToArray())
chain = chain.Concat(byteArray).ToList();

_incoming.Clear();
for (; i < count && chain.Count() > 0; i++)
{
buffer[offset + i] = _incoming.Dequeue();
buffer[offset + i] = chain[i];
}
}

Expand Down Expand Up @@ -308,7 +314,7 @@ public void Expect(TimeSpan timeout, params ExpectAction[] expectActions)
{
if (_incoming.Count > 0)
{
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count);
text = ReadInternal(_incoming);
}

if (text.Length > 0)
Expand Down Expand Up @@ -422,7 +428,7 @@ public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object

if (_incoming.Count > 0)
{
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count);
text = ReadInternal(_incoming);
}

if (text.Length > 0)
Expand Down Expand Up @@ -556,7 +562,7 @@ public string Expect(Regex regex, TimeSpan timeout)
{
if (_incoming.Count > 0)
{
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count);
text = ReadInternal(_incoming);
}

var match = regex.Match(text);
Expand Down Expand Up @@ -617,7 +623,7 @@ public string ReadLine(TimeSpan timeout)
{
if (_incoming.Count > 0)
{
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count);
text = ReadInternal(_incoming);
}

var index = text.IndexOf(CrLf, StringComparison.Ordinal);
Expand Down Expand Up @@ -662,15 +668,7 @@ public string ReadLine(TimeSpan timeout)
/// </returns>
public string Read()
{
string text;

lock (_incoming)
{
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count);
_incoming.Clear();
}

return text;
return ReadInternal(_incoming, true);
}

/// <summary>
Expand Down Expand Up @@ -743,6 +741,27 @@ protected override void Dispose(bool disposing)
}
}

/// <summary>
/// Reading bytes as string
/// </summary>
/// <param name="source"></param>
/// <param name="clear"></param>
/// <returns></returns>
private string ReadInternal(Queue<byte[]> source, bool clear = false)
{
IEnumerable<byte> chain = new List<byte>();
lock (source)
{
foreach (var byteArray in source.ToArray())
chain = chain.Concat(byteArray);

if (clear)
source.Clear();
}

return _encoding.GetString(chain.ToArray());
}

/// <summary>
/// Unsubscribes the current <see cref="ShellStream"/> from session events.
/// </summary>
Expand All @@ -761,7 +780,8 @@ private void UnsubscribeFromSessionEvents(ISession session)

private void Session_ErrorOccured(object sender, ExceptionEventArgs e)
{
OnRaiseError(e);
if (ErrorOccurred != null)
ErrorOccurred.Invoke(this, e);
}

private void Session_Disconnected(object sender, EventArgs e)
Expand All @@ -780,32 +800,17 @@ private void Channel_DataReceived(object sender, ChannelDataEventArgs e)
{
lock (_incoming)
{
foreach (var b in e.Data)
_incoming.Enqueue(b);
_incoming.Enqueue(e.Data);
}

if (_dataReceived != null)
_dataReceived.Set();

OnDataReceived(e.Data);
}

private void OnRaiseError(ExceptionEventArgs e)
{
var handler = ErrorOccurred;
if (handler != null)
if (DataReceived != null)
{
handler(this, e);
DataReceived.Invoke(this, new ShellDataEventArgs(e.Data));
}
}

private void OnDataReceived(byte[] data)
{
var handler = DataReceived;
if (handler != null)
{
handler(this, new ShellDataEventArgs(data));
}
}
}
}