-
Notifications
You must be signed in to change notification settings - Fork 2
Advanced Features
This page covers advanced features and patterns in this library:
- Cooperative Yielding
- Subsequent Request Chaining
- Custom Request Handlers
- Dynamic Parallelism Control
- SynchronizationContext Marshaling
- IValueTaskSource Integration
- Custom State Machines
- Advanced Container Patterns
Cooperative yielding allows requests to pause gracefully at designated points without forceful thread interruption. This prevents data corruption and resource leaks.
public static YieldAwaitable Yield() => new(_current.Value);Uses AsyncLocal<IRequest> to track the current request context automatically.
public class LongRunningRequest : Request<RequestOptions<VoidStruct, VoidStruct>, VoidStruct, VoidStruct>
{
private int _checkpointIndex = 0;
protected override async Task<RequestReturn> RunRequestAsync()
{
    // Resume from the last persisted checkpoint instead of restarting at zero.
    int item = _checkpointIndex;
    while (item < 10000)
    {
        // Cooperative yield - checks for pause/cancel
        await Request.Yield();
        ProcessItem(item);
        // Persist progress every 100 items so little work is lost on resume.
        if (item % 100 == 0)
        {
            _checkpointIndex = item;
            SaveCheckpoint(item);
        }
        item++;
    }
    return new RequestReturn { Successful = true };
}
}✅ DO:
- Place
await Request.Yield()in long-running loops - Yield before expensive operations
- Save checkpoints after yields
- Use for responsive cancellation
❌ DON'T:
- Yield too frequently (the performance overhead is small but nonzero)
- Yield inside critical sections or locks
Chain requests to execute sequentially without re-queuing through the handler. This provides immediate execution upon parent completion.
var processRequest = new OwnRequest(async token =>
{
await ProcessDataAsync(token);
return true;
});
var extractRequest = new OwnRequest(async token =>
{
await ExtractZipAsync("data.zip", token);
return true;
}, new() { SubsequentRequest = processRequest });
var downloadRequest = new OwnRequest(async token =>
{
await DownloadFileAsync("data.zip", token);
return true;
}, new() { SubsequentRequest = extractRequest });
// Start only the first, others follow automatically
downloadRequest.Start();public class ConditionalRequest : Request<RequestOptions<VoidStruct, VoidStruct>, VoidStruct, VoidStruct>
{
private readonly IRequest _onSuccessRequest;
private readonly IRequest _onFailureRequest;
// Stores the success/failure follow-up requests and starts this request.
// NOTE(review): AutoStart is presumably the base-class self-start hook that
// queues the request on its handler - confirm against the Request base class.
public ConditionalRequest(IRequest onSuccess, IRequest onFailure)
{
_onSuccessRequest = onSuccess;
_onFailureRequest = onFailure;
AutoStart();
}
protected override async Task<RequestReturn> RunRequestAsync()
{
    bool succeeded = await TryOperationAsync(Token);
    // Select the follow-up request at runtime based on the outcome; the
    // chosen subsequent request runs automatically after this one completes.
    IRequest next = succeeded ? _onSuccessRequest : _onFailureRequest;
    Options.SubsequentRequest = next;
    return new RequestReturn { Successful = succeeded };
}
}-
$\text{\color{orange}Pipeline Processing}$ : Download → Extract → Process → Upload -
$\text{\color{blue}Dependent Operations}$ : Authenticate → Fetch Data → Parse → Store -
$\text{\color{green}Conditional Workflows}$ : Validate → (Success: Process | Failure: Retry)
Both ParallelRequestHandler and SequentialRequestHandler implement IRequestHandler. You can create custom handlers for specialized execution patterns.
/// <summary>
/// A ParallelRequestHandler that throttles request starts to a fixed number
/// per one-second window. Permits are consumed per request and refilled in
/// bulk by a timer rather than released per completion.
/// </summary>
public class RateLimitedHandler : ParallelRequestHandler
{
    private readonly SemaphoreSlim _rateLimiter;
    private readonly Timer _resetTimer;
    // Permits available per window; needed so the timer knows how far to refill.
    private readonly int _maxPermits;

