Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 137 additions & 61 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const (
// defaultHTTPTimeout is the default timeout for an http request, so the
// request does not block indefinitely.
defaultHTTPTimeout = time.Minute

// sendPostRequestTries is the number of times to retry failed HTTP POST
// requests before giving up.
sendPostRequestTries = 10
)

// jsonRequest holds information about a json request that is used to properly
Expand Down Expand Up @@ -766,46 +770,53 @@ out:
// handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarshalled result to the
// provided response channel.
func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
func (c *Client) handleSendPostMessage(ctx context.Context, jReq *jsonRequest) {
c.handleSendPostMessageWithRetry(ctx, jReq, sendPostRequestTries)
}

// handleSendPostMessageWithRetry performs HTTP POST retries and decodes the
// response result.
func handleSendPostMessageWithRetry(ctx context.Context, jReq *jsonRequest,
tries int, httpClient *http.Client, config *ConnConfig,
batch bool) ([]byte, error) {

var (
lastErr error
backoff time.Duration
httpResponse *http.Response
err error
)

httpURL, err := c.config.httpURL()
httpURL, err := config.httpURL()
if err != nil {
jReq.responseChan <- &Response{
err: fmt.Errorf("failed to parse address %v", err),
}
return
return nil, fmt.Errorf("failed to parse address %v", err)
}

tries := 10
retryloop:
for i := 0; i < tries; i++ {
var httpReq *http.Request

bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err = http.NewRequest("POST", httpURL, bodyReader)
httpReq, err = http.NewRequestWithContext(
ctx, "POST", httpURL, bodyReader,
)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
return nil, err
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
for key, value := range config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
user, pass, authErr := config.getAuth()
if authErr != nil {
return nil, authErr
}
httpReq.SetBasicAuth(user, pass)

httpResponse, err = c.httpClient.Do(httpReq)
httpResponse, err = httpClient.Do(httpReq)

// Quit the retry loop on success or if we can't retry anymore.
if err == nil || i == tries-1 {
Expand All @@ -830,39 +841,34 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
select {
case <-time.After(backoff):

case <-c.shutdown:
return
case <-ctx.Done():
err = ctx.Err()
break retryloop
}
}
if err != nil {
jReq.responseChan <- &Response{err: err}
return
return nil, err
}

// We still want to return an error if for any reason the response
// remains empty.
if httpResponse == nil {
jReq.responseChan <- &Response{
err: fmt.Errorf("invalid http POST response (nil), "+
"method: %s, id: %d, last error=%v",
jReq.method, jReq.id, lastErr),
}
return
return nil, fmt.Errorf("invalid http POST response (nil), "+
"method: %s, id: %d, last error=%v",
jReq.method, jReq.id, lastErr)
}

// Read the raw bytes and close the response.
respBytes, err := io.ReadAll(httpResponse.Body)
httpResponse.Body.Close()
if err != nil {
err = fmt.Errorf("error reading json reply: %v", err)
jReq.responseChan <- &Response{err: err}
return
return nil, fmt.Errorf("error reading json reply: %w", err)
}

// Try to unmarshal the response as a regular JSON-RPC response.
var resp rawResponse
var batchResponse json.RawMessage
if c.batch {
if batch {
err = json.Unmarshal(respBytes, &batchResponse)
} else {
err = json.Unmarshal(respBytes, &resp)
Expand All @@ -871,50 +877,70 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
// When the response itself isn't a valid JSON-RPC response
// return an error which includes the HTTP status code and raw
// response bytes.
err = fmt.Errorf("status code: %d, response: %q",
return nil, fmt.Errorf("status code: %d, response: %q",
httpResponse.StatusCode, string(respBytes))
jReq.responseChan <- &Response{err: err}
return
}
var res []byte
if c.batch {
// errors must be dealt with downstream since a whole request cannot
// "error out" other than through the status code error handled above
res, err = batchResponse, nil
} else {
res, err = resp.result()

if batch {
// Errors must be dealt with downstream since a whole request
// cannot "error out" other than through the status code error
// handled above.
return batchResponse, nil
}

return resp.result()
}

// handleSendPostMessageWithRetry runs handleSendPostMessage using the provided
// retry count. It exists so tests can exercise retry edge cases quickly.
func (c *Client) handleSendPostMessageWithRetry(ctx context.Context,
jReq *jsonRequest, tries int) {

res, err := handleSendPostMessageWithRetry(
ctx, jReq, tries, c.httpClient, c.config, c.batch,
)

// Preserves the client contract that shutdown-related cancellations are
// surfaced as ErrClientShutdown.
if errors.Is(err, context.Canceled) &&
errors.Is(context.Cause(ctx), ErrClientShutdown) {

err = ErrClientShutdown
}

jReq.responseChan <- &Response{
result: res,
err: err,
}
jReq.responseChan <- &Response{result: res, err: err}
}

// sendPostHandler handles all outgoing messages when the client is running
// in HTTP POST mode. It uses a buffered channel to serialize output messages
// while allowing the sender to continue running asynchronously. It must be run
// as a goroutine.
func (c *Client) sendPostHandler() {
func (c *Client) sendPostHandler(ctx context.Context) {
out:
for {
// Send any messages ready for send until the shutdown channel
// is closed.
select {
case jReq := <-c.sendPostChan:
c.handleSendPostMessage(jReq)
c.handleSendPostMessage(ctx, jReq)

case <-c.shutdown:
case <-ctx.Done():
break out
}
}

err := context.Cause(ctx)

// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case jReq := <-c.sendPostChan:
jReq.responseChan <- &Response{
result: nil,
err: ErrClientShutdown,
}
jReq.responseChan <- &Response{result: nil, err: err}

default:
break cleanup
Expand All @@ -928,19 +954,32 @@ cleanup:
// HTTP client associated with the client. It is backed by a buffered channel,
// so it will not block until the send channel is full.
func (c *Client) sendPostRequest(jReq *jsonRequest) {
// Don't send the message if shutting down.
// Prefer shutdown when it is already closed so this path is
// deterministic. This mirrors addRequest and avoids post-shutdown
// enqueueing.
select {
case <-c.shutdown:
jReq.responseChan <- &Response{result: nil, err: ErrClientShutdown}
jReq.responseChan <- &Response{
result: nil,
err: ErrClientShutdown,
}

return

default:
}

// Normal path: either enqueue, or fail if shutdown closes in the race
// window after the guard above.
select {
case c.sendPostChan <- jReq:
log.Tracef("Sent command [%s] with id %d", jReq.method, jReq.id)

case <-c.shutdown:
return
jReq.responseChan <- &Response{
result: nil,
err: ErrClientShutdown,
}
}
}

Expand Down Expand Up @@ -1178,8 +1217,13 @@ func (c *Client) start() {
// Start the I/O processing handlers depending on whether the client is
// in HTTP POST mode or the default websocket mode.
if c.config.HTTPPostMode {
ctx, cancel := context.WithCancelCause(context.Background())
c.wg.Add(1)
go c.sendPostHandler()
go c.sendPostHandler(ctx)
go func() {
<-c.shutdown
cancel(ErrClientShutdown)
}()
} else {
c.wg.Add(3)
go func() {
Expand Down Expand Up @@ -1564,13 +1608,18 @@ func NewBatch(config *ConnConfig) (*Client, error) {
if !config.HTTPPostMode {
return nil, errors.New("http post mode is required to use batch client")
}
// notification parameter is nil since notifications are not supported in POST mode.

// The notification parameter is nil since notifications are not
// supported in POST mode.
client, err := New(config, nil)
if err != nil {
return nil, err
}
client.batch = true //copy the client with changed batch setting
client.start()

// New() already started the HTTP handlers, so only toggle batch
// semantics.
client.batch = true

return client, nil
}

Expand Down Expand Up @@ -1716,6 +1765,38 @@ func (c *Client) sendAsync() (FutureGetBulkResult, error) {
return responseChan, nil
}

// failBatchRequests resolves every queued batch request with the provided error
// and clears all internal request tracking.
//
// This function is safe for concurrent access.
func (c *Client) failBatchRequests(err error) {
c.requestLock.Lock()
defer c.requestLock.Unlock()

c.batchLock.Lock()
defer c.batchLock.Unlock()

for e := c.batchList.Front(); e != nil; e = e.Next() {
req := e.Value.(*jsonRequest)

// Resolve all pending futures on the first batch-level failure
// so callers waiting on Receive don't block indefinitely.
// Safe: batch-mode responseChan buffers are unwritten here,
// so this send won't block while locks are held. Batch-mode
// requests only use addRequest (not sendPostRequest), so each
// responseChan buffer is still empty.
req.responseChan <- &Response{err: err}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This send is safe because in batch mode sendRequest only calls addRequest (never sendPostRequest), so individual responseChan buffers (size 1) are guaranteed unwritten at this point. A brief comment documenting this invariant would help future readers — e.g.:

// Safe: batch-mode responseChan buffers are unwritten here,
// so this send won't block while locks are held.
req.responseChan <- &Response{err: err}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment:

		// Resolve all pending futures on the first batch-level failure
		// so callers waiting on Receive don't block indefinitely.
		// Safe: batch-mode responseChan buffers are unwritten here,
		// so this send won't block while locks are held. Batch-mode
		// requests only use addRequest (not sendPostRequest), so each
		// responseChan buffer is still empty.
		req.responseChan <- &Response{err: err}

}

c.requestMap = make(map[uint64]*list.Element)
c.batchList = list.New()

// Batch-mode requests are tracked in batchList, so requestList should
// already be empty. Keep this defensive reset for invariants and future
// call paths.
c.requestList.Init()
}

// Send marshals the queued bulk requests, sends them to the server, and
// creates a response channel to receive the batched response.
func (c *Client) Send() error {
Expand All @@ -1726,12 +1807,7 @@ func (c *Client) Send() error {

batchResp, err := future.Receive()
if err != nil {
// Clear batchlist in case of an error.

c.batchLock.Lock()
c.batchList = list.New()
c.batchLock.Unlock()

c.failBatchRequests(err)
return err
}

Expand Down
Loading
Loading