Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
8 changes: 4 additions & 4 deletions Ix.NET/Documentation/adr/0001-Ix-Ref-Assembly-Mismatches.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ At the time of writing this, the current version of `System.Interactive` is 6.0.
* `net6.0`
* `netstandard2.1`


The use of `net4.8` in `ref` seems to have been a bug: that should have been `net48`. (The main reason I am confident it's a bug, and not a clever but obscure trick that we've not understood, is that the [commit of 2021/12/06 that added this](https://github.com/dotnet/reactive/commit/a2410b2267abe193191f3894d243771ae4b126fd) used [`net48` in reference assemblies for one of the other packages](https://github.com/dotnet/reactive/commit/a2410b2267abe193191f3894d243771ae4b126fd#diff-3b568c93a468dab1b1a619a450bf1c4d88d3ec9539737d09fa6fb7659bc0ae5fR7), so this just seems to have been a slip.)

The other discrepancy is that we have `netstandard2.0` in the `lib` folder but `netstandard2.1` in the ref folder. At first glance, this too looks quite a lot like a mistake, particularly when you examine the history. Here is the point in the release history at which the `ref` folder first started having a `netstandard2.1` folder:
Expand All @@ -45,12 +45,12 @@ And yet, on closer inspection, this appears to be deliberate. Looking at this co

https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385

we see a [comment in Ix.NET/Source/refs/Directory.build.props](https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385#diff-909504334cbab5c432709c95ae78c24fb2910d850958af2ef6de444b18e5c8ecR6) saying:
we see a [comment in Ix.NET/Source/refs/Directory.Build.props](https://github.com/dotnet/reactive/commit/0252fb537c9d335b9bc863b65291f152c07ba385#diff-909504334cbab5c432709c95ae78c24fb2910d850958af2ef6de444b18e5c8ecR6) saying:

> This is here so we can create a fake .NET Standard 2.1 facade

I can only guess that they knew .NET Standard 2.1 was coming, and wanted to ensure that `System.Interactive` was ready for it when it shipped.

So it was deliberate. But offering reference assemblies for a platform without any corresponding implementation for that platform is an odd choice. (And although at the time this was a placholder for a forthcoming .NET Standard version, it continued to look like this after .NET Standard 2.1 shipped. All subsequent Ix.NET releases have continued to provide `netstandard2.1` in the `ref` folder with no matching folder in `lib`. So it wasn't just a temporary measure.) What purpose does this serve?

Some of the features that Ix offers eventually became available in .NET Core, such as `EnumerableEx.SkipLast`. This method exists in the implementation assemblies for every TFM of Ix.NET, but the `netstandard2.1` and `net6.0` reference assemblies omit it. This has the effect that if you're targetting any version of .NET recent enough to have these methods built into the .NET runtime libraries, the Ix.NET equivalents will:
Expand Down
4 changes: 2 additions & 2 deletions Ix.NET/Source/Ix.Async.NET.sln
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
AsyncQueryableGenerator.t4 = AsyncQueryableGenerator.t4
..\..\azure-pipelines.ix.yml = ..\..\azure-pipelines.ix.yml
CodeCoverage.runsettings = CodeCoverage.runsettings
Directory.build.props = Directory.build.props
Directory.build.targets = Directory.build.targets
Directory.Build.props = Directory.Build.props
Directory.Build.targets = Directory.Build.targets
global.json = global.json
NuGet.Config = NuGet.Config
version.json = version.json
Expand Down
6 changes: 3 additions & 3 deletions Ix.NET/Source/Ix.NET.sln
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\..\.editorconfig = ..\..\.editorconfig
..\..\azure-pipelines.ix.yml = ..\..\azure-pipelines.ix.yml
CodeCoverage.runsettings = CodeCoverage.runsettings
Directory.build.props = Directory.build.props
Directory.build.targets = Directory.build.targets
Directory.Build.props = Directory.Build.props
Directory.Build.targets = Directory.Build.targets
version.json = version.json
EndProjectSection
EndProject
Expand Down Expand Up @@ -61,7 +61,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLinq", "FasterLinq\Fa
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Refs", "Refs", "{A3D72E6E-4ADA-42E0-8B2A-055B1F244281}"
ProjectSection(SolutionItems) = preProject
refs\Directory.build.props = refs\Directory.build.props
refs\Directory.Build.props = refs\Directory.Build.props
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Interactive", "refs\System.Interactive\System.Interactive.csproj", "{2EC0C302-B029-4DDB-AC91-000BF11006AD}"
Expand Down
4 changes: 2 additions & 2 deletions Rx.NET/Documentation/adr/0001-net7.0-era-tooling-update.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ The following sections describe how we will deal with each of the issues raised

In all projects that used to target `net5.0`, change this to `net6.0`. Likewise, in any projects that target `net5.0-windows10.0.19041`, change that to `net6.0-windows10.0.19041`.

Modify the entries in `Directory.build.targets`
Modify the entries in `Directory.Build.targets`

Remove the entries in `Directory.build.targets` that refer to the out-of-support TFMs. This means that several preprocessor constants are no longer used. We should scour the codebase and remove all conditionally compiled sections of code that will no longer be used because the target frameworks that used to bring them in no longer exist. The constants no longer in use are:
Remove the entries in `Directory.Build.targets` that refer to the out-of-support TFMs. This means that several preprocessor constants are no longer used. We should scour the codebase and remove all conditionally compiled sections of code that will no longer be used because the target frameworks that used to bring them in no longer exist. The constants no longer in use are:

* `NETSTANDARD1_0`
* `NETSTANDARD1_3`
Expand Down
12 changes: 6 additions & 6 deletions Rx.NET/Documentation/adr/0003-uap-targets.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ Note that the specific version we actually want to target, 18362, isn't in there

The following sections explain how we enable `uap10.0.18362` to be specified as a target framework, even though the tools do not support this.

The project has `Directory.build.props` and `Directory.build.targets` files. The build tools search for these and automatically load them for all projects in the solution. The `Directory.build.props` file has a `<PropertyGroup>` with a `Condition` that means it runs only when the `uap10.0.18362` target is being built, and it sets numerous properties, as described in the following sections.
The project has `Directory.Build.props` and `Directory.Build.targets` files. The build tools search for these and automatically load them for all projects in the solution. The `Directory.Build.props` file has a `<PropertyGroup>` with a `Condition` that means it runs only when the `uap10.0.18362` target is being built, and it sets numerous properties, as described in the following sections.


#### Target Platform Version

We make our minmum platform version match the one in the TFM:

```xml
```xml
<TargetPlatformMinVersion>10.0.18362</TargetPlatformMinVersion>
<TargetPlatformVersion>10.0.18362.0</TargetPlatformVersion>
```
Expand All @@ -130,7 +130,7 @@ However, only _some_ properties should use the old name. We need to set _all_ of

#### Compiler Constants

When using the supported UWP build tools (with the old-form project system, which we can't use because we also need to build modern targets), the `WINDOWS_UWP` define constant is set, enabling source code compiled into multiple targets to detect that it is being built for UWP with a `#if WINDOWS_UWP`. So we need this in `Directory.build.props`:
When using the supported UWP build tools (with the old-form project system, which we can't use because we also need to build modern targets), the `WINDOWS_UWP` define constant is set, enabling source code compiled into multiple targets to detect that it is being built for UWP with a `#if WINDOWS_UWP`. So we need this in `Directory.Build.props`:

```xml
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants>
Expand All @@ -141,15 +141,15 @@ When using the supported UWP build tools (with the old-form project system, whic

Normally, when you specify a TFM, the .NET SDK works out what framework library references are required and adds them for you. So if you write `<TargetFramework>net8.0<TargetFramework>` in a project file, you will automatically have access to all the .NET 8.0 runtime libraries. But because the .NET SDK does not support UWP, this doesn't work at all. So we need to do three things.

First, we need to set this property in `Directory.build.props`:
First, we need to set this property in `Directory.Build.props`:

```xml
<NoStdLib>True</NoStdLib>
```

Without this, the build tools attempt to add a reference to `mscorlib.dll`, but they don't seem to realise that a) this is the wrong thing and b) they don't actually have a correct location for that, so the reference ends up being `\mscorlib.dll` (i.e., it looks on the root of the hard drive).

Second we need an `ItemGroup` in `Directory.build.targets` containing this:
Second we need an `ItemGroup` in `Directory.bBild.targets` containing this:

```xml
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform"
Expand All @@ -171,7 +171,7 @@ You might be wondering about that 26100 in there. Why is that not 18362, consist

#### Prevent Over-Zealous WinRT Interop Code Generation

The .NET SDK has a feature by which it can generate WinRT versions of .NET types to enable interop between .NET and WinRT code. Unfortunately, the way we've rigged things up to be able to build for `uap10.0.18362.0` seems to cause this to generate these interop types for any .NET class that implements `IDisposable`! This is not helpful. So we disable the feature in `Directory.build.targets`:
The .NET SDK has a feature by which it can generate WinRT versions of .NET types to enable interop between .NET and WinRT code. Unfortunately, the way we've rigged things up to be able to build for `uap10.0.18362.0` seems to cause this to generate these interop types for any .NET class that implements `IDisposable`! This is not helpful. So we disable the feature in `Directory.Build.targets`:

```xml
<CsWinRTAotOptimizerEnabled>false</CsWinRTAotOptimizerEnabled>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<LangVersion>latest</LangVersion>
<EnableWindowsTargeting>true</EnableWindowsTargeting>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' != 'Debug'">
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>

<PropertyGroup Condition="'$(TF_BUILD)' == 'true'">
Expand All @@ -31,7 +32,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/>

<!--
Nerdbank.GitVersioning 3.6.128 injects a reference to a .proj file that doesn't work inside the
UWP test runner project. We don't ship that as a NuGet package, so it doesn't matter what its
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<Project>

<!-- This props all need to be set in targets as they depend on the values set earlier -->
<PropertyGroup>

<PropertyGroup>
<Product>$(AssemblyName) ($(TargetFramework))</Product>
</PropertyGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'net472'">
<DefineConstants>$(DefineConstants);HAS_WINFORMS;HAS_WPF;HAS_WINRT;HAS_DISPATCHER;HAS_REMOTING;DESKTOPCLR;NO_NULLABLE_ATTRIBUTES</DefineConstants>
</PropertyGroup>
Expand All @@ -21,6 +21,9 @@
<PropertyGroup Condition="$(TargetFramework.StartsWith('net6.0-windows')) or $(TargetFramework.StartsWith('net8.0-windows')) or $(TargetFramework.StartsWith('net9.0-windows'))">
<DefineConstants>$(DefineConstants);HAS_WINRT;HAS_WINFORMS;HAS_WPF;HAS_DISPATCHER;DESKTOPCLR;WINDOWS;CSWINRT</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="$(TargetFramework.StartsWith('net9.0'))">
<DefineConstants>$(DefineConstants);HAS_LOCK_CLASS</DefineConstants>
</PropertyGroup>

<ItemGroup Condition="('$(TargetFramework)' == 'net472' or '$(TargetFramework)' == 'uap10.0.18362' or '$(TargetFramework)' == 'netstandard2.0') and $(IsPackable)">
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
Expand Down
10 changes: 5 additions & 5 deletions Rx.NET/Source/System.Reactive.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.editorconfig = .editorconfig
analyzers.globalconfig = analyzers.globalconfig
..\..\azure-pipelines.rx.yml = ..\..\azure-pipelines.rx.yml
Directory.build.props = Directory.build.props
Directory.build.targets = Directory.build.targets
Directory.Build.props = Directory.Build.props
Directory.Build.targets = Directory.Build.targets
global.json = global.json
Rx.ruleset = Rx.ruleset
Test.ruleset = Test.ruleset
Expand All @@ -30,13 +30,13 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{D324579D-CBE6-4867-8980-D7842C7C45A2}"
ProjectSection(SolutionItems) = preProject
tests\.editorconfig = tests\.editorconfig
tests\Directory.build.props = tests\Directory.build.props
tests\Directory.build.targets = tests\Directory.build.targets
tests\Directory.Build.props = tests\Directory.Build.props
tests\Directory.Build.targets = tests\Directory.Build.targets
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Facades", "Facades", "{A0F39260-B8F8-4FCB-9679-0ED917A22BDF}"
ProjectSection(SolutionItems) = preProject
facades\Directory.build.props = facades\Directory.build.props
facades\Directory.Build.props = facades\Directory.Build.props
facades\System.Reactive.Compatibility.nuspec = facades\System.Reactive.Compatibility.nuspec
EndProjectSection
EndProject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<Import Project="..\Directory.build.props" />
<Import Project="..\Directory.Build.props" />
<PropertyGroup>
<GenerateAssemblyVersionAttribute>false</GenerateAssemblyVersionAttribute>
<GenerateAssemblyVersionInfo>false</GenerateAssemblyVersionInfo>
Expand Down
3 changes: 3 additions & 0 deletions Rx.NET/Source/facades/Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Project>
<Import Project="..\Directory.Build.targets" />
</Project>
3 changes: 0 additions & 3 deletions Rx.NET/Source/facades/Directory.build.targets

This file was deleted.

3 changes: 3 additions & 0 deletions Rx.NET/Source/src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Project>
<Import Project="..\Directory.Build.props" />
</Project>
3 changes: 3 additions & 0 deletions Rx.NET/Source/src/Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Project>
<Import Project="..\Directory.Build.targets" />
</Project>
3 changes: 0 additions & 3 deletions Rx.NET/Source/src/Directory.build.props

This file was deleted.

3 changes: 0 additions & 3 deletions Rx.NET/Source/src/Directory.build.targets

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
// See the LICENSE file in the project root for more information.

namespace System.Reactive.Concurrency
{
internal sealed class Synchronize<TSource> : Producer<TSource, Synchronize<TSource>._>
internal sealed class Synchronize<TSource, TGate> : Producer<TSource, Synchronize<TSource, TGate>._>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will do what you intend. I think you are assuming that then this type gets instantiated with TGate set to Lock, the lock keyword usage inside the sink's OnNext, OnError, and OnCompleted will use the new behaviour for the Lock type, and when it is instantiated with object, it will use the old Monitor-based behaviour.

As far as I can tell this is not what happens. If you write a generic type like this and then use the lock keyword on some expression of type TGate, the compiler will emit the old-style lock code. E.g., in ILDASM I see this:

  IL_0000:  nop
  IL_0001:  ldarg.0
  IL_0002:  ldfld      !1 class Synchronize`2<!TSource,!TGate>::_gate
  IL_0007:  box        !TGate
  IL_000c:  stloc.0
  IL_000d:  ldc.i4.0
  IL_000e:  stloc.1
  .try
  {
    IL_000f:  ldloc.0
    IL_0010:  ldloca.s   V_1
    IL_0012:  call       void [System.Threading]System.Threading.Monitor::Enter(object,
                                                                                bool&)

So no matter what type TGate gets instantiated as, we're going to get the old-style Monitor-based behaviour for lock.

As far as I know, the C# compiler will only generate the new System.Threading.Lock based code for a lock in cases where it knows at compile time that the expression is definitely of type Lock, which won't necessarily be the case here.

(Remember that the compiler is going to emit exactly one implementation of OnNext, OnError, and OnCompleted in the sink class. The instantiation for specific type arguments happens at runtime, but the IL generation has already happened by then. And the behaviour of lock—do you get the Monitor or the Lock version—is determined at compile time. It generates different IL for these two cases.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, really good catch. I was simply hoping to avoid duping the whole Producer implementation. I'll rewrite it.

where TGate : notnull, new()
{
private readonly IObservable<TSource> _source;
private readonly object? _gate;
private readonly TGate? _gate;

public Synchronize(IObservable<TSource> source, object gate)
public Synchronize(IObservable<TSource> source, TGate gate)
{
_source = source;
_gate = gate;
Expand All @@ -26,12 +27,12 @@ public Synchronize(IObservable<TSource> source)

internal sealed class _ : IdentitySink<TSource>
{
private readonly object _gate;
private readonly TGate _gate;

public _(Synchronize<TSource> parent, IObserver<TSource> observer)
public _(Synchronize<TSource, TGate> parent, IObserver<TSource> observer)
: base(observer)
{
_gate = parent._gate ?? new object();
_gate = parent._gate ?? new TGate();
}

public override void OnNext(TSource value)
Expand Down
31 changes: 28 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
// See the LICENSE file in the project root for more information.

using System.ComponentModel;
using System.Reactive.Disposables;
Expand Down Expand Up @@ -229,7 +229,7 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
throw new ArgumentNullException(nameof(source));
}

return new Synchronize<TSource>(source);
return new Synchronize<TSource, object>(source);
}

/// <summary>
Expand All @@ -252,9 +252,34 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
throw new ArgumentNullException(nameof(gate));
}

return new Synchronize<TSource>(source, gate);
return new Synchronize<TSource, object>(source, gate);
}

#if HAS_LOCK_CLASS
/// <summary>
/// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="gate">Gate object to synchronize each observer call on.</param>
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.</exception>
public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only additional public method required. Unless I've missed it, you've not added an equivalent extra overload in the Observable.Concurrency.cs type. (And you might also need to add an equivalent method to IQueryLanguage.)

Oh you'll also need to rerun the homoiconicity project to generate the IQbservable<T> forms of this, which I don't think you've done?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about the whole IQueryLanguage layer, yeah. I've debugged through it before, so I'm familiar with how it works, but I've never tried to implement anything.

I have basically no knowledge of anything related to IQbservable<T>, but I'll look into it.

Copy link
Collaborator

@idg10 idg10 Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The basic idea behind IQbservable<T> is that it supports exactly the same API as IObservable<T>, but the output is effectively a description of whatever query you've written. So if you do this:

IObservable<int> src = GetSomeObservable();

IObservable<int> xs = src
    .Where(x => x > 0)
    .Select(x => x * 2);

then xs is a thing you can actually subscribe to that removes negative numbers, and then doubles everything else. But if we write the very similar:

IQbservable<int> xs = src
    .AsQbservable()
    .Where(x => x > 0)
    .Select(x => x * 2);

Console.WriteLine(xs.Expression);

then we've basically got the same query but this time as an IQbservable<int>, and that means that this is a description of the observable source. Instead of just being a thing we can subscribe to, we can inspect this. That Console.WriteLine(xs.Expression); displays this:

System.Reactive.Linq.ObservableImpl.RangeRecursive.Where(x => (x > 0)).Select(x => (x * 2))

(The RangeRecursive there comes from the fact that in the example I'm testing this in, my GetSomeObservable() is returning Observable.Range(0, 10);. So that's really just the type of src here.)

So you can see that this thing knows that this is a Where clause followed by a Select clause. And if you were to inspect xs in the debugger, you'll see a DebugView property that looks like this:

.Call System.Reactive.Linq.Qbservable.Select(
    .Call System.Reactive.Linq.Qbservable.Where(
        .Constant<System.Reactive.ObservableQuery`1[System.Int32]>(System.Reactive.Linq.ObservableImpl.RangeRecursive),
        '(.Lambda #Lambda1<System.Func`2[System.Int32,System.Boolean]>)),
    '(.Lambda #Lambda2<System.Func`2[System.Int32,System.Int32]>))

.Lambda #Lambda1<System.Func`2[System.Int32,System.Boolean]>(System.Int32 $x) {
    $x > 0
}

.Lambda #Lambda2<System.Func`2[System.Int32,System.Int32]>(System.Int32 $x) {
    $x * 2
}

So the basic idea here is that an IQbservable<T> is a complete description of the subscription, one that can be inspected at runtime.

IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>. Systems like Entity Framework exploit the fact that an IQueryable<T> remembers exactly how the query was constructed—just like with the IQbservable<T> I've shown here, an IQueryable<T> would remember that it was built up as (say) a Where and a Select, and EF would then use that information to work out what SQL query to generate to be able to execute the logic that the query represents on a database.

The idea behind IQueryable<T> is that it could be used to support similar mechanisms. You could imagine a remote monitoring device presenting an IQueryable<T> API, and if that remote device were able to support local filtering, you could imagine this implementation of IQueryable<T> detecting when your query starts with a Where clause, and translating that into whatever format the remote device uses to express the filtering (in exactly the same way that Entity Framework translates a LINQ Where into a SQL WHERE clause.)

Admittedly, I'm not aware of any public libraries that actually do that. But it's a model we support, and it's also the basis of distributed query execution in Reaqtor (https://reaqtive.net/). So there are automated tests that check that the public API we define for IObservable<T> is fully matched by the one available for IQueryable<T>. And we have a tool in the repo that auto-generates the necessary code (the Homoiconicity tool). Although the tool is a bit cranky, and currently I have to manually fix the code it generates...you end up running it and then discarding most of what it changed, and keeping just the new bits. At some point I need to fix the tool so that we don't have to do that every time we add a new API feature.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>.

I suspected it might be something like that. And you're saying there's no actual implementations here, just the modeling. I can work with that.

{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (gate == null)
{
throw new ArgumentNullException(nameof(gate));
}

return new Synchronize<TSource, Lock>(source, gate);
}
#endif

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<Import Project="..\Directory.build.props" />
<Import Project="..\Directory.Build.props" />
<PropertyGroup>
<CodeAnalysisRuleSet>$(MSBuildThisFileDirectory)..\Test.ruleset</CodeAnalysisRuleSet>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<Import Project="..\Directory.build.targets" />
<Project>
<Import Project="..\Directory.Build.targets" />
<PropertyGroup>
<DebugType Condition="'$(TargetFramework)' != 'net472'">portable</DebugType>
<DebugType Condition="'$(TargetFramework)' == 'net472'">full</DebugType>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive.Concurrency;
Expand Down Expand Up @@ -37,7 +37,10 @@ public void Synchronization_Synchronize_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>)));
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>), new object()));
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null));
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as object));
#if HAS_LOCK_CLASS
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as Lock));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this is fine as far as it goes, you've not added any behavioural tests.

