
Pipelines #174

Draft

Aidan63 wants to merge 11 commits into master from pipelines
Conversation


@Aidan63 Aidan63 commented Mar 15, 2026

Here's a first pass at most of pipelines (still some things to implement). It pretty closely follows the C# API, so I'll explain the usage and where I think I'm going to diverge.

Writing

To write data into a pipe you call getBuffer, which returns a haxe.io.ArrayBufferView of unspecified size that you can write into. There is an optional argument for the minimum size of the returned buffer. After you've written data into the buffer you call advance, specifying the number of bytes you wrote. Neither of these functions is a coroutine.

final buffer = writer.getBuffer();
final count = writeData(buffer);

writer.advance(count);

You cannot reuse the buffer returned by getBuffer after you call advance; if you want to write more data you must get another buffer. E.g. if you had a number of packets in some protocol, you could not do this.

final buffer = writer.getBuffer();

for (packet in packets) {
    writer.advance(writePacket(buffer, packet));
}

You should instead do the following.

for (packet in packets) {
    final buffer = writer.getBuffer();

    writer.advance(writePacket(buffer, packet));
}

After a call to advance the data is not yet visible to the reader; you must call flush, which is a coroutine, to make that data visible. This function will suspend if the writer's backpressure threshold is reached and resume when the reader threshold is reached (not fully implemented).

for (packet in packets) {
    final buffer = writer.getBuffer();

    writer.advance(writePacket(buffer, packet));
}

writer.flush();

Reading

On the reader side you call read (returns an unspecified amount of data) or readAtLeast (not yet implemented), both of which give you a haxe.io.ArrayBufferView of data. Like the writer, you then call advance to specify both how much data you have consumed (can be dropped) and observed (kept, but represents incomplete data). Both of these values are in bytes; consumed counts from the beginning of the buffer and observed counts from the end of the consumed region.

There are some important differences in how read responds depending on the consumed and observed values you provide, which I will document below.

  • If you do not consume the entire buffer and observe nothing, then read will immediately return with the unconsumed range.
  • If you consume everything, read will suspend until more data is received.
  • If you consume some or no data and observe the rest of the buffer, read will suspend until more data is received.
  • If you consume some or no data and observe some of the buffer, read will immediately return with the observed and unconsumed range.
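Putting those rules together, a reader loop under this API might look like the following sketch. This is not from the PR; parsePacket is a hypothetical helper returning the size of one complete packet at the given offset, or 0 if the remaining data is incomplete (end-of-stream handling omitted).

```haxe
while (true) {
    // Suspends until the writer has flushed some data.
    final buffer = reader.read();

    // Consume as many complete packets as the buffer contains.
    var consumed = 0;
    while (true) {
        final size = parsePacket(buffer, consumed); // hypothetical
        if (size == 0) {
            break;
        }
        consumed += size;
    }

    // Observing the leftover bytes means the next read suspends until
    // more data arrives instead of returning the same partial packet.
    reader.advance(consumed, buffer.byteLength - consumed);
}
```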

Potential Changes

Reader WaitForRead

In the C# version Read returns a struct which contains the buffer as well as a field saying whether the read was cancelled or the writer has been closed. Performance is such a concern for them that they have non-exception paths for all this. Assuming we're not that focused on it, I wonder if re-using the waitForRead and tryRead style from the channel reader would make sense. E.g. the reader functions become:

@:coroutine function waitForRead():Bool;

function tryRead(out:Out<ArrayBufferView>):Bool;

function tryReadAtLeast(bytes:Int, out:Out<ArrayBufferView>):Bool;

A typical reader loop with that might look something like this.

final readResult  = new Out();
final parseResult = new Out();

while (reader.waitForRead()) {
    while (reader.tryReadAtLeast(Packet.HEADER_SIZE, readResult)) {
        if (Packet.tryParse(readResult.get(), parseResult)) {
            final packet = parseResult.get().packet;
            final bytes   = parseResult.get().consumed;

            // Do something with the parsed packet.

            reader.advance(bytes, 0);
        } else {
            reader.advance(0, readResult.get().byteLength);
        }
    }
}

Memory Pools

Also related to C# performance features: when creating a pipe you can specify a custom MemoryPool<T> the writer will use to manage its internal buffers. I have not checked the actual implementation, but I assume all buffer allocations for expanding, compacting, etc., go through this object as opposed to direct allocations.
Maybe we do want something like this?

Offsets vs Absolute Positions

For the reader's advance, consumed and observed are byte offsets, but in C# they are positions. This doesn't make a difference for consumed, but it does for observed. I bring this up because of the following point.

Complementary Buffer Reading Library

Along with the System.IO.Pipelines package a complementary System.Buffers package was released, designed around easily reading from and writing to pipes. Something similar is probably out of scope for hxcoro, but I want to make sure it would be easy enough to do something similar.

Take a look at the following real-world pipeline reading code from C#:

private async Task ReadPackets(IObserver<Packet> observer, CancellationToken ct)
{
    try
    {
        while (ct.IsCancellationRequested is false)
        {
            var result = await pipe.Input.ReadAtLeastAsync(Constants.NSE_PACKET_MINIMUM_SIZE, ct);
            var buffer = result.Buffer;

            while (ConsumeBuffer(ref buffer, out var packet))
            {
                observer.OnNext(packet);
            }

            pipe.Input.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        observer.OnCompleted();
    }
    catch (OperationCanceledException)
    {
        observer.OnCompleted();
    }
    catch (Exception exn)
    {
        observer.OnError(exn);
    }
}

The interesting thing here is ConsumeBuffer, which is continually called until it returns false and is passed a reference to the buffer (a stack-only type, ReadOnlySequence<byte>). Note also that AdvanceTo uses the Start and End properties of that buffer.

If we take a look at the ConsumeBuffer function, the SequenceReader type provides the magic which makes it all work.

private static bool ConsumeBuffer(ref ReadOnlySequence<byte> sequence, out Packet packet)
{
    var reader = new SequenceReader<byte>(sequence);

    while (reader.TryAdvanceTo(Constants.NSE_HEADER))
    {
        if (reader.TryRead(out var register) is false)
        {
            break;
        }

        if (reader.TryReadExact(Constants.NSE_PACKET_PAYLOAD_SIZE, out var payload) is false)
        {
            break;
        }

        if (reader.TryRead(out var trailer) is false)
        {
            break;
        }

        if (trailer == Constants.NSE_TRAILER)
        {
            sequence = reader.UnreadSequence;
            packet   = new Packet
            {
                Buffer   = payload.ToArray(),
                Register = (Register)register
            };

            return true;
        }
    }

    packet = null;

    return false;
}

As the TryRead calls succeed, the sequence reader keeps track of how much of the underlying sequence has been read and how much is unread. The core part here is where we re-assign the sequence reference to reader.UnreadSequence once we can't read any more packets out of the reader. This then causes the AdvanceTo call to consume the range we have read packets out of and mark the remaining unread section as observed.

Non Exception Paths

As briefly mentioned, C# allows you to cancel pipeline operations (reads, flushes) without exceptions. I have no experience with these parts of the API so I can't really speak to them; I've only used pipelines as a much easier alternative to traditional socket reading and writing, with very low data rate serial ports.

Implementation

I've implemented the pipe on top of a channel through which "pages" of data are passed. This is probably not optimal, and I'm sure there are all sorts of fancy buffer sharing that could be done.

Sorry for the wall of text!

@Aidan63 Aidan63 marked this pull request as draft March 15, 2026 15:36

Simn commented Mar 15, 2026

Yay pipelines! Just as with channels, I'll probably be asking a lot of dumb questions until I get it. So let's get started:

Writer

This part looks immediately strange to me:

final buffer = writer.getBuffer();
final count = writeData(buffer);

writer.advance(count);

Why do I have to tell the writer how much I've written into it? Intuitively, this seems like something it should know itself, so it's not clear to me why I have to keep track of this count value and then pass it back to the writer. I understand the need for some kind of "we're done" call, but the counting part confuses me.

Reader

if you consume everything read will suspend until more data is received.

Hmm, what if I consume everything and that happens to be enough, so I don't actually need more data? The semantics aren't quite clear to me here.

Performance and non-exception paths

Performance is important of course, but before we make things more complex we should measure if it could actually have an effect.

Memory pools

This always makes me feel like we're doing the job the GC is supposed to be doing, so I'm generally not in favor of it.


Aidan63 commented Mar 15, 2026

Needing to specify the number of bytes in advance allows you to request a buffer but not entirely fill it.
E.g. say you have a packet structure with 5 bytes for the header and then a variable number of bytes for the payload, and you know that one of those header bytes specifies the payload size, so the max payload size is 256 bytes. That plus the 5 byte header gives a maximum possible packet size of 261 bytes, so you can always request a buffer of 261 bytes safe in the knowledge you can write a packet into it.
This saves you needing to precalculate the exact encoded size of the packet before you encode it.
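As a sketch of that pattern, using the numbers from the example above (encode is a hypothetical function returning the number of bytes it wrote; none of this is in the PR):

```haxe
class PacketWriter {
    // 5 byte header plus the largest payload the one-byte size field
    // can describe, per the example above.
    static inline final MAX_PACKET_SIZE = 261;

    public static function writePacket(writer:PipeWriter, packet:Packet) {
        // Request the worst-case size; we only advance by what we
        // actually wrote, so over-requesting costs nothing.
        final buffer  = writer.getBuffer(MAX_PACKET_SIZE);
        final written = encode(packet, buffer); // hypothetical encoder

        writer.advance(written);
    }
}
```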

If with advance you consume everything and that contained all the data you need, then don't immediately call read again until you want to parse more data from the pipe. I tend to have one task whose entire job is to read from the pipe, parse packets out of the data, and push those packets into a channel, observable, or other data structure of choice.

With those two paragraphs I'm starting to understand why C# uses Advance and AdvanceTo; it's hard to keep track of which side of the pipe is being referenced when they're both called advance...


Simn commented Mar 15, 2026

I have asked Claude locally to write me a Haxe lexer using the pipelines API, and he was quite happy about that, calling the approach elegant and a good fit. His only complaints (other than readAtLeast not being implemented yet) were these:

No writeBytes(b:Bytes) convenience — the getBuffer → blit → advance → flush ceremony is verbose. A writeBytes(b:Bytes):Coro on PipeWriter would reduce that to one line everywhere (tests, file readers).

This seems agreeable (and I love that he calls it a "ceremony"). I guess that would be a good static extension convenience function.
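Something like this, perhaps, as a sketch of that static extension (assuming ArrayBufferView's standard buffer/byteOffset fields; untested and not part of the PR):

```haxe
import haxe.io.Bytes;

class PipeWriterTools {
    // Hypothetical convenience: copy the bytes into the pipe's buffer,
    // advance by their length, and flush in one call.
    @:coroutine public static function writeBytes(writer:PipeWriter, bytes:Bytes) {
        final buffer = writer.getBuffer(bytes.length);

        buffer.buffer.blit(buffer.byteOffset, bytes, 0, bytes.length);
        writer.advance(bytes.length);
        writer.flush();
    }
}
```

With `using PipeWriterTools;` the ceremony then collapses to `writer.writeBytes(b)` at each call site.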

EOF via exception is awkward — read() throws ChannelClosedException on EOF, which is a normal control-flow event. An Option-returning tryRead():Option (non-coroutine) or returning Null would be cleaner. Currently every caller needs a try/catch to distinguish "no data yet" from "stream ended", and the current workaround leaks a new hxcoro.ds.Out() dead variable in tryRefill.

This goes into the exception-free approach I suppose, and yes, looking at the lexer, it indeed is awkward having to try-catch a simple read operation. I'm not sure what he means regarding that dead variable though, because the implementation is just this right now:

	@:coroutine static function tryRefill(src:PipeReader):Null<ArrayBufferView> {
		try {
			return src.read();
		} catch (_:hxcoro.ds.channels.exceptions.ChannelClosedException) {
			return null;
		}
	}


Aidan63 commented Mar 15, 2026

Yeah, some sort of write extension function was definitely on my radar (C# has the same). I was ultimately thinking of having IPipeReader and IPipeWriter interfaces to mirror channels, so having extensions for those would be good.

I'm going to give the waitForRead and try approach a go and see if that seems better. The other approach would be to have read return some sort of ReadResult object like C#, but for us that would probably mean an allocation per read, which might not be great.
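Collecting the signatures discussed so far, that interface split might look something like this (names and shapes are provisional, mirroring the channel reader/writer pairing; not part of the current PR):

```haxe
import haxe.io.ArrayBufferView;
import hxcoro.ds.Out;

interface IPipeWriter {
    function getBuffer(?minimumSize:Int):ArrayBufferView;
    function advance(written:Int):Void;
    @:coroutine function flush():Void;
}

interface IPipeReader {
    @:coroutine function waitForRead():Bool;
    function tryRead(out:Out<ArrayBufferView>):Bool;
    function tryReadAtLeast(bytes:Int, out:Out<ArrayBufferView>):Bool;
    function advance(consumed:Int, observed:Int):Void;
}
```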


Simn commented Mar 15, 2026

This stuff is really nice, I now have a Bytes -> Pipeline -> Channel -> Parser chain that even makes a lot of sense to me conceptually. The only thing missing is an asys file reader in the front to feed the pipeline from. Other than that this is great!

... although I should probably never benchmark this against a real parser or I'll be really sad.


Aidan63 commented Mar 15, 2026

Aesthetic driven development is now the new trend.

I've just pushed a waitForRead and tryRead approach, there are still some corner cases I want to try out and document but all the existing tests pass. I've also implemented tryReadAtLeast but there are currently no tests for it, so I have no idea if it actually works!

I'm also going to give this a go with the hxcpp asys project; that should be another nice use case for it.


Simn commented Mar 15, 2026

Claude approves of that change:

What we improved:

  1. tryRefill rewritten — old approach: try { src.read() } catch (ChannelClosed) { null } (always suspended into the scheduler). New:

    @:coroutine static function tryRefill(src:PipeReader, out:Out<ArrayBufferView>):Bool {
        if (src.tryRead(out)) return true;       // sync fast path — no suspension
        if (!src.waitForRead()) return false;    // suspend only when buffer empty
        return src.tryRead(out);
    }

    When the file is already buffered in the pipe (the common case for small/medium files), every tryRefill call returns without any coroutine suspension. The try/catch around the now-deleted read() is gone entirely.

  2. pipeOut field added to LexerState — a single Out<ArrayBufferView> instance reused across all refills, so no allocation per refill.

  3. All 5 call sites updated: the main lex() loop, plus lexString, lexRegexp, and lexBlockComment.


Simn commented Mar 16, 2026

In trying to use this further, "we" have found that there's a bug related to suspendedWriter never being resumed. Claude seems to be expecting that to happen in PipeReader.advance, which makes some sense to me.

I'm also seeing that suspendedReader is never resumed either, although that one is also never set to anything in the first place.


Aidan63 commented Mar 16, 2026

Suspending due to backpressure (suspendedWriter) isn't implemented yet; the writer can suspend, but I need to look at channels again to remember how we handle resuming the other side without missing assignments to that field.
I think suspendedReader can go; IIRC I added it before I switched to a channel-based implementation.


Simn commented Mar 16, 2026

... although I should probably never benchmark this against a real parser or I'll be really sad.

That's not really the case it seems! Real parser:

C:\git\haxe>.\_build\default\scripts\parse_std.exe std
Parsing 2581 .hx files in 'std' ...
Done. 2581 files parsed in 0.842 seconds

My local hxcoro-pipeline-thing compiled to C++:

Found 2581 .hx files under c:\git\haxe\std
Total time: 2363 ms
Parsed cleanly: 2581 / 2581 files

3x slower is not bad at all for a totally unoptimized implementation that just started working 5 minutes ago. JS and HL have very similar times, which is a good sign for our cross-target performance. On the JVM it comes in at around 1000ms, which is actually close to beating the real parser.

Nice!
