-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathThreadMarshallingZusiDataReceiver.cs
More file actions
62 lines (51 loc) · 1.79 KB
/
ThreadMarshallingZusiDataReceiver.cs
File metadata and controls
62 lines (51 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
using System;
using System.Threading;
using System.Threading.Tasks;
using ZusiTcpInterface.TypeDescriptors;
namespace ZusiTcpInterface
{
public class ThreadMarshallingZusiDataReceiver : CallbackBasedZusiDataReceiverBase, IDisposable
{
private readonly IBlockingCollection<DataChunkBase> _blockingCollection;
private readonly SynchronizationContext _synchronizationContext;
private Task _marshallingTask;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private bool _disposed;
public ThreadMarshallingZusiDataReceiver(DescriptorCollection descriptors, IBlockingCollection<DataChunkBase> blockingCollection, SynchronizationContext synchronizationContext = null) : base(descriptors)
{
_blockingCollection = blockingCollection;
_synchronizationContext = synchronizationContext ?? SynchronizationContext.Current;
_marshallingTask = Task.Run((Action) MainMarshallingLoop);
}
private void MainMarshallingLoop()
{
var cancellationToken = _cancellationTokenSource.Token;
try
{
while (true)
{
var chunk = _blockingCollection.Take(cancellationToken);
_synchronizationContext.Post(RaiseEventFor, chunk);
}
}
catch (OperationCanceledException)
{
// Teardown requested
}
}
private void RaiseEventFor(object chunk)
{
base.RaiseEventFor((DataChunkBase)chunk);
}
public void Dispose()
{
if (_disposed)
return;
_cancellationTokenSource.Cancel();
if (_marshallingTask != null && !_marshallingTask.Wait(500))
throw new TimeoutException("Failed to shut down message recption task within timeout.");
_marshallingTask = null;
_disposed = true;
}
}
}