You need to verify that it really does acquire the Lock in the proper way (which, as per my earlier comment, I don't believe it actually does right now).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got no issue with writing new functional tests, I just figured I'd start by following the pattern of what the existing operator has, which is nothing. Like, perhaps testing for thread lock mechanics was deemed not worth it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following the pattern of what the existing operator has, which is nothing

It's not quite as bad as that, it's just that the tests are not where you thought they were (probably because you hadn't realised that Rx strangely offers two completely different APIs for using this: the Synchronization class, whose meagre tests you have found, and also the Synchronize LINQ operator, whose existence you missed.)

The main tests for this code all go in through the LINQ operator, and can be found in Rx.NET\Source\tests\Tests.System.Reactive\Tests\Linq\Observable\SynchronizeTest.cs

That said, now that I look at these tests they aren't exactly great.

This is the first time I've looked at this part of the code since taking over. If I were implementing this new feature, I'd want to expand the tests that verify the behaviour for the existing code too. I'd consider that to be an important part of ensuring that the behaviour is consistent across these two variations. The most important things to test would be:

  1. if something else is in possession of the Lock (or owns the monitor if we're testing existing code), and a source produces an item, that item is not delivered to the subscriber until after that lock is released
  2. if a subscriber's OnNext, OnComplete, or OnError has been called and has not yet returned, it is not possible for some other thread to acquire the lock (or monitor) until after the subscriber returns

There's a third scenario in which a no-arguments Synchronize() supposedly protects you from a rogue source: if some observable source breaks the contract by calling some method on an IObserver before the preceding call has completed, this form of Synchronize is supposed to block that until the lock becomes available. I don't know how well this works in practice—such a source is illegal according to the rules of Rx, so it's not clear what we would guarantee in practice. The cases I'd be suspicious of is if an OnNext is in progress, and then the source invokes both OnNext and OnComplete (or OnNext and OnError), does Synchronize make any effort to ensure the calls are delivered in the order in which they started? I suspect you probably just end up with all the threads waiting on the lock, and whichever one the CLR happens to allow through first gets to go next.

However...I've just discovered we actually rely on this rogue source protection internally! Looking in QueryLanguageEx I see that we define an internal Combine operator that gets used by the public ForkJoin, and this does appear to rely on the ability of Synchronize to convert potentially concurrent notifications into one-at-a-time ones. So I guess we probably should be testing that too. However, since you don't need to implement any Lock-flavoured version of that, this third thing is arguably well outside the scope of this particular change.

Copy link
Author

@JakenVeina JakenVeina Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and also the Synchronize LINQ operator, whose existence you missed

I wouldn't say I missed its existence, but I definitely forgot to walk through the IQueryLanguage abstraction layer. when I was searching for affected references.

If I were implementing this new feature, I'd want to expand the tests that verify the behaviour for the existing code too. I'd consider that to be an important part of ensuring that the behaviour is consistent across these two variations.

I like you. I know too many people who would call this "scope creep".

if something else is in possession of the Lock (or owns the monitor if we're testing existing code), and a source produces an item, that item is not delivered to the subscriber until after that lock is released
if a subscriber's OnNext, OnComplete, or OnError has been called and has not yet returned, it is not possible for some other thread to acquire the lock (or monitor) until after the subscriber returns

Fully agreed, with direct access to the gate object, that sounds like the proper way to approach this.

There's a third scenario in which a no-arguments Synchronize() supposedly protects you from a rogue source.

Yup, I've encountered this in DynamicData, and we actually have testing fixtures to detect rogue sources.

The cases I'd be suspicious of is if an OnNext is in progress, and then the source invokes both OnNext and OnComplete (or OnNext and OnError), does Synchronize make any effort to ensure the calls are delivered in the order in which they started?

The existing implementation sure doesn't look like it would do this. It just hands off to lock/Monitor.Enter() which doesn't technically make any guarantee of order.

I suspect you probably just end up with all the threads waiting on the lock, and whichever one the CLR happens to allow through first gets to go next.

I think Synchronize() is presented rather clearly as an RX-logical-equivalent of a lock, so I would call this expected behavior. Consumers that REALLY need to guarantee serialization-in-time have options to do so at higher levels. I'm thinking something like a Subject<> or an IScheduler backed by a Channel<>.

#endif
}

private class MySyncCtx : SynchronizationContext
Expand Down
Loading