diff --git a/AsyncRx.NET/Directory.build.props b/AsyncRx.NET/Directory.Build.props similarity index 100% rename from AsyncRx.NET/Directory.build.props rename to AsyncRx.NET/Directory.Build.props diff --git a/Ix.NET/Documentation/adr/0001-Ix-Ref-Assembly-Mismatches.md b/Ix.NET/Documentation/adr/0001-Ix-Ref-Assembly-Mismatches.md index a28c0329dc..69dd8ddc84 100644 --- a/Ix.NET/Documentation/adr/0001-Ix-Ref-Assembly-Mismatches.md +++ b/Ix.NET/Documentation/adr/0001-Ix-Ref-Assembly-Mismatches.md @@ -27,7 +27,7 @@ At the time of writing this, the current version of `System.Interactive` is 6.0. * `net6.0` * `netstandard2.1` - + The use of `net4.8` in `ref` seems to have been a bug: that should have been `net48`. (The main reason I am confident it's a bug, and not a clever but obscure trick that we've not understood, is that the [commit of 2021/12/06 that added this](https://github.com/dotnet/reactive/commit/a2410b2267abe193191f3894d243771ae4b126fd) used [`net48` in reference assemblies for one of the other packages](https://github.com/dotnet/reactive/commit/a2410b2267abe193191f3894d243771ae4b126fd#diff-3b568c93a468dab1b1a619a450bf1c4d88d3ec9539737d09fa6fb7659bc0ae5fR7), so this just seems to have been a slip.) The other discrepancy is that we have `netstandard2.0` in the `lib` folder but `netstandard2.1` in the ref folder. At first glance, this too looks quite a lot like a mistake, particularly when you examine the history. Here is the point in the release history at which the `ref` folder first started having a `netstandard2.1` folder: @@ -45,12 +45,12 @@ And yet, on closer inspection, this appears to be deliberate. Looking at this co https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385 - we see a [comment in Ix.NET/Source/refs/Directory.build.props](https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385#diff-909504334cbab5c432709c95ae78c24fb2910d850958af2ef6de444b18e5c8ecR6) saying: - + we see a [comment in Ix.NET/Source/refs/Directory.Build.props](https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385#diff-909504334cbab5c432709c95ae78c24fb2910d850958af2ef6de444b18e5c8ecR6) saying: + > This is here so we can create a fake .NET Standard 2.1 facade I can only guess that they knew .NET Standard 2.1 was coming, and wanted to ensure that `System.Interactive` was ready for it when it shipped. - + So it was deliberate. But offering reference assemblies for a platform without any corresponding implementation for that platform is an odd choice. (And although at the time this was a placholder for a forthcoming .NET Standard version, it continued to look like this after .NET Standard 2.1 shipped. All subsequent Ix.NET releases have continued to provide `netstandard2.1` in the `ref` folder with no matching folder in `lib`. So it wasn't just a temporary measure.) What purpose does this serve? Some of the features that Ix offers eventually became available in .NET Core, such as `EnumerableEx.SkipLast`. This method exists in the implementation assemblies for every TFM of Ix.NET, but the `netstandard2.1` and `net6.0` reference assemblies omit it. This has the effect that if you're targetting any version of .NET recent enough to have these methods built into the .NET runtime libraries, the Ix.NET equivalents will: diff --git a/Ix.NET/Source/Ix.Async.NET.sln b/Ix.NET/Source/Ix.Async.NET.sln index 09d6fadf4d..0170551859 100644 --- a/Ix.NET/Source/Ix.Async.NET.sln +++ b/Ix.NET/Source/Ix.Async.NET.sln @@ -11,8 +11,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution AsyncQueryableGenerator.t4 = AsyncQueryableGenerator.t4 ..\..\azure-pipelines.ix.yml = ..\..\azure-pipelines.ix.yml CodeCoverage.runsettings = CodeCoverage.runsettings - Directory.build.props = Directory.build.props - Directory.build.targets = Directory.build.targets + Directory.Build.props = Directory.Build.props + Directory.Build.targets = Directory.Build.targets global.json = global.json NuGet.Config = NuGet.Config version.json = version.json diff --git a/Ix.NET/Source/Ix.NET.sln b/Ix.NET/Source/Ix.NET.sln index b6e3fb52ad..93bde2dc84 100644 --- a/Ix.NET/Source/Ix.NET.sln +++ b/Ix.NET/Source/Ix.NET.sln @@ -13,8 +13,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution ..\..\.editorconfig = ..\..\.editorconfig ..\..\azure-pipelines.ix.yml = ..\..\azure-pipelines.ix.yml CodeCoverage.runsettings = CodeCoverage.runsettings - Directory.build.props = Directory.build.props - Directory.build.targets = Directory.build.targets + Directory.Build.props = Directory.Build.props + Directory.Build.targets = Directory.Build.targets version.json = version.json EndProjectSection EndProject @@ -61,7 +61,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLinq", "FasterLinq\Fa EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Refs", "Refs", "{A3D72E6E-4ADA-42E0-8B2A-055B1F244281}" ProjectSection(SolutionItems) = preProject - refs\Directory.build.props = refs\Directory.build.props + refs\Directory.Build.props = refs\Directory.Build.props EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Interactive", "refs\System.Interactive\System.Interactive.csproj", "{2EC0C302-B029-4DDB-AC91-000BF11006AD}" diff --git a/Rx.NET/Documentation/adr/0001-net7.0-era-tooling-update.md b/Rx.NET/Documentation/adr/0001-net7.0-era-tooling-update.md index 659ba8c46f..91454c1b44 100644 --- a/Rx.NET/Documentation/adr/0001-net7.0-era-tooling-update.md +++ b/Rx.NET/Documentation/adr/0001-net7.0-era-tooling-update.md @@ -74,9 +74,9 @@ The following sections describe how we will deal with each of the issues raised In all projects that used to target `net5.0`, change this to `net6.0`. Likewise, in any projects that target `net5.0-windows10.0.19041`, change that to `net6.0-windows10.0.19041`. -Modify the entries in `Directory.build.targets` +Modify the entries in `Directory.Build.targets` -Remove the entries in `Directory.build.targets` that refer to the out-of-support TFMs. This means that several preprocessor constants are no longer used. We should scour the codebase and remove all conditionally compiled sections of code that will no longer be used because the target frameworks that used to bring them in no longer exist. The constants no longer in use are: +Remove the entries in `Directory.Build.targets` that refer to the out-of-support TFMs. This means that several preprocessor constants are no longer used. We should scour the codebase and remove all conditionally compiled sections of code that will no longer be used because the target frameworks that used to bring them in no longer exist. The constants no longer in use are: * `NETSTANDARD1_0` * `NETSTANDARD1_3` diff --git a/Rx.NET/Documentation/adr/0003-uap-targets.md b/Rx.NET/Documentation/adr/0003-uap-targets.md index dea14298d8..3fb5d8d06c 100644 --- a/Rx.NET/Documentation/adr/0003-uap-targets.md +++ b/Rx.NET/Documentation/adr/0003-uap-targets.md @@ -99,14 +99,14 @@ Note that the specific version we actually want to target, 18362, isn't in there The following sections explain how we enable `uap10.0.18362` to be specified as a target framework, even though the tools do not support this. -The project has `Directory.build.props` and `Directory.build.targets` files. The build tools search for these and automatically load them for all projects in the solution. The `Directory.build.props` file has a `` with a `Condition` that means it runs only when the `uap10.0.18362` target is being built, and it sets numerous properties, as described in the following sections. +The project has `Directory.Build.props` and `Directory.Build.targets` files. The build tools search for these and automatically load them for all projects in the solution. The `Directory.Build.props` file has a `` with a `Condition` that means it runs only when the `uap10.0.18362` target is being built, and it sets numerous properties, as described in the following sections. #### Target Platform Version We make our minmum platform version match the one in the TFM: -```xml +```xml 10.0.18362 10.0.18362.0 ``` @@ -130,7 +130,7 @@ However, only _some_ properties should use the old name. We need to set _all_ of #### Compiler Constants -When using the supported UWP build tools (with the old-form project system, which we can't use because we also need to build modern targets), the `WINDOWS_UWP` define constant is set, enabling source code compiled into multiple targets to detect that it is being built for UWP with a `#if WINDOWS_UWP`. So we need this in `Directory.build.props`: +When using the supported UWP build tools (with the old-form project system, which we can't use because we also need to build modern targets), the `WINDOWS_UWP` define constant is set, enabling source code compiled into multiple targets to detect that it is being built for UWP with a `#if WINDOWS_UWP`. So we need this in `Directory.Build.props`: ```xml $(DefineConstants);WINDOWS_UWP @@ -141,7 +141,7 @@ When using the supported UWP build tools (with the old-form project system, whic Normally, when you specify a TFM, the .NET SDK works out what framework library references are required and adds them for you. So if you write `net8.0` in a project file, you will automatically have access to all the .NET 8.0 runtime libraries. But because the .NET SDK does not support UWP, this doesn't work at all. So we need to do three things. -First, we need to set this property in `Directory.build.props`: +First, we need to set this property in `Directory.Build.props`: ```xml True @@ -149,7 +149,7 @@ First, we need to set this property in `Directory.build.props`: Without this, the build tools attempt to add a reference to `mscorlib.dll`, but they don't seem to realise that a) this is the wrong thing and b) they don't actually have a correct location for that, so the reference ends up being `\mscorlib.dll` (i.e., it looks on the root of the hard drive). -Second we need an `ItemGroup` in `Directory.build.targets` containing this: +Second we need an `ItemGroup` in `Directory.bBild.targets` containing this: ```xml false diff --git a/Rx.NET/Source/Directory.build.props b/Rx.NET/Source/Directory.Build.props similarity index 98% rename from Rx.NET/Source/Directory.build.props rename to Rx.NET/Source/Directory.Build.props index 505b9863f1..1a1813eb5b 100644 --- a/Rx.NET/Source/Directory.build.props +++ b/Rx.NET/Source/Directory.Build.props @@ -18,11 +18,12 @@ true true latest + true true - snupkg + snupkg @@ -31,7 +32,7 @@ - + - - + + $(AssemblyName) ($(TargetFramework)) - + $(DefineConstants);HAS_WINFORMS;HAS_WPF;HAS_WINRT;HAS_DISPATCHER;HAS_REMOTING;DESKTOPCLR;NO_NULLABLE_ATTRIBUTES @@ -18,9 +18,12 @@ $(DefineConstants);HAS_TRIMMABILITY_ATTRIBUTES - + $(DefineConstants);HAS_WINRT;HAS_WINFORMS;HAS_WPF;HAS_DISPATCHER;DESKTOPCLR;WINDOWS;CSWINRT + + $(DefineConstants);HAS_SYSTEM_THREADING_LOCK + diff --git a/Rx.NET/Source/System.Reactive.sln b/Rx.NET/Source/System.Reactive.sln index 18ed2a45f5..83e5d9769c 100644 --- a/Rx.NET/Source/System.Reactive.sln +++ b/Rx.NET/Source/System.Reactive.sln @@ -19,8 +19,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution .editorconfig = .editorconfig analyzers.globalconfig = analyzers.globalconfig ..\..\azure-pipelines.rx.yml = ..\..\azure-pipelines.rx.yml - Directory.build.props = Directory.build.props - Directory.build.targets = Directory.build.targets + Directory.Build.props = Directory.Build.props + Directory.Build.targets = Directory.Build.targets global.json = global.json Rx.ruleset = Rx.ruleset Test.ruleset = Test.ruleset @@ -30,13 +30,13 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{D324579D-CBE6-4867-8980-D7842C7C45A2}" ProjectSection(SolutionItems) = preProject tests\.editorconfig = tests\.editorconfig - tests\Directory.build.props = tests\Directory.build.props - tests\Directory.build.targets = tests\Directory.build.targets + tests\Directory.Build.props = tests\Directory.Build.props + tests\Directory.Build.targets = tests\Directory.Build.targets EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Facades", "Facades", "{A0F39260-B8F8-4FCB-9679-0ED917A22BDF}" ProjectSection(SolutionItems) = preProject - facades\Directory.build.props = facades\Directory.build.props + facades\Directory.Build.props = facades\Directory.Build.props facades\System.Reactive.Compatibility.nuspec = facades\System.Reactive.Compatibility.nuspec EndProjectSection EndProject diff --git a/Rx.NET/Source/facades/Directory.build.props b/Rx.NET/Source/facades/Directory.Build.props similarity index 91% rename from Rx.NET/Source/facades/Directory.build.props rename to Rx.NET/Source/facades/Directory.Build.props index 9b4f5df845..ac9af0e945 100644 --- a/Rx.NET/Source/facades/Directory.build.props +++ b/Rx.NET/Source/facades/Directory.Build.props @@ -1,5 +1,5 @@ - + false false diff --git a/Rx.NET/Source/facades/Directory.Build.targets b/Rx.NET/Source/facades/Directory.Build.targets new file mode 100644 index 0000000000..cbbb8b230a --- /dev/null +++ b/Rx.NET/Source/facades/Directory.Build.targets @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/Rx.NET/Source/facades/Directory.build.targets b/Rx.NET/Source/facades/Directory.build.targets deleted file mode 100644 index 92c4bd8cc7..0000000000 --- a/Rx.NET/Source/facades/Directory.build.targets +++ /dev/null @@ -1,3 +0,0 @@ - - - \ No newline at end of file diff --git a/Rx.NET/Source/src/Directory.Build.props b/Rx.NET/Source/src/Directory.Build.props new file mode 100644 index 0000000000..3a0fbcdeba --- /dev/null +++ b/Rx.NET/Source/src/Directory.Build.props @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/Rx.NET/Source/src/Directory.Build.targets b/Rx.NET/Source/src/Directory.Build.targets new file mode 100644 index 0000000000..cbbb8b230a --- /dev/null +++ b/Rx.NET/Source/src/Directory.Build.targets @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/Rx.NET/Source/src/Directory.build.props b/Rx.NET/Source/src/Directory.build.props deleted file mode 100644 index 22c6e4c8bf..0000000000 --- a/Rx.NET/Source/src/Directory.build.props +++ /dev/null @@ -1,3 +0,0 @@ - - - \ No newline at end of file diff --git a/Rx.NET/Source/src/Directory.build.targets b/Rx.NET/Source/src/Directory.build.targets deleted file mode 100644 index 92c4bd8cc7..0000000000 --- a/Rx.NET/Source/src/Directory.build.targets +++ /dev/null @@ -1,3 +0,0 @@ - - - \ No newline at end of file diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs index b8579e72ac..4b1bad5198 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs @@ -1,21 +1,22 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. +using System.Threading; namespace System.Reactive.Concurrency { - internal sealed class Synchronize : Producer._> + internal sealed class SynchronizeWithObject : Producer._> { private readonly IObservable _source; private readonly object? _gate; - public Synchronize(IObservable source, object gate) + public SynchronizeWithObject(IObservable source, object gate) { _source = source; _gate = gate; } - public Synchronize(IObservable source) + public SynchronizeWithObject(IObservable source) { _source = source; } @@ -28,7 +29,7 @@ internal sealed class _ : IdentitySink { private readonly object _gate; - public _(Synchronize parent, IObserver observer) + public _(SynchronizeWithObject parent, IObserver observer) : base(observer) { _gate = parent._gate ?? new object(); @@ -59,4 +60,57 @@ public override void OnCompleted() } } } + + #if HAS_SYSTEM_THREADING_LOCK + internal sealed class SynchronizeWithLock : Producer._> + { + private readonly IObservable _source; + private readonly Lock _gate; + + public SynchronizeWithLock(IObservable source, Lock gate) + { + _source = source; + _gate = gate; + } + + protected override _ CreateSink(IObserver observer) => new(this, observer); + + protected override void Run(_ sink) => sink.Run(_source); + + internal sealed class _ : IdentitySink + { + private readonly Lock _gate; + + public _(SynchronizeWithLock parent, IObserver observer) + : base(observer) + { + _gate = parent._gate; + } + + public override void OnNext(TSource value) + { + lock (_gate) + { + ForwardOnNext(value); + } + } + + public override void OnError(Exception error) + { + lock (_gate) + { + ForwardOnError(error); + } + } + + public override void OnCompleted() + { + lock (_gate) + { + ForwardOnCompleted(); + } + } + } + } + #endif } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs index b4a5b53708..eb3b19a790 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs @@ -1,6 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. using System.ComponentModel; using System.Reactive.Disposables; @@ -229,7 +229,7 @@ public static IObservable Synchronize(IObservable sou throw new ArgumentNullException(nameof(source)); } - return new Synchronize(source); + return new SynchronizeWithObject(source); } /// @@ -252,9 +252,34 @@ public static IObservable Synchronize(IObservable sou throw new ArgumentNullException(nameof(gate)); } - return new Synchronize(source, gate); + return new SynchronizeWithObject(source, gate); } + #if HAS_SYSTEM_THREADING_LOCK + /// + /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object. + /// + /// The type of the elements in the source sequence. + /// Source sequence. + /// Gate object to synchronize each observer call on. + /// The source sequence whose outgoing calls to observers are synchronized on the given gate object. + /// or is null. + public static IObservable Synchronize(IObservable source, Lock gate) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (gate == null) + { + throw new ArgumentNullException(nameof(gate)); + } + + return new SynchronizeWithLock(source, gate); + } + #endif + #endregion } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs index 667ab8184f..08e02f82e4 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs @@ -1,6 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; @@ -369,6 +369,9 @@ internal partial interface IQueryLanguage IObservable Synchronize(IObservable source); IObservable Synchronize(IObservable source, object gate); + #if HAS_SYSTEM_THREADING_LOCK + IObservable Synchronize(IObservable source, Lock gate); + #endif #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Concurrency.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Concurrency.cs index a261c87434..e01884937a 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Concurrency.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Concurrency.cs @@ -1,6 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Threading; @@ -175,6 +175,32 @@ public static IObservable Synchronize(this IObservable + /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object. + /// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object. + /// + /// The type of the elements in the source sequence. + /// Source sequence. + /// Gate object to synchronize each observer call on. + /// The source sequence whose outgoing calls to observers are synchronized on the given gate object. + /// or is null. + public static IObservable Synchronize(this IObservable source, Lock gate) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (gate == null) + { + throw new ArgumentNullException(nameof(gate)); + } + + return s_impl.Synchronize(source, gate); + } + #endif + #endregion } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Concurrency.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Concurrency.cs index 89daaba0ab..516eafd5d6 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Concurrency.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Concurrency.cs @@ -1,6 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Threading; @@ -49,6 +49,13 @@ public virtual IObservable Synchronize(IObservable so return Synchronization.Synchronize(source, gate); } + #if HAS_SYSTEM_THREADING_LOCK + public virtual IObservable Synchronize(IObservable source, Lock gate) + { + return Synchronization.Synchronize(source, gate); + } + #endif + #endregion } } diff --git a/Rx.NET/Source/tests/Directory.build.props b/Rx.NET/Source/tests/Directory.Build.props similarity index 95% rename from Rx.NET/Source/tests/Directory.build.props rename to Rx.NET/Source/tests/Directory.Build.props index d29ecf3f99..7e01560abe 100644 --- a/Rx.NET/Source/tests/Directory.build.props +++ b/Rx.NET/Source/tests/Directory.Build.props @@ -1,5 +1,5 @@ - + $(MSBuildThisFileDirectory)..\Test.ruleset diff --git a/Rx.NET/Source/tests/Directory.build.targets b/Rx.NET/Source/tests/Directory.Build.targets similarity index 76% rename from Rx.NET/Source/tests/Directory.build.targets rename to Rx.NET/Source/tests/Directory.Build.targets index 90608d84e9..ff921c180e 100644 --- a/Rx.NET/Source/tests/Directory.build.targets +++ b/Rx.NET/Source/tests/Directory.Build.targets @@ -1,5 +1,5 @@ - - + + portable full diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs index 7211f7d40c..c7ded90956 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs @@ -1,6 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. +// See the LICENSE file in the project root for more information. using System; using System.Reactive.Concurrency; @@ -37,7 +37,10 @@ public void Synchronization_Synchronize_ArgumentChecking() { ReactiveAssert.Throws(() => Synchronization.Synchronize(default(IObservable))); ReactiveAssert.Throws(() => Synchronization.Synchronize(default(IObservable), new object())); - ReactiveAssert.Throws(() => Synchronization.Synchronize(DummyObservable.Instance, null)); + ReactiveAssert.Throws(() => Synchronization.Synchronize(DummyObservable.Instance, (null as object)!)); + #if HAS_SYSTEM_THREADING_LOCK + ReactiveAssert.Throws(() => Synchronization.Synchronize(DummyObservable.Instance, (null as Lock)!)); + #endif } private class MySyncCtx : SynchronizationContext diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/SynchronizeTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/SynchronizeTest.cs index 3ea1c155c9..5a52a9a739 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/SynchronizeTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/SynchronizeTest.cs @@ -1,12 +1,9 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. - using System; -using System.Linq; -using System.Reactive.Concurrency; +using System.Collections.Generic; using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; +using System.Threading.Tasks; using Microsoft.Reactive.Testing; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -15,116 +12,314 @@ namespace ReactiveTests.Tests { [TestClass] - public class SynchronizeTest : TestBase + public class SynchronizeTest { - [TestMethod] - public void Synchronize_ArgumentChecking() - { - var someObservable = Observable.Empty(); - - ReactiveAssert.Throws(() => Observable.Synchronize(default)); + [TestMethod] + public void Synchronize_WithObject_WhenArgumentsAreNull_ThrowsException() + { + ReactiveAssert.Throws(() => Observable.Synchronize(source: null!)); - ReactiveAssert.Throws(() => Observable.Synchronize(default, new object())); - ReactiveAssert.Throws(() => Observable.Synchronize(someObservable, null)); - } + ReactiveAssert.Throws(() => Observable.Synchronize( + source: null!, + gate: new object())); - [TestMethod] - public void Synchronize_Range() - { - var i = 0; - var outsideLock = true; + ReactiveAssert.Throws(() => Observable.Synchronize( + source: Observable.Empty(), + gate: (null as object)!)); + } - var gate = new object(); - lock (gate) + [TestMethod] + public void Synchronize_WithObject_WhenNotificationsAreSynchronized_PropagatesNotifications() { - outsideLock = false; - Observable.Range(0, 100, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => i++, () => { Assert.True(outsideLock); }); - Thread.Sleep(100); - Assert.Equal(0, i); - outsideLock = true; + using var source = new Subject(); + + var observer = new TestObserver(); + + using var subscription = source + .Synchronize() + .Subscribe(observer); + + source.OnNext(1); + source.OnNext(2); + source.OnNext(3); + source.OnCompleted(); + + Assert.Equal(observer.ObservedValues, new[] { 1, 2, 3 }); + Assert.Equal(observer.ObservedCompletionCount, 1); } - while (i < 100) + [TestMethod] + [Timeout(10000)] + public async Task Synchronize_WithObject_WhenGateIsLocked_BlocksNotification() { - Thread.Sleep(10); + using var source = new Subject(); + + var gate = new object(); + + var observer = new TestObserver(); + + using var subscription = source + .Synchronize(gate: gate) + .Subscribe(observer); + + var whenHostReady = new ManualResetEventSlim(); + var whenProducerRunning = new ManualResetEventSlim(); + + var whenProducerFinished = Task.Run(() => + { + whenHostReady.Wait(); + + // 3) Try to produce a value, which should block on the gate + whenProducerRunning.Set(); + source.OnNext(1); + }); + + // 1) Lock the gate, so the producer gets blocked when producing a value lock (gate) { - var start = i; - Thread.Sleep(100); - Assert.Equal(start, i); + whenHostReady.Set(); + + // 2) Wait for the producer to actually run, and get blocked + whenProducerRunning.Wait(); + Thread.Yield(); + + // 4) Check that the value hasn't been produced + Assert.Empty(observer.ObservedValues); + Assert.Empty(observer.ObservedErrors); } + + await whenProducerFinished; + + // 5) Check that the second value has been produced + Assert.Equal(observer.ObservedValues, new[] { 1 }); + Assert.Empty(observer.ObservedErrors); + Assert.Equal(observer.ObservedCompletionCount, 0); } + + [TestMethod] + [Timeout(10000)] + public async Task Synchronize_WithObject_WhenPublishingNotification_LocksGate() + { + using var source = new Subject(); + + var gate = new object(); + + var whenObserverRunning = new ManualResetEventSlim(); + var whenHostFinished = new ManualResetEventSlim(); + + using var subscription = source + .Synchronize(gate: gate) + .Subscribe(_ => + { + whenObserverRunning.Set(); + + // 3) Wait for the host to verify the lock + whenHostFinished.Wait(); + }); + + // 1) Check that the gate isn't locked during operator setup. + var wasGateEntered = Monitor.TryEnter(gate); + Assert.True(wasGateEntered); + Monitor.Exit(gate); + + var whenProducerFinished = Task.Run(() => + { + source.OnNext(1); + }); + + // 2) Wait for the observer to open a lock on the gate. + whenObserverRunning.Wait(); + + // 4) Check that the gate is locked + wasGateEntered = Monitor.TryEnter(gate); + try + { + Assert.False(wasGateEntered); + } + finally + { + if (wasGateEntered) + Monitor.Exit(gate); + } + + whenHostFinished.Set(); + + // 5) Wait for the producer/observer to release the gate + await whenProducerFinished; + + // 6) Check that the gate is unlocked again + wasGateEntered = Monitor.TryEnter(gate); + Assert.True(wasGateEntered); + Monitor.Exit(gate); + } + + [TestMethod] + public void Synchronize_WithLock_WhenArgumentsAreNull_ThrowsException() + { + ReactiveAssert.Throws(() => Observable.Synchronize( + source: null!, + gate: new Lock())); + + ReactiveAssert.Throws(() => Observable.Synchronize( + source: Observable.Empty(), + gate: (null as Lock)!)); } [TestMethod] - public void Synchronize_Throw() + public void Synchronize_WithLock_WhenNotificationsAreSynchronized_PropagatesNotifications() { - var ex = new Exception(); - var resLock = new object(); - var e = default(Exception); - var outsideLock = true; + using var source = new Subject(); - var gate = new object(); - lock (gate) + var observer = new TestObserver(); + + using var subscription = source + .Synchronize(new Lock()) + .Subscribe(observer); + + source.OnNext(1); + source.OnNext(2); + source.OnNext(3); + source.OnCompleted(); + + Assert.Equal(observer.ObservedValues, new[] { 1, 2, 3 }); + Assert.Equal(observer.ObservedCompletionCount, 1); + } + + [TestMethod] + [Timeout(10000)] + public async Task Synchronize_WithLock_WhenGateIsLocked_BlocksNotification() + { + using var source = new Subject(); + + var gate = new Lock(); + + var observer = new TestObserver(); + + using var subscription = source + .Synchronize(gate: gate) + .Subscribe(observer); + + var whenHostReady = new ManualResetEventSlim(); + var whenProducerRunning = new ManualResetEventSlim(); + + var whenProducerFinished = Task.Run(() => { - outsideLock = false; - Observable.Throw(ex, NewThreadScheduler.Default).Synchronize(gate).Subscribe(x => { Assert.True(false); }, err => { lock (resLock) { e = err; } }, () => { Assert.True(outsideLock); }); - Thread.Sleep(100); - Assert.Null(e); - outsideLock = true; - } + whenHostReady.Wait(); + + // 3) Try to produce a value, which should block on the gate + whenProducerRunning.Set(); + source.OnNext(1); + }); - while (true) + // 1) Lock the gate, so the producer gets blocked when producing a value + lock (gate) { - lock (resLock) - { - if (e != null) - { - break; - } - } + whenHostReady.Set(); + + // 2) Wait for the producer to actually run, and get blocked + whenProducerRunning.Wait(); + Thread.Yield(); + + // 4) Check that the value hasn't been produced + Assert.Empty(observer.ObservedValues); + Assert.Empty(observer.ObservedErrors); } - Assert.Same(ex, e); + await whenProducerFinished; + + // 5) Check that the second value has been produced + Assert.Equal(observer.ObservedValues, new[] { 1 }); + Assert.Empty(observer.ObservedErrors); + Assert.Equal(observer.ObservedCompletionCount, 0); } [TestMethod] - public void Synchronize_BadObservable() + [Timeout(10000)] + public async Task Synchronize_WithLock_WhenPublishingNotification_LocksGate() { - var o = Observable.Create(obs => - { - var t1 = new Thread(() => - { - for (var i = 0; i < 100; i++) - { - obs.OnNext(i); - } - }); + using var source = new Subject(); + + var gate = new Lock(); - new Thread(() => + var whenObserverRunning = new ManualResetEventSlim(); + var whenHostFinished = new ManualResetEventSlim(); + + using var subscription = source + .Synchronize(gate: gate) + .Subscribe(_ => { - t1.Start(); + whenObserverRunning.Set(); - for (var i = 100; i < 200; i++) - { - obs.OnNext(i); - } + // 3) Wait for the host to verify the lock + whenHostFinished.Wait(); + }); - t1.Join(); - obs.OnCompleted(); - }).Start(); + // 1) Check that the gate isn't locked during operator setup. + var wasGateEntered = gate.TryEnter(); + Assert.True(wasGateEntered); + gate.Exit(); - return () => { }; + var whenProducerFinished = Task.Run(() => + { + source.OnNext(1); }); - var evt = new ManualResetEvent(false); + // 2) Wait for the observer to open a lock on the gate. + whenObserverRunning.Wait(); + + // 4) Check that the gate is locked + wasGateEntered = gate.TryEnter(); + try + { + Assert.False(wasGateEntered); + } + finally + { + if (wasGateEntered) + gate.Exit(); + } + + whenHostFinished.Set(); + + // 5) Wait for the producer/observer to release the gate + await whenProducerFinished; + + // 6) Check that the gate is unlocked again + wasGateEntered = gate.TryEnter(); + Assert.True(wasGateEntered); + gate.Exit(); + } + + private class TestObserver + : IObserver + { + public TestObserver() + { + _observedErrors = new(); + _observedValues = new(); + } + + public int ObservedCompletionCount + => _observedCompletionCount; + + public List ObservedErrors + => _observedErrors; + + public List ObservedValues + => _observedValues; + + void IObserver.OnCompleted() + => ++_observedCompletionCount; - var sum = 0; - o.Synchronize().Subscribe(x => sum += x, () => { evt.Set(); }); + void IObserver.OnError(Exception error) + => _observedErrors.Add(error); - evt.WaitOne(); + void IObserver.OnNext(T value) + => _observedValues.Add(value); - Assert.Equal(Enumerable.Range(0, 200).Sum(), sum); + private int _observedCompletionCount; + private readonly List _observedErrors; + private readonly List _observedValues; } } }