diff --git a/backend.Tests/Clients/Usenet/DuplicateSegmentFallbackTests.cs b/backend.Tests/Clients/Usenet/DuplicateSegmentFallbackTests.cs new file mode 100644 index 00000000..83e3aa51 --- /dev/null +++ b/backend.Tests/Clients/Usenet/DuplicateSegmentFallbackTests.cs @@ -0,0 +1,61 @@ +using NzbWebDAV.Exceptions; +using NzbWebDAV.Models.Nzb; +using NzbWebDAV.Tests.TestDoubles; + +namespace NzbWebDAV.Tests.Clients.Usenet; + +public class DuplicateSegmentFallbackTests +{ + [Fact] + public async Task CheckAllSegmentsAsyncAcceptsAnyAvailableDuplicateCandidate() + { + using var client = new FakeNntpClient() + .AddSegment("segment-1b", [1, 2, 3]) + .AddSegment("segment-2", [4, 5, 6]); + + await client.CheckAllSegmentsAsync( + [NzbSegmentIdSet.Encode(["segment-1a", "segment-1b"]), "segment-2"], + concurrency: 2, + progress: null, + CancellationToken.None + ); + + Assert.Equal(3, client.StatCallCount); + } + + [Fact] + public async Task GetFileStreamFallsBackToAlternateDuplicateSegment() + { + using var client = new FakeNntpClient() + .AddSegment("segment-1b", [1, 2, 3], partOffset: 0) + .AddSegment("segment-2", [4, 5], partOffset: 3); + var nzbFile = new NzbFile + { + Subject = "example.mkv" + }; + nzbFile.Segments.Add(new NzbSegment { Number = 1, Bytes = 3, MessageId = "segment-1a" }); + nzbFile.Segments.Add(new NzbSegment { Number = 1, Bytes = 3, MessageId = "segment-1b" }); + nzbFile.Segments.Add(new NzbSegment { Number = 2, Bytes = 2, MessageId = "segment-2" }); + + await using var stream = await client.GetFileStream(nzbFile, articleBufferSize: 0, CancellationToken.None); + var bytes = new byte[5]; + await stream.ReadExactlyAsync(bytes); + + Assert.Equal(new byte[] { 1, 2, 3, 4, 5 }, bytes); + } + + [Fact] + public async Task CheckAllSegmentsAsyncThrowsWhenAllDuplicateCandidatesAreMissing() + { + using var client = new FakeNntpClient(); + + var exception = await Assert.ThrowsAsync(() => client.CheckAllSegmentsAsync( + [NzbSegmentIdSet.Encode(["segment-1a", "segment-1b"])], + concurrency: 1, + progress: null, + CancellationToken.None + )); + + Assert.Equal("segment-1b", exception.SegmentId); + } +} diff --git a/backend.Tests/GlobalUsings.cs b/backend.Tests/GlobalUsings.cs new file mode 100644 index 00000000..c802f448 --- /dev/null +++ b/backend.Tests/GlobalUsings.cs @@ -0,0 +1 @@ +global using Xunit; diff --git a/backend.Tests/Models/Nzb/NzbDocumentTests.cs b/backend.Tests/Models/Nzb/NzbDocumentTests.cs new file mode 100644 index 00000000..70cc52e8 --- /dev/null +++ b/backend.Tests/Models/Nzb/NzbDocumentTests.cs @@ -0,0 +1,35 @@ +using System.Text; +using NzbWebDAV.Models.Nzb; + +namespace NzbWebDAV.Tests.Models.Nzb; + +public class NzbDocumentTests +{ + [Fact] + public async Task DuplicateSegmentNumbersCollapseIntoOneLogicalSegmentWithAlternates() + { + const string xml = """ + + + + + segment-a + segment-b + segment-c + + + + """; + + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(xml)); + var document = await NzbDocument.LoadAsync(stream); + var file = Assert.Single(document.Files); + + Assert.Equal(2, file.GetLogicalSegmentCount()); + Assert.Equal(30, file.GetTotalYencodedSize()); + + var segmentIds = file.GetSegmentIds(); + Assert.Equal(["segment-a", "segment-b"], NzbSegmentIdSet.Decode(segmentIds[0])); + Assert.Equal(["segment-c"], NzbSegmentIdSet.Decode(segmentIds[1])); + } +} diff --git a/backend.Tests/TestDoubles/FakeNntpClient.cs b/backend.Tests/TestDoubles/FakeNntpClient.cs new file mode 100644 index 00000000..1d113913 --- /dev/null +++ b/backend.Tests/TestDoubles/FakeNntpClient.cs @@ -0,0 +1,240 @@ +using System.Collections.Concurrent; +using System.Reflection; +using NzbWebDAV.Clients.Usenet; +using NzbWebDAV.Clients.Usenet.Models; +using NzbWebDAV.Exceptions; +using NzbWebDAV.Streams; +using UsenetSharp.Models; + +namespace NzbWebDAV.Tests.TestDoubles; + +public sealed class FakeNntpClient : NntpClient +{ + private sealed class TrackingMemoryStream(byte[] buffer, Action onDispose) : MemoryStream(buffer, writable: false) + { + private readonly Action _onDispose = onDispose; + private bool _disposed; + + protected override void Dispose(bool disposing) + { + if (!_disposed && disposing) + { + _onDispose(); + _disposed = true; + } + + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + if (!_disposed) + { + _onDispose(); + _disposed = true; + } + + return base.DisposeAsync(); + } + } + + private sealed record SegmentData( + byte[] Bytes, + long PartOffset, + UsenetYencHeader YencHeader, + UsenetArticleHeader ArticleHeaders + ); + + private readonly ConcurrentDictionary _segments = new(StringComparer.Ordinal); + + private int _getYencHeadersCallCount; + private int _decodedBodyCallCount; + private int _decodedArticleCallCount; + private int _headCallCount; + private int _statCallCount; + + public int GetYencHeadersCallCount => Volatile.Read(ref _getYencHeadersCallCount); + public int DecodedBodyCallCount => Volatile.Read(ref _decodedBodyCallCount); + public int DecodedArticleCallCount => Volatile.Read(ref _decodedArticleCallCount); + public int HeadCallCount => Volatile.Read(ref _headCallCount); + public int StatCallCount => Volatile.Read(ref _statCallCount); + + public FakeNntpClient AddSegment(string segmentId, byte[] bytes, long partOffset = 0) + { + var header = CreateYencHeader(partOffset, bytes.Length); + var articleHeaders = CreateArticleHeader(segmentId); + _segments[segmentId] = new SegmentData(bytes, partOffset, header, articleHeaders); + return this; + } + + public override Task ConnectAsync(string host, int port, bool useSsl, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public override Task AuthenticateAsync(string user, string pass, CancellationToken cancellationToken) + { + return Task.FromException(new NotSupportedException()); + } + + public override Task StatAsync(SegmentId segmentId, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _statCallCount); + _ = GetSegment(segmentId); + return Task.FromResult(new UsenetStatResponse + { + ArticleExists = true, + ResponseCode = (int)UsenetResponseType.ArticleExists, + ResponseMessage = "223 - Article exists" + }); + } + + public override Task HeadAsync(SegmentId segmentId, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _headCallCount); + var data = GetSegment(segmentId); + return Task.FromResult(new UsenetHeadResponse + { + SegmentId = segmentId, + ResponseCode = (int)UsenetResponseType.ArticleRetrievedHeadFollows, + ResponseMessage = "221 - Head retrieved", + ArticleHeaders = data.ArticleHeaders + }); + } + + public override Task DecodedBodyAsync(SegmentId segmentId, CancellationToken cancellationToken) + { + return DecodedBodyAsync(segmentId, onConnectionReadyAgain: null, cancellationToken); + } + + public override Task DecodedBodyAsync( + SegmentId segmentId, + Action? onConnectionReadyAgain, + CancellationToken cancellationToken + ) + { + Interlocked.Increment(ref _decodedBodyCallCount); + onConnectionReadyAgain?.Invoke(ArticleBodyResult.Retrieved); + return Task.FromResult(CreateBodyResponse(segmentId)); + } + + public override Task DecodedArticleAsync( + SegmentId segmentId, + CancellationToken cancellationToken + ) + { + return DecodedArticleAsync(segmentId, onConnectionReadyAgain: null, cancellationToken); + } + + public override Task DecodedArticleAsync( + SegmentId segmentId, + Action? onConnectionReadyAgain, + CancellationToken cancellationToken + ) + { + Interlocked.Increment(ref _decodedArticleCallCount); + var data = GetSegment(segmentId); + onConnectionReadyAgain?.Invoke(ArticleBodyResult.Retrieved); + return Task.FromResult(new UsenetDecodedArticleResponse + { + SegmentId = segmentId, + ResponseCode = (int)UsenetResponseType.ArticleRetrievedHeadAndBodyFollow, + ResponseMessage = "220 - Article retrieved", + ArticleHeaders = data.ArticleHeaders, + Stream = CreateBodyStream(segmentId, data) + }); + } + + public override Task DateAsync(CancellationToken cancellationToken) + { + return Task.FromException(new NotSupportedException()); + } + + public override Task AcquireExclusiveConnectionAsync( + string segmentId, + CancellationToken cancellationToken + ) + { + return Task.FromResult(new UsenetExclusiveConnection(onConnectionReadyAgain: null)); + } + + public override Task DecodedBodyAsync( + SegmentId segmentId, + UsenetExclusiveConnection exclusiveConnection, + CancellationToken cancellationToken + ) + { + return DecodedBodyAsync(segmentId, exclusiveConnection.OnConnectionReadyAgain, cancellationToken); + } + + public override Task DecodedArticleAsync( + SegmentId segmentId, + UsenetExclusiveConnection exclusiveConnection, + CancellationToken cancellationToken + ) + { + return DecodedArticleAsync(segmentId, exclusiveConnection.OnConnectionReadyAgain, cancellationToken); + } + + public override Task GetYencHeadersAsync(string segmentId, CancellationToken ct) + { + Interlocked.Increment(ref _getYencHeadersCallCount); + return Task.FromResult(GetSegment(segmentId).YencHeader); + } + + public override void Dispose() + { + GC.SuppressFinalize(this); + } + + private UsenetDecodedBodyResponse CreateBodyResponse(string segmentId) + { + var data = GetSegment(segmentId); + return new UsenetDecodedBodyResponse + { + SegmentId = segmentId, + ResponseCode = (int)UsenetResponseType.ArticleRetrievedBodyFollows, + ResponseMessage = "222 - Body retrieved", + Stream = CreateBodyStream(segmentId, data) + }; + } + + private CachedYencStream CreateBodyStream(string segmentId, SegmentData data) + { + var stream = new TrackingMemoryStream(data.Bytes, () => { }); + return new CachedYencStream(data.YencHeader, stream); + } + + private SegmentData GetSegment(string segmentId) + { + if (_segments.TryGetValue(segmentId, out var data)) + return data; + + throw new UsenetArticleNotFoundException(segmentId); + } + + private static UsenetYencHeader CreateYencHeader(long partOffset, long partSize) + { + return new UsenetYencHeader + { + FileName = "segment.bin", + FileSize = partOffset + partSize, + LineLength = 128, + PartNumber = 1, + TotalParts = 1, + PartSize = partSize, + PartOffset = partOffset + }; + } + + private static UsenetArticleHeader CreateArticleHeader(string segmentId) + { + return new UsenetArticleHeader + { + Headers = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Subject"] = segmentId + } + }; + } +} diff --git a/backend.Tests/backend.Tests.csproj b/backend.Tests/backend.Tests.csproj new file mode 100644 index 00000000..4f085102 --- /dev/null +++ b/backend.Tests/backend.Tests.csproj @@ -0,0 +1,27 @@ + + + + net10.0 + enable + enable + false + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/backend/Clients/Usenet/NntpClient.cs b/backend/Clients/Usenet/NntpClient.cs index 356ea284..a133cf91 100644 --- a/backend/Clients/Usenet/NntpClient.cs +++ b/backend/Clients/Usenet/NntpClient.cs @@ -75,7 +75,7 @@ CancellationToken cancellationToken public virtual async Task GetYencHeadersAsync(string segmentId, CancellationToken ct) { - var decodedBodyResponse = await DecodedBodyAsync(segmentId, ct).ConfigureAwait(false); + var decodedBodyResponse = await this.DecodedBodyWithFallbackAsync(segmentId, ct).ConfigureAwait(false); await using var stream = decodedBodyResponse.Stream; var headers = await stream.GetYencHeadersAsync(ct).ConfigureAwait(false); return headers!; @@ -83,8 +83,9 @@ public virtual async Task GetYencHeadersAsync(string segmentId public virtual async Task GetFileSizeAsync(NzbFile file, CancellationToken ct) { - if (file.Segments.Count == 0) return 0; - var headers = await GetYencHeadersAsync(file.Segments[^1].MessageId, ct).ConfigureAwait(false); + var segmentIds = file.GetSegmentIds(); + if (segmentIds.Length == 0) return 0; + var headers = await GetYencHeadersAsync(segmentIds[^1], ct).ConfigureAwait(false); return headers!.PartOffset + headers!.PartSize; } @@ -119,7 +120,11 @@ CancellationToken cancellationToken var tasks = segmentIds .Select(async segmentId => ( SegmentId: segmentId, - Result: await StatAsync(segmentId, token).ConfigureAwait(false) + Result: await NntpClientSegmentFallbackExtensions.WithFallbackAsync( + segmentId, + (candidateSegmentId, ct) => StatAsync(candidateSegmentId, ct), + token + ).ConfigureAwait(false) )) .WithConcurrencyAsync(concurrency); @@ -132,4 +137,4 @@ CancellationToken cancellationToken throw new UsenetArticleNotFoundException(task.SegmentId); } } -} \ No newline at end of file +} diff --git a/backend/Clients/Usenet/NntpClientSegmentFallbackExtensions.cs b/backend/Clients/Usenet/NntpClientSegmentFallbackExtensions.cs new file mode 100644 index 00000000..35366002 --- /dev/null +++ b/backend/Clients/Usenet/NntpClientSegmentFallbackExtensions.cs @@ -0,0 +1,97 @@ +using NzbWebDAV.Clients.Usenet.Models; +using NzbWebDAV.Exceptions; +using NzbWebDAV.Models.Nzb; +using UsenetSharp.Models; + +namespace NzbWebDAV.Clients.Usenet; + +public static class NntpClientSegmentFallbackExtensions +{ + public static Task HeadWithFallbackAsync( + this INntpClient usenetClient, + string encodedSegmentId, + CancellationToken cancellationToken + ) + { + return WithFallbackAsync( + encodedSegmentId, + (segmentId, ct) => usenetClient.HeadAsync(segmentId, ct), + cancellationToken + ); + } + + public static Task DecodedArticleWithFallbackAsync( + this INntpClient usenetClient, + string encodedSegmentId, + CancellationToken cancellationToken + ) + { + return WithFallbackAsync( + encodedSegmentId, + (segmentId, ct) => usenetClient.DecodedArticleAsync(segmentId, ct), + cancellationToken + ); + } + + public static Task DecodedBodyWithFallbackAsync( + this INntpClient usenetClient, + string encodedSegmentId, + CancellationToken cancellationToken + ) + { + return WithFallbackAsync( + encodedSegmentId, + (segmentId, ct) => usenetClient.DecodedBodyAsync(segmentId, ct), + cancellationToken + ); + } + + public static async Task DecodedBodyWithFallbackAsync( + this INntpClient usenetClient, + string encodedSegmentId, + CancellationToken cancellationToken, + Func> acquireExclusiveConnectionAsync + ) + { + UsenetArticleNotFoundException? missingArticleException = null; + foreach (var segmentId in NzbSegmentIdSet.Decode(encodedSegmentId)) + { + try + { + var exclusiveConnection = await acquireExclusiveConnectionAsync(segmentId, cancellationToken) + .ConfigureAwait(false); + return await usenetClient + .DecodedBodyAsync(segmentId, exclusiveConnection, cancellationToken) + .ConfigureAwait(false); + } + catch (UsenetArticleNotFoundException e) + { + missingArticleException = e; + } + } + + throw missingArticleException ?? new UsenetArticleNotFoundException(encodedSegmentId); + } + + public static async Task WithFallbackAsync( + string encodedSegmentId, + Func> action, + CancellationToken cancellationToken + ) + { + UsenetArticleNotFoundException? missingArticleException = null; + foreach (var segmentId in NzbSegmentIdSet.Decode(encodedSegmentId)) + { + try + { + return await action(segmentId, cancellationToken).ConfigureAwait(false); + } + catch (UsenetArticleNotFoundException e) + { + missingArticleException = e; + } + } + + throw missingArticleException ?? new UsenetArticleNotFoundException(encodedSegmentId); + } +} diff --git a/backend/Models/Nzb/NzbDocument.cs b/backend/Models/Nzb/NzbDocument.cs index 134efe5c..728ddc52 100644 --- a/backend/Models/Nzb/NzbDocument.cs +++ b/backend/Models/Nzb/NzbDocument.cs @@ -100,8 +100,10 @@ private static async Task ReadSegmentsAsync(XmlReader reader, NzbFile file) if (reader is { NodeType: XmlNodeType.Element, Name: "segment" }) { var bytesAttr = reader.GetAttribute("bytes"); + var numberAttr = reader.GetAttribute("number"); var segment = new NzbSegment { + Number = int.TryParse(numberAttr, out var number) ? number : 0, Bytes = long.TryParse(bytesAttr, out var bytes) ? bytes : 0, MessageId = await reader.ReadElementContentAsStringAsync().ConfigureAwait(false) }; @@ -116,4 +118,4 @@ private static async Task ReadSegmentsAsync(XmlReader reader, NzbFile file) break; } } -} \ No newline at end of file +} diff --git a/backend/Models/Nzb/NzbFile.cs b/backend/Models/Nzb/NzbFile.cs index fbaf8ecf..5fd60679 100644 --- a/backend/Models/Nzb/NzbFile.cs +++ b/backend/Models/Nzb/NzbFile.cs @@ -9,18 +9,23 @@ public class NzbFile public string[] GetSegmentIds() { - return Segments - .Select(x => x.MessageId) + return GetLogicalSegments() + .Select(x => NzbSegmentIdSet.Encode(x.Select(segment => segment.MessageId).ToArray())) .ToArray(); } public long GetTotalYencodedSize() { - return Segments - .Select(x => x.Bytes) + return GetLogicalSegments() + .Select(x => x[0].Bytes) .Sum(); } + public int GetLogicalSegmentCount() + { + return GetLogicalSegments().Count; + } + public string GetSubjectFileName() { return GetFirstValidNonEmptyFilename( @@ -52,4 +57,23 @@ private static string GetFirstValidNonEmptyFilename(params Func[] funcs) .Where(x => x == Path.GetFileName(x)) .FirstOrDefault(x => x != "") ?? ""; } -} \ No newline at end of file + + private List> GetLogicalSegments() + { + if (Segments.Count == 0) + return []; + + if (Segments.All(segment => segment.Number > 0)) + { + return Segments + .GroupBy(segment => segment.Number) + .OrderBy(group => group.Key) + .Select(group => group.ToList()) + .ToList(); + } + + return Segments + .Select(segment => new List { segment }) + .ToList(); + } +} diff --git a/backend/Models/Nzb/NzbSegment.cs b/backend/Models/Nzb/NzbSegment.cs index 58af130c..51bab835 100644 --- a/backend/Models/Nzb/NzbSegment.cs +++ b/backend/Models/Nzb/NzbSegment.cs @@ -2,6 +2,7 @@ namespace NzbWebDAV.Models.Nzb; public class NzbSegment { + public required int Number { get; init; } public required long Bytes { get; init; } public required string MessageId { get; init; } } diff --git a/backend/Models/Nzb/NzbSegmentIdSet.cs b/backend/Models/Nzb/NzbSegmentIdSet.cs new file mode 100644 index 00000000..fdfb6857 --- /dev/null +++ b/backend/Models/Nzb/NzbSegmentIdSet.cs @@ -0,0 +1,39 @@ +using System.Text.Json; + +namespace NzbWebDAV.Models.Nzb; + +public static class NzbSegmentIdSet +{ + public static string Encode(IReadOnlyList segmentIds) + { + ArgumentNullException.ThrowIfNull(segmentIds); + if (segmentIds.Count == 0) + throw new ArgumentException("At least one segment id is required.", nameof(segmentIds)); + + return segmentIds.Count == 1 + ? segmentIds[0] + : JsonSerializer.Serialize(segmentIds, (JsonSerializerOptions?)null); + } + + public static string[] Decode(string encodedSegmentId) + { + if (string.IsNullOrWhiteSpace(encodedSegmentId)) + return []; + + if (encodedSegmentId[0] != '[') + return [encodedSegmentId]; + + try + { + var decoded = JsonSerializer.Deserialize(encodedSegmentId, (JsonSerializerOptions?)null); + return decoded? + .Where(segmentId => !string.IsNullOrWhiteSpace(segmentId)) + .ToArray() + ?? [encodedSegmentId]; + } + catch (JsonException) + { + return [encodedSegmentId]; + } + } +} diff --git a/backend/Queue/DeobfuscationSteps/1.FetchFirstSegment/FetchFirstSegmentsStep.cs b/backend/Queue/DeobfuscationSteps/1.FetchFirstSegment/FetchFirstSegmentsStep.cs index 8ba571e4..7ea1a62f 100644 --- a/backend/Queue/DeobfuscationSteps/1.FetchFirstSegment/FetchFirstSegmentsStep.cs +++ b/backend/Queue/DeobfuscationSteps/1.FetchFirstSegment/FetchFirstSegmentsStep.cs @@ -37,8 +37,10 @@ CancellationToken cancellationToken try { // get the first article stream - var firstSegment = nzbFile.Segments[0].MessageId; - var article = await usenetClient.DecodedArticleAsync(firstSegment, cancellationToken).ConfigureAwait(false); + var firstSegment = nzbFile.GetSegmentIds()[0]; + var article = await usenetClient + .DecodedArticleWithFallbackAsync(firstSegment, cancellationToken) + .ConfigureAwait(false); await using var bodyStream = article.Stream!; // read up to the first 16KB from the stream @@ -105,4 +107,4 @@ private bool HasMagic(byte[] sequence) First16KB.AsSpan(0, sequence.Length).SequenceEqual(sequence); } } -} \ No newline at end of file +} diff --git a/backend/Queue/DeobfuscationSteps/2.GetPar2FileDescriptors/GetPar2FileDescriptorsStep.cs b/backend/Queue/DeobfuscationSteps/2.GetPar2FileDescriptors/GetPar2FileDescriptorsStep.cs index 4653ea3a..04fec5bc 100644 --- a/backend/Queue/DeobfuscationSteps/2.GetPar2FileDescriptors/GetPar2FileDescriptorsStep.cs +++ b/backend/Queue/DeobfuscationSteps/2.GetPar2FileDescriptors/GetPar2FileDescriptorsStep.cs @@ -19,13 +19,13 @@ public static async Task> GetPar2FileDescriptors var par2Index = files .Where(x => !x.MissingFirstSegment) .Where(x => Par2.HasPar2MagicBytes(x.First16KB!)) - .MinBy(x => x.NzbFile.Segments.Count); + .MinBy(x => x.NzbFile.GetLogicalSegmentCount()); if (par2Index is null) return []; // return all file descriptors var fileDescriptors = new List(); var segments = par2Index.NzbFile.GetSegmentIds(); - var filesize = par2Index.NzbFile.Segments.Count == 1 + var filesize = par2Index.NzbFile.GetLogicalSegmentCount() == 1 ? par2Index.Header!.PartOffset + par2Index.Header!.PartSize : await usenetClient.GetFileSizeAsync(par2Index.NzbFile, cancellationToken).ConfigureAwait(false); await using var stream = usenetClient.GetFileStream(segments, filesize, articleBufferSize: 0); @@ -33,4 +33,4 @@ public static async Task> GetPar2FileDescriptors fileDescriptors.Add(fileDescriptor); return fileDescriptors; } -} \ No newline at end of file +} diff --git a/backend/Queue/QueueItemProcessor.cs b/backend/Queue/QueueItemProcessor.cs index d53bf268..983818ae 100644 --- a/backend/Queue/QueueItemProcessor.cs +++ b/backend/Queue/QueueItemProcessor.cs @@ -117,7 +117,7 @@ await MarkQueueItemCompleted(startTime, error, () => Task.FromResult(existingMou // step 0 -- perform article existence pre-check against cache // https://github.com/nzbdav-dev/nzbdav/issues/101 - var articlesToPrecheck = nzbFiles.SelectMany(x => x.Segments).Select(x => x.MessageId); + var articlesToPrecheck = nzbFiles.SelectMany(x => x.GetSegmentIds()); HealthCheckService.CheckCachedMissingSegmentIds(articlesToPrecheck); // step 1 -- get name and size of each nzb file @@ -375,4 +375,4 @@ private async Task RefreshMonitoredDownloads(ArrClient arrClient) Log.Debug($"Could not refresh monitored downloads for Arr instance: `{arrClient.Host}`. {e.Message}"); } } -} \ No newline at end of file +} diff --git a/backend/Services/HealthCheckService.cs b/backend/Services/HealthCheckService.cs index fc640e52..2528e92a 100644 --- a/backend/Services/HealthCheckService.cs +++ b/backend/Services/HealthCheckService.cs @@ -6,6 +6,7 @@ using NzbWebDAV.Database.Models; using NzbWebDAV.Exceptions; using NzbWebDAV.Extensions; +using NzbWebDAV.Models.Nzb; using NzbWebDAV.Queue.PostProcessors; using NzbWebDAV.Utils; using NzbWebDAV.Websocket; @@ -153,7 +154,8 @@ CancellationToken ct _ = _websocketManager.SendMessage(WebsocketTopic.HealthItemProgress, $"{davItem.Id}|done"); if (FilenameUtil.IsImportantFileType(davItem.Name)) lock (_missingSegmentIds) - _missingSegmentIds.Add(e.SegmentId); + foreach (var segmentId in NzbSegmentIdSet.Decode(e.SegmentId)) + _missingSegmentIds.Add(segmentId); // when usenet article is missing, perform repairs await Repair(davItem, dbClient, ct).ConfigureAwait(false); @@ -164,7 +166,7 @@ private async Task UpdateReleaseDate(DavItem davItem, List segments, Can { var firstSegmentId = StringUtil.EmptyToNull(segments.FirstOrDefault()); if (firstSegmentId == null) return; - var articleHeadersResponse = await _usenetClient.HeadAsync(firstSegmentId, ct).ConfigureAwait(false); + var articleHeadersResponse = await _usenetClient.HeadWithFallbackAsync(firstSegmentId, ct).ConfigureAwait(false); var articleHeaders = articleHeadersResponse.ArticleHeaders!; davItem.ReleaseDate = articleHeaders.Date; } @@ -343,8 +345,8 @@ public static void CheckCachedMissingSegmentIds(IEnumerable segmentIds) lock (_missingSegmentIds) { foreach (var segmentId in segmentIds) - if (_missingSegmentIds.Contains(segmentId)) + if (NzbSegmentIdSet.Decode(segmentId).All(candidateSegmentId => _missingSegmentIds.Contains(candidateSegmentId))) throw new UsenetArticleNotFoundException(segmentId); } } -} \ No newline at end of file +} diff --git a/backend/Streams/MultiSegmentStream.cs b/backend/Streams/MultiSegmentStream.cs index 1d7b56dc..b181f872 100644 --- a/backend/Streams/MultiSegmentStream.cs +++ b/backend/Streams/MultiSegmentStream.cs @@ -53,8 +53,7 @@ private async Task DownloadSegments(CancellationToken cancellationToken) var segmentId = _segmentIds.Span[i]; await _streamTasks.Writer.WaitToWriteAsync(cancellationToken); - var connection = await _usenetClient.AcquireExclusiveConnectionAsync(segmentId, cancellationToken); - var streamTask = DownloadSegment(segmentId, connection, cancellationToken); + var streamTask = DownloadSegment(segmentId, cancellationToken); if (_streamTasks.Writer.TryWrite(streamTask)) continue; // if we never get a chance to write the stream to the writer @@ -74,12 +73,15 @@ private async Task DownloadSegments(CancellationToken cancellationToken) private async Task DownloadSegment ( string segmentId, - UsenetExclusiveConnection exclusiveConnection, CancellationToken cancellationToken ) { var bodyResponse = await _usenetClient - .DecodedBodyAsync(segmentId, exclusiveConnection, cancellationToken) + .DecodedBodyWithFallbackAsync( + segmentId, + cancellationToken, + (candidateSegmentId, ct) => _usenetClient.AcquireExclusiveConnectionAsync(candidateSegmentId, ct) + ) .ConfigureAwait(false); return bodyResponse.Stream; } @@ -131,4 +133,4 @@ protected override void Dispose(bool disposing) base.Dispose(); } -} \ No newline at end of file +} diff --git a/backend/Streams/UnbufferedMultiSegmentStream.cs b/backend/Streams/UnbufferedMultiSegmentStream.cs index 6f2fbb43..9d715b5b 100644 --- a/backend/Streams/UnbufferedMultiSegmentStream.cs +++ b/backend/Streams/UnbufferedMultiSegmentStream.cs @@ -28,16 +28,18 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation if (_stream == null) { if (_currentIndex >= _segmentIds.Length) return 0; - var body = await _usenetClient.DecodedBodyAsync(_segmentIds.Span[_currentIndex++], cancellationToken); + var body = await _usenetClient + .DecodedBodyWithFallbackAsync(_segmentIds.Span[_currentIndex++], cancellationToken) + .ConfigureAwait(false); _stream = body.Stream; } // read from the stream - var read = await _stream.ReadAsync(buffer, cancellationToken); + var read = await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); if (read > 0) return read; // if the stream ended, continue to the next stream. - await _stream.DisposeAsync(); + await _stream.DisposeAsync().ConfigureAwait(false); _stream = null; } @@ -57,4 +59,4 @@ protected override void Dispose(bool disposing) _stream?.Dispose(); base.Dispose(); } -} \ No newline at end of file +}