    public RateLimitedHandler(int requestsPerSecond)
    {
        _maxPermits = requestsPerSecond;
        _rateLimiter = new SemaphoreSlim(requestsPerSecond, requestsPerSecond);
        _resetTimer = new Timer(ResetRateLimit, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
    }

    protected override async Task HandleRequestAsync(IRequest request)
    {
        // Take one permit per request; blocks (asynchronously) once the
        // per-second budget is exhausted.
        await _rateLimiter.WaitAsync(CancellationToken);
        try
        {
            await base.HandleRequestAsync(request);
        }
        finally
        {
            // Don't release here - let timer reset the limit
        }
    }

    private void ResetRateLimit(object? state)
    {
        // BUG FIX: the original compared CurrentCount to itself
        // (`CurrentCount < CurrentCount`), which is never true, so permits
        // were never refilled. Refill up to the configured maximum.
        while (_rateLimiter.CurrentCount < _maxPermits)
            _rateLimiter.Release();
    }
}public class WeightedPriorityHandler : SequentialRequestHandler
{
private readonly Dictionary<RequestPriority, int> _weights = new()
{
{ RequestPriority.High, 10 }, // 10x more likely to execute
{ RequestPriority.Normal, 3 },
{ RequestPriority.Low, 1 }
};
private int _executionCounter = 0;
protected override IRequest? SelectNextRequest()
{
    // Every 10th dispatch bypasses the weighted lottery so high-priority
    // work cannot be starved indefinitely.
    bool forceHighPriority = ++_executionCounter % 10 == 0;
    return forceHighPriority
        ? GetHighestPriorityRequest()
        : GetWeightedRandomRequest();
}
}The AutoParallelism function calculates parallelism dynamically:
var handler = new ParallelRequestHandler
{
AutoParallelism = () =>
{
// Custom logic based on system state
int cpuUsage = GetCpuUsage();
int memoryAvailable = GetAvailableMemoryMB();
if (cpuUsage > 80 || memoryAvailable < 500)
return 2; // Throttle down
else if (cpuUsage < 30 && memoryAvailable > 2000)
return Environment.ProcessorCount * 2; // Scale up
else
return Environment.ProcessorCount; // Normal
},
MaxParallelism = 50 // Hard cap
};var gpuHandler = new ParallelRequestHandler
{
AutoParallelism = () =>
{
// Query GPU memory usage
float gpuMemUsagePercent = GetGpuMemoryUsage();
if (gpuMemUsagePercent > 90)
return 1; // One at a time
else if (gpuMemUsagePercent > 70)
return 2;
else
return 4; // Max 4 concurrent GPU operations
},
MaxParallelism = 4
};Preserve the original execution context for callbacks, ensuring they run on the correct thread (e.g., UI thread in WPF/WinForms).
// In Request constructor:
protected SynchronizationContext? SynchronizationContext { get; }
public Request(TOptions? requestOptions = null)
{
// Capture current context or use handler's default
SynchronizationContext = SynchronizationContext.Current
?? Options.Handler?.DefaultSynchronizationContext;
}// Static callbacks for efficiency
// Cached static delegates: reusing one SendOrPostCallback instance avoids
// allocating a new delegate for every state change or completion that is
// marshaled onto a SynchronizationContext.
private static readonly SendOrPostCallback s_stateChangedCallback = static state =>
{
// Deconstructing Tuple<,> relies on System.TupleExtensions.Deconstruct.
var (request, newState) = (Tuple<Request, RequestState>)state!;
request.StateChanged?.Invoke(request, newState);
};
private static readonly SendOrPostCallback s_requestCompletedCallback = static state =>
{
var (request, result) = (Tuple<Request, TCompleted?>)state!;
request.Options.RequestCompleted?.Invoke(request, result);
};
// Usage in state transition:
if (StateChanged != null)
{
    if (SynchronizationContext != null)
        // BUG FIX: s_stateChangedCallback casts its state argument to
        // Tuple<Request, RequestState> (a reference Tuple). The original
        // posted a ValueTuple literal `(this, newState)`, which would throw
        // InvalidCastException inside the callback at runtime.
        SynchronizationContext.Post(s_stateChangedCallback, Tuple.Create(this, newState));
    else
        StateChanged.Invoke(this, newState); // Direct invoke
}public class MyCustomHandler : ParallelRequestHandler
{
public MyCustomHandler()
{
// Override default context
DefaultSynchronizationContext = new MyCustomSynchronizationContext();
}
}
public class MyCustomSynchronizationContext : SynchronizationContext
{
public override void Post(SendOrPostCallback d, object? state)
{
    // Custom dispatching logic: hand the callback and its state off to the
    // application's dispatcher for asynchronous execution.
    void RunCallback() => d(state);
    MyDispatcher.BeginInvoke(RunCallback);
}
}public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
StartRequests();
}
private void StartRequests()
{
    // The Request constructor captures SynchronizationContext.Current on its
    // own (the WPF DispatcherSynchronizationContext of this UI thread), so no
    // manual capture is needed here - the original kept an unused local for it.
    var request = new OwnRequest(async token =>
    {
        // Runs on thread pool
        await Task.Delay(2000, token);
        return true;
    }, new()
    {
        // Callback runs on UI thread thanks to captured context
        RequestCompleted = (req, result) =>
        {
            StatusText.Text = "Completed!"; // Safe UI access
        }
    });
}
}Request<> implements IValueTaskSource for efficient awaiting without heap allocations.
public ValueTask GetRunningAwaiter() => new(this, _runningSourceVersion);Allows waiting until the request stops running (paused or completed).
// Pauses a request and waits until it has actually stopped executing.
// Pause() only signals; the request keeps running until its next cooperative
// yield point, so we await the running-state awaiter (IValueTaskSource-based,
// no heap allocation) to observe the real stop.
public async Task PauseAndWaitAsync(IRequest request)
{
request.Pause();
// Wait for request to actually pause (complete current execution)
await ((Request<RequestOptions<VoidStruct, VoidStruct>, VoidStruct, VoidStruct>)request)
.GetRunningAwaiter();
Console.WriteLine("Request has paused execution");
}
}public class RequestStateMachine
{
private int _stateInt;
/// <summary>
/// Attempts a lock-free transition from <paramref name="from"/> to
/// <paramref name="to"/>; returns false if another thread changed the
/// state first.
/// </summary>
public bool TrySetState(RequestState from, RequestState to)
{
    int expected = (int)from;
    // Atomic compare-and-swap: CompareExchange returns the value that was
    // observed before any swap took place.
    int observed = Interlocked.CompareExchange(ref _stateInt, (int)to, expected);
    if (observed != expected)
        return false; // Transition failed - state was no longer `from`.
    OnStateChanged?.Invoke(from, to);
    return true;
}
}public class RetryableRequest : Request<RequestOptions<VoidStruct, VoidStruct>, VoidStruct, VoidStruct>
{
private readonly RequestStateMachine _customStateMachine = new();
protected override async Task<RequestReturn> RunRequestAsync()
{
    // Pass Token so the operation reacts to cancellation, consistent with
    // the ConditionalRequest example above.
    bool success = await TryOperationAsync(Token);
    if (!success && AttemptCounter < 3)
    {
        // Custom retry logic with custom state: drop back to Idle during back-off.
        _customStateMachine.TrySetState(RequestState.Running, RequestState.Idle);
        // Cancellable back-off delay (the original ignored the token here).
        await Task.Delay(1000, Token);
        // NOTE(review): assumes AttemptCounter is advanced by the framework
        // between attempts; otherwise this recursion could retry forever - confirm.
        return await RunRequestAsync(); // Recursive retry
    }
    return new RequestReturn { Successful = success };
}
}// Create sub-containers for logical grouping
var downloadContainer = new RequestContainer<OwnRequest>(
new OwnRequest(async token => await Download1(token)),
new OwnRequest(async token => await Download2(token))
);
var processContainer = new RequestContainer<OwnRequest>(
new OwnRequest(async token => await Process1(token)),
new OwnRequest(async token => await Process2(token))
);
// Create parent container
var mainContainer = new RequestContainer<IRequestContainer<IRequest>>(
downloadContainer,
processContainer
);
mainContainer.Start(); // Starts all nested requestsvar container1 = new RequestContainer<OwnRequest>(request1, request2);
var container2 = new RequestContainer<OwnRequest>(request3, request4);
// Merge into new container (flattens requests)
var merged = RequestContainer<OwnRequest>.MergeContainers(container1, container2);
// merged contains: request1, request2, request3, request4var container = new ProgressableContainer<OwnRequest>();
// Start container
container.Start();
// Dynamically add requests while running
for (int i = 0; i < 10; i++)
{
    await Task.Delay(1000);
    // BUG FIX: copy the loop variable before capturing it. A C# for-loop
    // shares a single `i` across all iterations, so the async lambda could
    // otherwise observe a later value of `i` by the time it executes.
    int itemIndex = i;
    var newRequest = new OwnRequest(async token =>
    {
        await ProcessDataAsync(itemIndex, token);
        return true;
    });
    container.Add(newRequest); // Automatically starts and tracked
}
// Wait for all (including dynamically added)
await container.Task;These advanced features enable sophisticated async patterns:
-
$\text{\color{lightblue}Cooperative Yielding}$ : Graceful pause/resume with checkpoints -
$\text{\color{green}Request Chaining}$ : Pipeline operations without handler overhead -
$\text{\color{purple}Custom Handlers}$ : Rate limiting, weighted priorities, custom execution -
$\text{\color{red}Dynamic Parallelism}$ : Real-time concurrency adjustment -
$\text{\color{blue}Context Marshaling}$ : UI-safe callbacks -
$\text{\color{orange}IValueTaskSource}$ : Zero-allocation waiting -
$\text{\color{purple}State Machines}$ : Lock-free atomic transitions -
$\text{\color{red}Advanced Containers}$ : Nesting, merging, dynamic addition