-
Couldn't load subscription status.
- Fork 777
Add support for System.Threading.Lock #2254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thanks for this contribution. Because we currently have work in progress on another branch that makes changes to the build system (as part of fixing the long-standing packaging problems that cause bloat when using self-contained deployment with a Windows TFM) I think I need get that merged to main before we will be able to merge this PR. I'm going to review it anyway, but I think your changes to the build might not be able to stay, or at any rate may need to change a bit. I'm a little confused by them anyway, because I've had to fix some things on that other branch to align with current tooling, but we're building just fine on Linux for the integration test phase without having to make most of the changes you've made. For example, you seem to have changed the casing in a filename: |
| namespace System.Reactive.Concurrency | ||
| { | ||
| internal sealed class Synchronize<TSource> : Producer<TSource, Synchronize<TSource>._> | ||
| internal sealed class Synchronize<TSource, TGate> : Producer<TSource, Synchronize<TSource, TGate>._> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this will do what you intend. I think you are assuming that then this type gets instantiated with TGate set to Lock, the lock keyword usage inside the sink's OnNext, OnError, and OnCompleted will use the new behaviour for the Lock type, and when it is instantiated with object, it will use the old Monitor-based behaviour.
As far as I can tell this is not what happens. If you write a generic type like this and then use the lock keyword on some expression of type TGate, the compiler will emit the old-style lock code. E.g., in ILDASM I see this:
IL_0000: nop
IL_0001: ldarg.0
IL_0002: ldfld !1 class Synchronize`2<!TSource,!TGate>::_gate
IL_0007: box !TGate
IL_000c: stloc.0
IL_000d: ldc.i4.0
IL_000e: stloc.1
.try
{
IL_000f: ldloc.0
IL_0010: ldloca.s V_1
IL_0012: call void [System.Threading]System.Threading.Monitor::Enter(object,
bool&)
So no matter what type TGate gets instantiated as, we're going to get the old-style Monitor-based behaviour for lock.
As far as I know, the C# compiler will only generate the new System.Threading.Lock based code for a lock in cases where it knows at compile time that the expression is definitely of type Lock, which won't necessarily be the case here.
(Remember that the compiler is going to emit exactly one implementation of OnNext, OnError, and OnCompleted in the sink class. The instantiation for specific type arguments happens at runtime, but the IL generation has already happened by then. And the behaviour of lock—do you get the Monitor or the Lock version—is determined at compile time. It generates different IL for these two cases.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, really good catch. I was simply hoping to avoid duping the whole Producer implementation. I'll rewrite it.
| /// <param name="gate">Gate object to synchronize each observer call on.</param> | ||
| /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns> | ||
| /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.</exception> | ||
| public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the only additional public method required. Unless I've missed it, you've not added an equivalent extra overload in the Observable.Concurrency.cs type. (And you might also need to add an equivalent method to IQueryLanguage.)
Oh you'll also need to rerun the homoiconicity project to generate the IQbservable<T> forms of this, which I don't think you've done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot about the whole IQueryLanguage layer, yeah. I've debugged through it before, so I'm familiar with how it works, but I've never tried to implement anything.
I have basically no knowledge of anything related to IQbservable<T>, but I'll look into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The basic idea behind IQbservable<T> is that it supports exactly the same API as IObservable<T>, but the output is effectively a description of whatever query you've written. So if you do this:
IObservable<int> src = GetSomeObservable();
IObservable<int> xs = src
.Where(x => x > 0)
.Select(x => x * 2);then xs is a thing you can actually subscribe to that removes negative numbers, and then doubles everything else. But if we write the very similar:
IQbservable<int> xs = src
.AsQbservable()
.Where(x => x > 0)
.Select(x => x * 2);
Console.WriteLine(xs.Expression);then we've basically got the same query but this time as an IQbservable<int>, and that means that this is a description of the observable source. Instead of just being a thing we can subscribe to, we can inspect this. That Console.WriteLine(xs.Expression); displays this:
System.Reactive.Linq.ObservableImpl.RangeRecursive.Where(x => (x > 0)).Select(x => (x * 2))
(The RangeRecursive there comes from the fact that in the example I'm testing this in, my GetSomeObservable() is returning Observable.Range(0, 10);. So that's really just the type of src here.)
So you can see that this thing knows that this is a Where clause followed by a Select clause. And if you were to inspect xs in the debugger, you'll see a DebugView property that looks like this:
.Call System.Reactive.Linq.Qbservable.Select(
.Call System.Reactive.Linq.Qbservable.Where(
.Constant<System.Reactive.ObservableQuery`1[System.Int32]>(System.Reactive.Linq.ObservableImpl.RangeRecursive),
'(.Lambda #Lambda1<System.Func`2[System.Int32,System.Boolean]>)),
'(.Lambda #Lambda2<System.Func`2[System.Int32,System.Int32]>))
.Lambda #Lambda1<System.Func`2[System.Int32,System.Boolean]>(System.Int32 $x) {
$x > 0
}
.Lambda #Lambda2<System.Func`2[System.Int32,System.Int32]>(System.Int32 $x) {
$x * 2
}
So the basic idea here is that an IQbservable<T> is a complete description of the subscription, one that can be inspected at runtime.
IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>. Systems like Entity Framework exploit the fact that an IQueryable<T> remembers exactly how the query was constructed—just like with the IQbservable<T> I've shown here, an IQueryable<T> would remember that it was built up as (say) a Where and a Select, and EF would then use that information to work out what SQL query to generate to be able to execute the logic that the query represents on a database.
The idea behind IQueryable<T> is that it could be used to support similar mechanisms. You could imagine a remote monitoring device presenting an IQueryable<T> API, and if that remote device were able to support local filtering, you could imagine this implementation of IQueryable<T> detecting when your query starts with a Where clause, and translating that into whatever format the remote device uses to express the filtering (in exactly the same way that Entity Framework translates a LINQ Where into a SQL WHERE clause.)
Admittedly, I'm not aware of any public libraries that actually do that. But it's a model we support, and it's also the basis of distributed query execution in Reaqtor (https://reaqtive.net/). So there are automated tests that check that the public API we define for IObservable<T> is fully matched by the one available for IQueryable<T>. And we have a tool in the repo that auto-generates the necessary code (the Homoiconicity tool). Although the tool is a bit cranky, and currently I have to manually fix the code it generates...you end up running it and then discarding most of what it changed, and keeping just the new bits. At some point I need to fix the tool so that we don't have to do that every time we add a new API feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IQbservable<T>is toIObservable<T>asIQueryable<T>is toIEnumerable<T>.
I suspected it might be something like that. And you're saying there's no actual implementations here, just the modeling. I can work with that.
| ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null)); | ||
| ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as object)); | ||
| #if HAS_LOCK_CLASS | ||
| ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as Lock)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although this is fine as far as it goes, you've not added any behavioural tests.
You need to verify that it really does acquire the Lock in the proper way (which, as per my earlier comment, I don't believe it actually does right now).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've got no issue with writing new functional tests, I just figured I'd start by following the pattern of what the existing operator has, which is nothing. Like, perhaps testing for thread lock mechanics was deemed not worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following the pattern of what the existing operator has, which is nothing
It's not quite as bad as that, it's just that the tests are not where you thought they were (probably because you hadn't realised that Rx strangely offers two completely different APIs for using this: the Synchronization class, whose meagre tests you have found, and also the Synchronize LINQ operator, whose existence you missed.)
The main tests for this code all go in through the LINQ operator, and can be found in Rx.NET\Source\tests\Tests.System.Reactive\Tests\Linq\Observable\SynchronizeTest.cs
That said, now that I look at these tests they aren't exactly great.
This is the first time I've looked at this part of the code since taking over. If I were implementing this new feature, I'd want to expand the tests that verify the behaviour for the existing code too. I'd consider that to be an important part of ensuring that the behaviour is consistent across these two variations. The most important things to test would be:
- if something else is in possession of the
Lock(or owns the monitor if we're testing existing code), and a source produces an item, that item is not delivered to the subscriber until after that lock is released - if a subscriber's
OnNext,OnComplete, orOnErrorhas been called and has not yet returned, it is not possible for some other thread to acquire the lock (or monitor) until after the subscriber returns
There's a third scenario in which a no-arguments Synchronize() supposedly protects you from a rogue source: if some observable source breaks the contract by calling some method on an IObserver before the preceding call has completed, this form of Synchronize is supposed to block that until the lock becomes available. I don't know how well this works in practice—such a source is illegal according to the rules of Rx, so it's not clear what we would guarantee in practice. The cases I'd be suspicious of is if an OnNext is in progress, and then the source invokes both OnNext and OnComplete (or OnNext and OnError), does Synchronize make any effort to ensure the calls are delivered in the order in which they started? I suspect you probably just end up with all the threads waiting on the lock, and whichever one the CLR happens to allow through first gets to go next.
However...I've just discovered we actually rely on this rogue source protection internally! Looking in QueryLanguageEx I see that we define an internal Combine operator that gets used by the public ForkJoin, and this does appear to rely on the ability of Synchronize to convert potentially concurrent notifications into one-at-a-time ones. So I guess we probably should be testing that too. However, since you don't need to implement any Lock-flavoured version of that, this third thing is arguably well outside the scope of this particular change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and also the Synchronize LINQ operator, whose existence you missed
I wouldn't say I missed its existence, but I definitely forgot to walk through the IQueryLanguage abstraction layer. when I was searching for affected references.
If I were implementing this new feature, I'd want to expand the tests that verify the behaviour for the existing code too. I'd consider that to be an important part of ensuring that the behaviour is consistent across these two variations.
I like you. I know too many people who would call this "scope creep".
if something else is in possession of the Lock (or owns the monitor if we're testing existing code), and a source produces an item, that item is not delivered to the subscriber until after that lock is released
if a subscriber's OnNext, OnComplete, or OnError has been called and has not yet returned, it is not possible for some other thread to acquire the lock (or monitor) until after the subscriber returns
Fully agreed, with direct access to the gate object, that sounds like the proper way to approach this.
There's a third scenario in which a no-arguments Synchronize() supposedly protects you from a rogue source.
Yup, I've encountered this in DynamicData, and we actually have testing fixtures to detect rogue sources.
The cases I'd be suspicious of is if an OnNext is in progress, and then the source invokes both OnNext and OnComplete (or OnNext and OnError), does Synchronize make any effort to ensure the calls are delivered in the order in which they started?
The existing implementation sure doesn't look like it would do this. It just hands off to lock/Monitor.Enter() which doesn't technically make any guarantee of order.
I suspect you probably just end up with all the threads waiting on the lock, and whichever one the CLR happens to allow through first gets to go next.
I think Synchronize() is presented rather clearly as an RX-logical-equivalent of a lock, so I would call this expected behavior. Consumers that REALLY need to guarantee serialization-in-time have options to do so at higher levels. I'm thinking something like a Subject<> or an IScheduler backed by a Channel<>.
|
Perhaps it's not Linux per-se, but the filesystem thst's different between your environments and mine. Ultimately, what's happening on my end is that the build is failing because the IIRC off the top of my head, I'm just using ext4 on my system. Would the filesystem in your Linux environments be different, by chance? Or is there maybe actually a config setting for case sensitivity, that I'm not aware of? |
Ah—I think I understand now. Although we do have some Linux steps in our CI build, most of it runs on Windows hosted build agents in Azure DevOps. So although we do run some parts on Linux, it's only the integration test suite that we run there. And that takes the already-built So in fact we never actually perform a full build on Linux. The packages are always built on Windows, and the parts of the build that run on Linux only execute certain test suites. And that stage has a different set of solution files (for reasons I have no insight into—it was all set up like that before we took this over, and I've never found any explanation for it) so I think the problem you're seeing doesn't afflict that. So you are essentially the first person to attempt to build the main So I now understand why you've needed to make these changes when I've never had to: you're using Linux in stages of the build process where we've never attempted to use it before. |
Enhancement
Implements #2204.
Added an additional overload for
.Synchronize()that accepts aSystem.Threading.Lock, to allow for compiler optimizations based around the new class.Also included a commit that fixes a handful of issues preventing successful build on Linux. If this isn't really desired, the main commit with the new operator should cherry-pickable.
For the moment,
System.Reactivedoesn't implement any .NET 9 targets, so merging this now wouldn't really accomplish much, but I manually added net9.0 as a target during testing, to verify that it builds.I included tests to match the existing tests for
.Synchronize(), which consists of just verifying argument null checks. Unless I missed some other tests, somewhere.