
Commit 0b8ee21

pipe: add server backlog for concurrent Accept()
Teach `pipe.go:ListenPipe()` to create multiple instances of the server pipe in the kernel so that client connections are less likely to receive a `windows.ERROR_PIPE_BUSY` error. This is conceptually similar to the `backlog` argument of the Unix `listen(2)` function.

The current `listenerRoutine()` function works sequentially and in response to calls to `Accept()`, so there is at most one unbound server pipe present at any time. Even if the server application calls `Accept()` concurrently from a pool of application threads, `listenerRoutine()` processes them sequentially. In this model, and because there is only one `listenerRoutine()` instance, there is an interval of time during which there are no available unbound/free server pipes: when `ConnectNamedPipe()` returns, `listenerRoutine()` sends the new pipe handle over a channel to the caller of `Accept()`; application code then has an opportunity to dispatch/process it and call `Accept()` again, which causes `listenerRoutine()` to create a new unbound server pipe and wait for the next connection. At any time during this interval, a client will get a pipe-busy error. Code in `DialPipe()` hides this from Go callers because it includes a busy-retry loop, but clients written in other languages without this assistance are likely to see the error and have to deal with it themselves.

This change introduces an "accept queue" using a buffered channel and splits `listenerRoutine()` into a pool of listener worker threads. Each worker creates a new unbound pipe and waits for a client connection. The NPFS and kernel handle connection delivery to a random listener worker. The resulting connected pipe is delivered back to the caller of `Accept()` as before.

A `PipeConfig.QueueSize` variable controls the number of listener worker threads and the maximum number of unbound/free server pipes that will be present at any given time. Note that a listener worker will normally have an unbound/free pipe except during that same delivery interval. Having multiple active workers gives us extra capacity to handle rapidly arriving connections.

The application is encouraged to call `Accept()` from a pool of application workers. The size of the application pool should be the same as or larger than the queue size to take full advantage of the listener queue.

To preserve backwards compatibility, a queue size of 0 or 1 behaves as before. Also for backwards compatibility, listener workers are required to wait for an `Accept()` call so that the worker has a return channel on which to send the connected pipe and error code. This implies that the number of unbound pipes will be the smaller of the queue size and the application pool size.

Finally, a mutex was added to `l.Close()` to ensure that concurrent threads do not simultaneously try to shut down the pipe.

Signed-off-by: Jeff Hostetler <jeffhostetler@github.com>
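For context, here is a minimal server sketch of the intended usage described above (not code from this commit): it sets the new `QueueSize` field and services connections from a pool of `Accept()` goroutines at least as large as the queue. The pipe name and pool size are illustrative assumptions.

```go
package main

import (
	"log"
	"net"
	"sync"

	winio "github.com/Microsoft/go-winio"
)

func handle(c net.Conn) {
	defer c.Close()
	// ... read/write on the connected pipe ...
}

func main() {
	// Keep up to 8 unbound server pipe instances available in the kernel.
	l, err := winio.ListenPipe(`\\.\pipe\mypipe`, &winio.PipeConfig{QueueSize: 8})
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // application pool >= QueueSize
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				c, err := l.Accept()
				if err != nil {
					return // e.g. winio.ErrPipeListenerClosed after l.Close()
				}
				go handle(c)
			}
		}()
	}
	wg.Wait()
}
```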
1 parent 4f41be6 commit 0b8ee21

2 files changed (+370, -41 lines)

pipe.go

Lines changed: 151 additions & 41 deletions
@@ -11,6 +11,7 @@ import (
 	"net"
 	"os"
 	"runtime"
+	"sync"
 	"syscall"
 	"time"
 	"unsafe"
@@ -258,9 +259,30 @@ type win32PipeListener struct {
 	firstHandle syscall.Handle
 	path        string
 	config      PipeConfig
-	acceptCh    chan (chan acceptResponse)
-	closeCh     chan int
-	doneCh      chan int
+
+	// `acceptQueueCh` is a buffered channel (of channels). Calls to
+	// Accept() will append to this queue to schedule a listener-worker
+	// to create a new named pipe instance in the named pipe file system
+	// (NPFS) and then listen for a connection from a client.
+	//
+	// The resulting connected pipe (or error) will be signalled (back
+	// to `Accept()`) on the channel value's channel.
+	acceptQueueCh chan (chan acceptResponse)
+
+	// `shutdownStartedCh` will be closed to indicate that all listener
+	// workers should shutdown. `l.Close()` will signal this to begin
+	// a shutdown.
+	shutdownStartedCh chan struct{}
+
+	// `shutdownFinishedCh` will be closed to indicate that `l.listenerRoutine()`
+	// has stopped all of the listener worker threads and has finished the
+	// shutdown. `l.Close()` must wait for this signal before returning.
+	shutdownFinishedCh chan struct{}
+
+	// `closeMux` is used to create a critical section in `l.Close()` and
+	// coordinate the shutdown and prevent problems if a second thread calls
+	// `l.Close()` while a shutdown is in progress.
+	closeMux sync.Mutex
 }
 
 func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
@@ -383,7 +405,7 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
 			p.Close()
 			p = nil
 		}
-	case <-l.closeCh:
+	case <-l.shutdownStartedCh:
 		// Abort the connect request by closing the handle.
 		p.Close()
 		p = nil
@@ -395,33 +417,44 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
 	return p, err
 }
 
-func (l *win32PipeListener) listenerRoutine() {
-	closed := false
-	for !closed {
+func (l *win32PipeListener) listenerWorker(wg *sync.WaitGroup) {
+	var stop bool
+	for !stop {
 		select {
-		case <-l.closeCh:
-			closed = true
-		case responseCh := <-l.acceptCh:
-			var (
-				p   *win32File
-				err error
-			)
-			for {
-				p, err = l.makeConnectedServerPipe()
-				// If the connection was immediately closed by the client, try
-				// again.
-				if err != windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
-					break
-				}
-			}
+		case <-l.shutdownStartedCh:
+			stop = true
+		case responseCh := <-l.acceptQueueCh:
+			p, err := l.makeConnectedServerPipe()
 			responseCh <- acceptResponse{p, err}
-			closed = err == ErrPipeListenerClosed //nolint:errorlint // err is Errno
 		}
 	}
+
+	wg.Done()
+}
+
+func (l *win32PipeListener) listenerRoutine(queueSize int) {
+	var wg sync.WaitGroup
+
+	for k := 0; k < queueSize; k++ {
+		wg.Add(1)
+		go l.listenerWorker(&wg)
+	}
+
+	wg.Wait() // for all listenerWorkers to finish.
+
+	// We can assert here that `l.shutdownStartedCh` has been
+	// signalled (since `l.Close()` closed it).
+	//
+	// We might consider draining the `l.acceptQueueCh` and
+	// closing each of the channel instances, but that is not
+	// necessary since the second "select" in `l.Accept()` is
+	// waiting on the `requestCh` and `l.shutdownFinishedCh`.
+	// And we're going to signal the latter in a moment.
+
 	syscall.Close(l.firstHandle)
 	l.firstHandle = 0
 	// Notify Close() and Accept() callers that the handle has been closed.
-	close(l.doneCh)
+	close(l.shutdownFinishedCh)
 }
 
 // PipeConfig contain configuration for the pipe listener.
@@ -442,6 +475,19 @@ type PipeConfig struct {
 
 	// OutputBufferSize specifies the size of the output buffer, in bytes.
 	OutputBufferSize int32
+
+	// QueueSize specifies the maximum number of concurrently active pipe server
+	// handles to allow. This is conceptually similar to the `backlog` argument
+	// to `listen(2)` on Unix systems. Increasing this value reduces the likelihood
+	// of a connecting client receiving a `windows.ERROR_PIPE_BUSY` error.
+	// (Assuming that the server is written to call `l.Accept()` using a pool of
+	// application worker threads.)
+	//
+	// This value should be larger than your expected client arrival rate so that
+	// there are always a few extra listener worker threads and (more importantly)
+	// unbound server pipes in the kernel, so that a client "CreateFile()" should
+	// not get a busy signal.
+	QueueSize int32
 }
 
 // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
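For comparison with the Go client path mentioned in the commit message, a hedged client-side sketch: `DialPipe()` already retries while the server reports `ERROR_PIPE_BUSY`, up to the supplied timeout, which is why raising `QueueSize` mainly benefits clients written in other languages that open the pipe directly. The pipe name and timeout here are illustrative assumptions.

```go
package main

import (
	"log"
	"time"

	winio "github.com/Microsoft/go-winio"
)

func main() {
	// DialPipe retries internally on ERROR_PIPE_BUSY until the timeout
	// expires, so the busy condition is normally invisible to Go clients.
	timeout := 2 * time.Second
	conn, err := winio.DialPipe(`\\.\pipe\mypipe`, &timeout)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// ... exchange data on conn ...
}
```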
@@ -460,19 +506,30 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
 			return nil, err
 		}
 	}
+
+	queueSize := int(c.QueueSize)
+	if queueSize < 1 {
+		// Legacy calls will pass 0 since they won't know to set the queue size.
+		// Default to legacy behavior where we never have more than 1 available
+		// unbound pipe and that is only present when an application thread is
+		// blocked in `l.Accept()`.
+		queueSize = 1
+	}
+
 	h, err := makeServerPipeHandle(path, sd, c, true)
 	if err != nil {
 		return nil, err
 	}
 	l := &win32PipeListener{
-		firstHandle: h,
-		path:        path,
-		config:      *c,
-		acceptCh:    make(chan (chan acceptResponse)),
-		closeCh:     make(chan int),
-		doneCh:      make(chan int),
-	}
-	go l.listenerRoutine()
+		firstHandle:        h,
+		path:               path,
+		config:             *c,
+		acceptQueueCh:      make(chan chan acceptResponse, queueSize),
+		shutdownStartedCh:  make(chan struct{}),
+		shutdownFinishedCh: make(chan struct{}),
+		closeMux:           sync.Mutex{},
+	}
+	go l.listenerRoutine(queueSize)
 	return l, nil
 }

@@ -492,31 +549,84 @@ func connectPipe(p *win32File) error {
 }
 
 func (l *win32PipeListener) Accept() (net.Conn, error) {
+tryAgain:
 	ch := make(chan acceptResponse)
+
 	select {
-	case l.acceptCh <- ch:
-		response := <-ch
-		err := response.err
-		if err != nil {
-			return nil, err
+	case l.acceptQueueCh <- ch:
+		// We have queued a request for a worker thread to listen
+		// for a connection.
+	case <-l.shutdownFinishedCh:
+		// The shutdown completed before we could request a connection.
+		return nil, ErrPipeListenerClosed
+	case <-l.shutdownStartedCh:
+		// The shutdown is already in progress. Don't bother trying to
+		// schedule a new request.
+		return nil, ErrPipeListenerClosed
+	}
+
+	// We queued a request. Now wait for a connection signal or a
+	// shutdown while we were waiting.
+
+	select {
+	case response := <-ch:
+		if response.f == nil && response.err == nil {
+			// The listener worker could close our channel instance
+			// to indicate that the listener is shut down.
+			return nil, ErrPipeListenerClosed
+		}
+		if errors.Is(response.err, ErrPipeListenerClosed) {
+			return nil, ErrPipeListenerClosed
+		}
+		if response.err == windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno
+			// If the connection was immediately closed by the client,
+			// try again (without reporting an error or a dead connection
+			// to the `Accept()` caller). This avoids spurious
+			// "The pipe is being closed." messages.
+			goto tryAgain
+		}
+		if response.err != nil {
+			return nil, response.err
 		}
 		if l.config.MessageMode {
 			return &win32MessageBytePipe{
 				win32Pipe: win32Pipe{win32File: response.f, path: l.path},
 			}, nil
 		}
 		return &win32Pipe{win32File: response.f, path: l.path}, nil
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown started and completed while we were waiting for a
+		// connection.
 		return nil, ErrPipeListenerClosed
+
+		// case <-l.shutdownStartedCh:
+		// We DO NOT watch for `l.shutdownStartedCh` because we need
+		// to keep listening on our local `ch` so that the associated
+		// listener worker can signal it without blocking when throwing
+		// an ErrPipeListenerClosed error.
 	}
 }
 
 func (l *win32PipeListener) Close() error {
+	l.closeMux.Lock()
 	select {
-	case l.closeCh <- 1:
-		<-l.doneCh
-	case <-l.doneCh:
+	case <-l.shutdownFinishedCh:
+		// The shutdown has already completed. Nothing to do.
+	default:
+		select {
+		case <-l.shutdownStartedCh:
+			// The shutdown is in progress. We should not get here because
+			// of the Mutex, but either way, we don't want to race here
+			// and accidentally close `l.shutdownStartedCh` twice.
+		default:
+			// Cause all listener workers to abort.
+			close(l.shutdownStartedCh)
+			// Wait for listenerRoutine to stop the workers and clean up.
+			<-l.shutdownFinishedCh
+		}
 	}
+	l.closeMux.Unlock()
+
 	return nil
 }
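The `Close()` hunk above pairs a mutex with a nested `select`/`default` so that `shutdownStartedCh` is closed exactly once even when several goroutines call `Close()` concurrently, and so that every caller returns only after cleanup has finished. The following is a self-contained sketch of that pattern with hypothetical names (`pool`, `started`, `finished`) standing in for the listener's fields; it is an illustration of the technique, not code from this commit.

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu       sync.Mutex
	started  chan struct{} // closed once to request shutdown
	finished chan struct{} // closed once all workers have exited
}

func newPool(n int) *pool {
	p := &pool{
		started:  make(chan struct{}),
		finished: make(chan struct{}),
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-p.started // block until shutdown is requested
		}()
	}
	go func() {
		wg.Wait()         // all workers have stopped
		close(p.finished) // signal Close() callers
	}()
	return p
}

// Close is safe to call from multiple goroutines concurrently.
func (p *pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	select {
	case <-p.finished:
		// Shutdown already completed; nothing to do.
	default:
		select {
		case <-p.started:
			// Another caller already closed `started`; don't close it twice.
		default:
			close(p.started)
			<-p.finished // wait for cleanup to finish
		}
	}
}

func main() {
	p := newPool(4)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // concurrent Close() calls are harmless
		wg.Add(1)
		go func() { defer wg.Done(); p.Close() }()
	}
	wg.Wait()
	fmt.Println("shut down once, cleanly")
}
```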

0 commit comments
