diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index e34fc1827e..e4a63b638c 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -98,6 +98,10 @@ const ( // defaultHTTPTimeout is the default timeout for an http request, so the // request does not block indefinitely. defaultHTTPTimeout = time.Minute + + // sendPostRequestTries is the number of times to retry failed HTTP POST + // requests before giving up. + sendPostRequestTries = 10 ) // jsonRequest holds information about a json request that is used to properly @@ -766,46 +770,53 @@ out: // handleSendPostMessage handles performing the passed HTTP request, reading the // result, unmarshalling it, and delivering the unmarshalled result to the // provided response channel. -func (c *Client) handleSendPostMessage(jReq *jsonRequest) { +func (c *Client) handleSendPostMessage(ctx context.Context, jReq *jsonRequest) { + c.handleSendPostMessageWithRetry(ctx, jReq, sendPostRequestTries) +} + +// handleSendPostMessageWithRetry performs HTTP POST retries and decodes the +// response result. +func handleSendPostMessageWithRetry(ctx context.Context, jReq *jsonRequest, + tries int, httpClient *http.Client, config *ConnConfig, + batch bool) ([]byte, error) { + var ( lastErr error backoff time.Duration httpResponse *http.Response + err error ) - httpURL, err := c.config.httpURL() + httpURL, err := config.httpURL() if err != nil { - jReq.responseChan <- &Response{ - err: fmt.Errorf("failed to parse address %v", err), - } - return + return nil, fmt.Errorf("failed to parse address %v", err) } - tries := 10 +retryloop: for i := 0; i < tries; i++ { var httpReq *http.Request bodyReader := bytes.NewReader(jReq.marshalledJSON) - httpReq, err = http.NewRequest("POST", httpURL, bodyReader) + httpReq, err = http.NewRequestWithContext( + ctx, "POST", httpURL, bodyReader, + ) if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return + return nil, err } httpReq.Close = true httpReq.Header.Set("Content-Type", "application/json") - for key, value := range c.config.ExtraHeaders { + for key, value := range config.ExtraHeaders { httpReq.Header.Set(key, value) } // Configure basic access authorization. - user, pass, err := c.config.getAuth() - if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return + user, pass, authErr := config.getAuth() + if authErr != nil { + return nil, authErr } httpReq.SetBasicAuth(user, pass) - httpResponse, err = c.httpClient.Do(httpReq) + httpResponse, err = httpClient.Do(httpReq) // Quit the retry loop on success or if we can't retry anymore. if err == nil || i == tries-1 { @@ -830,39 +841,34 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) { select { case <-time.After(backoff): - case <-c.shutdown: - return + case <-ctx.Done(): + err = ctx.Err() + break retryloop } } if err != nil { - jReq.responseChan <- &Response{err: err} - return + return nil, err } // We still want to return an error if for any reason the response // remains empty. if httpResponse == nil { - jReq.responseChan <- &Response{ - err: fmt.Errorf("invalid http POST response (nil), "+ - "method: %s, id: %d, last error=%v", - jReq.method, jReq.id, lastErr), - } - return + return nil, fmt.Errorf("invalid http POST response (nil), "+ + "method: %s, id: %d, last error=%v", + jReq.method, jReq.id, lastErr) } // Read the raw bytes and close the response. respBytes, err := io.ReadAll(httpResponse.Body) httpResponse.Body.Close() if err != nil { - err = fmt.Errorf("error reading json reply: %v", err) - jReq.responseChan <- &Response{err: err} - return + return nil, fmt.Errorf("error reading json reply: %w", err) } // Try to unmarshal the response as a regular JSON-RPC response. var resp rawResponse var batchResponse json.RawMessage - if c.batch { + if batch { err = json.Unmarshal(respBytes, &batchResponse) } else { err = json.Unmarshal(respBytes, &resp) @@ -871,50 +877,70 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) { // When the response itself isn't a valid JSON-RPC response // return an error which includes the HTTP status code and raw // response bytes. - err = fmt.Errorf("status code: %d, response: %q", + return nil, fmt.Errorf("status code: %d, response: %q", httpResponse.StatusCode, string(respBytes)) - jReq.responseChan <- &Response{err: err} - return } - var res []byte - if c.batch { - // errors must be dealt with downstream since a whole request cannot - // "error out" other than through the status code error handled above - res, err = batchResponse, nil - } else { - res, err = resp.result() + + if batch { + // Errors must be dealt with downstream since a whole request + // cannot "error out" other than through the status code error + // handled above. + return batchResponse, nil + } + + return resp.result() +} + +// handleSendPostMessageWithRetry runs handleSendPostMessage using the provided +// retry count. It exists so tests can exercise retry edge cases quickly. +func (c *Client) handleSendPostMessageWithRetry(ctx context.Context, + jReq *jsonRequest, tries int) { + + res, err := handleSendPostMessageWithRetry( + ctx, jReq, tries, c.httpClient, c.config, c.batch, + ) + + // Preserves the client contract that shutdown-related cancellations are + // surfaced as ErrClientShutdown. + if errors.Is(err, context.Canceled) && + errors.Is(context.Cause(ctx), ErrClientShutdown) { + + err = ErrClientShutdown + } + + jReq.responseChan <- &Response{ + result: res, + err: err, } - jReq.responseChan <- &Response{result: res, err: err} } // sendPostHandler handles all outgoing messages when the client is running // in HTTP POST mode. It uses a buffered channel to serialize output messages // while allowing the sender to continue running asynchronously. It must be run // as a goroutine. -func (c *Client) sendPostHandler() { +func (c *Client) sendPostHandler(ctx context.Context) { out: for { // Send any messages ready for send until the shutdown channel // is closed. select { case jReq := <-c.sendPostChan: - c.handleSendPostMessage(jReq) + c.handleSendPostMessage(ctx, jReq) - case <-c.shutdown: + case <-ctx.Done(): break out } } + err := context.Cause(ctx) + // Drain any wait channels before exiting so nothing is left waiting // around to send. cleanup: for { select { case jReq := <-c.sendPostChan: - jReq.responseChan <- &Response{ - result: nil, - err: ErrClientShutdown, - } + jReq.responseChan <- &Response{result: nil, err: err} default: break cleanup @@ -928,19 +954,32 @@ cleanup: // HTTP client associated with the client. It is backed by a buffered channel, // so it will not block until the send channel is full. func (c *Client) sendPostRequest(jReq *jsonRequest) { - // Don't send the message if shutting down. + // Prefer shutdown when it is already closed so this path is + // deterministic. This mirrors addRequest and avoids post-shutdown + // enqueueing. select { case <-c.shutdown: - jReq.responseChan <- &Response{result: nil, err: ErrClientShutdown} + jReq.responseChan <- &Response{ + result: nil, + err: ErrClientShutdown, + } + + return + default: } + // Normal path: either enqueue, or fail if shutdown closes in the race + // window after the guard above. select { case c.sendPostChan <- jReq: log.Tracef("Sent command [%s] with id %d", jReq.method, jReq.id) case <-c.shutdown: - return + jReq.responseChan <- &Response{ + result: nil, + err: ErrClientShutdown, + } } } @@ -1178,8 +1217,13 @@ func (c *Client) start() { // Start the I/O processing handlers depending on whether the client is // in HTTP POST mode or the default websocket mode. if c.config.HTTPPostMode { + ctx, cancel := context.WithCancelCause(context.Background()) c.wg.Add(1) - go c.sendPostHandler() + go c.sendPostHandler(ctx) + go func() { + <-c.shutdown + cancel(ErrClientShutdown) + }() } else { c.wg.Add(3) go func() { @@ -1564,13 +1608,18 @@ func NewBatch(config *ConnConfig) (*Client, error) { if !config.HTTPPostMode { return nil, errors.New("http post mode is required to use batch client") } - // notification parameter is nil since notifications are not supported in POST mode. + + // The notification parameter is nil since notifications are not + // supported in POST mode. client, err := New(config, nil) if err != nil { return nil, err } - client.batch = true //copy the client with changed batch setting - client.start() + + // New() already started the HTTP handlers, so only toggle batch + // semantics. + client.batch = true + return client, nil } @@ -1716,6 +1765,38 @@ func (c *Client) sendAsync() (FutureGetBulkResult, error) { return responseChan, nil } +// failBatchRequests resolves every queued batch request with the provided error +// and clears all internal request tracking. +// +// This function is safe for concurrent access. +func (c *Client) failBatchRequests(err error) { + c.requestLock.Lock() + defer c.requestLock.Unlock() + + c.batchLock.Lock() + defer c.batchLock.Unlock() + + for e := c.batchList.Front(); e != nil; e = e.Next() { + req := e.Value.(*jsonRequest) + + // Resolve all pending futures on the first batch-level failure + // so callers waiting on Receive don't block indefinitely. + // Safe: batch-mode responseChan buffers are unwritten here, + // so this send won't block while locks are held. Batch-mode + // requests only use addRequest (not sendPostRequest), so each + // responseChan buffer is still empty. + req.responseChan <- &Response{err: err} + } + + c.requestMap = make(map[uint64]*list.Element) + c.batchList = list.New() + + // Batch-mode requests are tracked in batchList, so requestList should + // already be empty. Keep this defensive reset for invariants and future + // call paths. + c.requestList.Init() +} + // Marshall's bulk requests and sends to the server // creates a response channel to receive the response func (c *Client) Send() error { @@ -1726,12 +1807,7 @@ func (c *Client) Send() error { batchResp, err := future.Receive() if err != nil { - // Clear batchlist in case of an error. - - c.batchLock.Lock() - c.batchList = list.New() - c.batchLock.Unlock() - + c.failBatchRequests(err) return err } diff --git a/rpcclient/infrastructure_test.go b/rpcclient/infrastructure_test.go index 8416b7ad3c..d21dd644d6 100644 --- a/rpcclient/infrastructure_test.go +++ b/rpcclient/infrastructure_test.go @@ -1,11 +1,87 @@ package rpcclient import ( + "context" + "errors" + "io" + "net" + "net/http" + "strings" + "sync/atomic" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// postRoundTripFunc adapts a function to implement http.RoundTripper. +type postRoundTripFunc func(*http.Request) (*http.Response, error) + +// RoundTrip invokes the wrapped test transport function. +func (f postRoundTripFunc) RoundTrip( + req *http.Request) (*http.Response, error) { + + return f(req) +} + +// cancelOnReadBody is a test response body that blocks reads until context +// cancellation is observed. +type cancelOnReadBody struct { + // ctx is the request context that drives cancellation. + ctx context.Context + // readStarted is closed when the first read call starts. + readStarted chan struct{} +} + +// Read blocks until the request context is canceled, then returns that error. +func (b *cancelOnReadBody) Read(_ []byte) (int, error) { + select { + case <-b.readStarted: + default: + close(b.readStarted) + } + + <-b.ctx.Done() + + return 0, b.ctx.Err() +} + +// Close implements io.Closer for the test body. +func (b *cancelOnReadBody) Close() error { + return nil +} + +// newPostModeTestClient builds a minimal HTTP POST-mode client for transport +// behavior tests. +func newPostModeTestClient(rt http.RoundTripper) *Client { + return &Client{ + config: &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + }, + httpClient: &http.Client{ + Transport: rt, + }, + } +} + +// newPostTestRequest creates a minimal JSON-RPC request used by POST handler +// tests. +func newPostTestRequest() *jsonRequest { + body := `{"jsonrpc":"1.0","id":1,"method":"getblockcount","params":[]}` + + return &jsonRequest{ + id: 1, + method: "getblockcount", + marshalledJSON: []byte(body), + responseChan: make(chan *Response, 1), + } +} + // TestParseAddressString checks different variation of supported and // unsupported addresses. func TestParseAddressString(t *testing.T) { @@ -93,18 +169,521 @@ func TestParseAddressString(t *testing.T) { } for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { addr, err := ParseAddressString(tc.addressString) if tc.expErrStr != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expErrStr) - return + } else { + require.NoError(t, err) + require.Equal(t, tc.expNetwork, addr.Network()) + require.Equal(t, tc.expAddress, addr.String()) } - require.NoError(t, err) - require.Equal(t, tc.expNetwork, addr.Network()) - require.Equal(t, tc.expAddress, addr.String()) }) } } + +// TestHandleSendPostMessageWithRetrySuccess ensures that +// handleSendPostMessageWithRetry returns a decoded result and no error on +// a successful response. +func TestHandleSendPostMessageWithRetrySuccess(t *testing.T) { + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader( + `{"result":1,"error":null,"id":1}`, + )), + }, nil + }, + )) + jReq := newPostTestRequest() + + result, err := handleSendPostMessageWithRetry( + context.Background(), jReq, 1, client.httpClient, client.config, + false, + ) + require.NoError(t, err) + require.Equal(t, []byte("1"), result) +} + +// TestHandleSendPostMessageWithRetryShutdownDuringRetryBackoff ensures +// that handleSendPostMessageWithRetry returns context cancellation from the +// retry-backoff path. +func TestHandleSendPostMessageWithRetryShutdownDuringRetryBackoff( + t *testing.T) { + + var attempts int32 + ctx, cancel := context.WithCancelCause(context.Background()) + + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + if atomic.AddInt32(&attempts, 1) == 1 { + cancel(ErrClientShutdown) + } + return nil, errors.New("transient transport error") + }, + )) + jReq := newPostTestRequest() + + result, err := handleSendPostMessageWithRetry( + ctx, jReq, 2, client.httpClient, client.config, false, + ) + require.Nil(t, result) + require.ErrorIs(t, err, context.Canceled) + require.EqualValues(t, 1, atomic.LoadInt32(&attempts)) +} + +// TestHandleSendPostMessageWithRetryShutdownOnFinalRetry ensures that +// handleSendPostMessageWithRetry returns context cancellation from the final +// retry attempt path. +func TestHandleSendPostMessageWithRetryShutdownOnFinalRetry(t *testing.T) { + var attempts int32 + ctx, cancel := context.WithCancelCause(context.Background()) + + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + current := atomic.AddInt32(&attempts, 1) + if current == 1 { + return nil, errors.New("transient transport " + + "error") + } + + cancel(ErrClientShutdown) + return nil, context.Canceled + }, + )) + jReq := newPostTestRequest() + + result, err := handleSendPostMessageWithRetry( + ctx, jReq, 2, client.httpClient, client.config, false, + ) + require.Nil(t, result) + require.ErrorIs(t, err, context.Canceled) + require.EqualValues(t, 2, atomic.LoadInt32(&attempts)) +} + +// TestHandleSendPostMessageWithRetryShutdownOnBodyRead ensures that +// handleSendPostMessageWithRetry returns the body read error when cancellation +// arrives during io.ReadAll. +func TestHandleSendPostMessageWithRetryShutdownOnBodyRead(t *testing.T) { + ctx, cancel := context.WithCancelCause(context.Background()) + readStarted := make(chan struct{}) + + client := newPostModeTestClient(postRoundTripFunc( + func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: &cancelOnReadBody{ + ctx: req.Context(), + readStarted: readStarted, + }, + }, nil + }, + )) + jReq := newPostTestRequest() + + go func() { + <-readStarted + cancel(ErrClientShutdown) + }() + + result, err := handleSendPostMessageWithRetry( + ctx, jReq, 1, client.httpClient, client.config, false, + ) + require.Nil(t, result) + require.ErrorContains(t, err, "error reading json reply") + require.ErrorIs(t, err, context.Canceled) +} + +// TestHTTPPostShutdownInterruptsPendingRequest ensures that a client operating +// in HTTP POST mode can interrupt an in-flight request during shutdown. +func TestHTTPPostShutdownInterruptsPendingRequest(t *testing.T) { + t.Parallel() + + // Start a local TCP listener that accepts exactly one HTTP request and + // then blocks until the client side closes the connection. + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + // requestAccepted signals when the test server has accepted the + // client's connection. + requestAccepted := make(chan struct{}) + + // serverDone signals when the server goroutine has exited. + serverDone := make(chan struct{}) + + // Run a minimum server goroutine. It accepts one connection and drains + // the request stream without replying so the client request stays in + // flight. + go func() { + defer close(serverDone) + + conn, err := listener.Accept() + if err != nil { + return + } + defer func() { + err := conn.Close() + assert.NoError(t, err) + }() + + close(requestAccepted) + + _, _ = io.Copy(io.Discard, conn) + }() + + // Ensure the listener is closed and the server goroutine exits. + t.Cleanup(func() { + err := listener.Close() + require.NoError(t, err) + <-serverDone + }) + + // Configure a POST-mode client against the local listener. + connCfg := &ConnConfig{ + Host: listener.Addr().String(), + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + // Start the client and register cleanup for idempotent shutdown. + client, err := New(connCfg, nil) + require.NoError(t, err) + t.Cleanup(client.Shutdown) + + // Launch one async request that should remain pending until shutdown. + future := client.GetBlockCountAsync() + + // Ensure the server sees the request before we initiate shutdown. + select { + case <-requestAccepted: + + case <-time.After(2 * time.Second): + t.Fatalf("server did not accept client connection") + } + + // The request should remain pending until shutdown is requested. + select { + case <-future: + t.Fatalf("expected request to remain pending until shutdown") + + case <-time.After(100 * time.Millisecond): + } + + client.Shutdown() + + waitDone := make(chan struct{}) + go func() { + client.WaitForShutdown() + close(waitDone) + }() + + // Wait for shutdown to complete before asserting the final error. + select { + case <-waitDone: + + case <-time.After(5 * time.Second): + t.Fatalf("client shutdown did not complete") + } + + result, err := future.Receive() + require.Zero(t, result) + require.ErrorContains(t, err, ErrClientShutdown.Error()) +} + +// TestHandleSendPostMessageShutdownDuringRetryBackoff ensures shutdown +// cancellation interrupts retry backoff and remaps to ErrClientShutdown. +func TestHandleSendPostMessageShutdownDuringRetryBackoff(t *testing.T) { + var attempts int32 + attemptStarted := make(chan struct{}, 1) + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + atomic.AddInt32(&attempts, 1) + attemptStarted <- struct{}{} + return nil, errors.New("transient transport error") + }, + )) + + ctx, cancel := context.WithCancelCause(context.Background()) + jReq := newPostTestRequest() + + go client.handleSendPostMessageWithRetry(ctx, jReq, 2) + + select { + case <-attemptStarted: + case <-time.After(2 * time.Second): + t.Fatal("first attempt did not start") + } + + cancel(ErrClientShutdown) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for response") + } +} + +// TestHandleSendPostMessageShutdownOnFinalRetry ensures cancellation on the +// final retry attempt is still remapped to ErrClientShutdown. +func TestHandleSendPostMessageShutdownOnFinalRetry(t *testing.T) { + var attempts int32 + ctx, cancel := context.WithCancelCause(context.Background()) + + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + current := atomic.AddInt32(&attempts, 1) + if current == 1 { + return nil, errors.New("transient transport " + + "error") + } + + cancel(ErrClientShutdown) + return nil, context.Canceled + }, + )) + jReq := newPostTestRequest() + + go client.handleSendPostMessageWithRetry(ctx, jReq, 2) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for response") + } + + require.EqualValues(t, 2, atomic.LoadInt32(&attempts)) +} + +// TestHandleSendPostMessageShutdownDuringBodyRead ensures cancellation while +// reading a response body is remapped to ErrClientShutdown. +func TestHandleSendPostMessageShutdownDuringBodyRead(t *testing.T) { + ctx, cancel := context.WithCancelCause(context.Background()) + readStarted := make(chan struct{}) + + client := newPostModeTestClient(postRoundTripFunc( + func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: &cancelOnReadBody{ + ctx: req.Context(), + readStarted: readStarted, + }, + }, nil + }, + )) + jReq := newPostTestRequest() + + go client.handleSendPostMessageWithRetry(ctx, jReq, 1) + + select { + case <-readStarted: + case <-time.After(2 * time.Second): + t.Fatal("response body read did not start") + } + + cancel(ErrClientShutdown) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for response") + } +} + +// TestSendPostRequestShutdownPrioritizesFailure ensures shutdown always wins +// when it is already closed before sendPostRequest is called. +func TestSendPostRequestShutdownPrioritizesFailure(t *testing.T) { + client := &Client{ + sendPostChan: make(chan *jsonRequest, 1), + shutdown: make(chan struct{}), + } + + close(client.shutdown) + + const attempts = 200 + for i := 0; i < attempts; i++ { + jReq := &jsonRequest{ + id: uint64(i), + method: "getblockcount", + responseChan: make(chan *Response, 1), + } + client.sendPostRequest(jReq) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + default: + t.Fatalf("request id=%d was not failed immediately", + jReq.id) + } + + select { + case <-client.sendPostChan: + t.Fatalf("request id=%d was enqueued after shutdown", + jReq.id) + + case <-time.After(10 * time.Millisecond): + } + } +} + +// TestBatchSendErrorResolvesQueuedFutures ensures a batch send failure resolves +// all queued futures instead of leaving them blocked. +func TestBatchSendErrorResolvesQueuedFutures(t *testing.T) { + connCfg := &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := NewBatch(connCfg) + require.NoError(t, err) + t.Cleanup(func() { + client.Shutdown() + client.WaitForShutdown() + }) + + client.httpClient.Transport = postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + body := io.NopCloser(strings.NewReader("not-json")) + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: body, + }, nil + }, + ) + + f1 := client.GetBlockCountAsync() + f2 := client.GetBlockCountAsync() + + sendErr := client.Send() + require.Error(t, sendErr) + + assertFutureErr := func(f FutureGetBlockCountResult) { + t.Helper() + + done := make(chan error, 1) + go func() { + _, err := f.Receive() + done <- err + }() + + select { + case err := <-done: + require.Error(t, err) + require.EqualError(t, err, sendErr.Error()) + + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for queued batch future " + + "to resolve") + } + } + + assertFutureErr(f1) + assertFutureErr(f2) +} + +// TestNewBatchSerializesPostSends ensures a batch client still serializes POST +// sends through a single handler goroutine. +func TestNewBatchSerializesPostSends(t *testing.T) { + connCfg := &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := NewBatch(connCfg) + require.NoError(t, err) + t.Cleanup(func() { + client.Shutdown() + client.WaitForShutdown() + }) + + var active int32 + var maxActive int32 + release := make(chan struct{}) + + client.httpClient.Transport = postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + current := atomic.AddInt32(&active, 1) + for { + prev := atomic.LoadInt32(&maxActive) + if current <= prev { + break + } + if atomic.CompareAndSwapInt32( + &maxActive, prev, current, + ) { + break + } + } + + // Hold the request open so the test can observe if + // a second POST enters the transport concurrently. + <-release + atomic.AddInt32(&active, -1) + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader( + `{"result":1,"error":null}`, + )), + }, nil + }, + ) + + makeReq := func(id uint64) *jsonRequest { + return &jsonRequest{ + id: id, + method: "getblockcount", + marshalledJSON: []byte( + `{"jsonrpc":"1.0","id":1,` + + `"method":"getblockcount","params":[]}`, + ), + responseChan: make(chan *Response, 1), + } + } + + req1 := makeReq(1) + req2 := makeReq(2) + client.sendPostChan <- req1 + client.sendPostChan <- req2 + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&active) >= 1 + }, time.Second, 5*time.Millisecond) + + // Allow any extra send handler goroutines to start a second in-flight + // request. + time.Sleep(100 * time.Millisecond) + observedMax := atomic.LoadInt32(&maxActive) + close(release) + + for i, req := range []*jsonRequest{req1, req2} { + select { + case resp := <-req.responseChan: + require.NoError(t, resp.err, "request %d failed", i) + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for request %d response", i) + } + } + + require.EqualValues(t, 1, observedMax, "POST sends must be serialized") +}