From 3f0f714a53413c5e0bf17399da0f8f5896f6b91c Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Fri, 17 Oct 2025 14:53:25 -0700 Subject: [PATCH 1/7] replace errgroup with conc pool in WriteExecute method --- client/client.go | 43 +++++++++++++++++++++---------------------- client/client_test.go | 6 ++++-- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/client/client.go b/client/client.go index d9f6355..09b80ae 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,7 @@ package client import ( _context "context" "encoding/json" + "errors" "fmt" "math" _nethttp "net/http" @@ -1784,18 +1785,15 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface if request.GetBody() != nil { for i := 0; i < len(request.GetBody().Writes); i += writeChunkSize { end := int(math.Min(float64(i+writeChunkSize), float64(len(request.GetBody().Writes)))) - writeChunks = append(writeChunks, (request.GetBody().Writes)[i:end]) } } - writeGroup, ctx := errgroup.WithContext(request.GetContext()) - - writeGroup.SetLimit(int(maxParallelReqs)) + writePool := pool.NewWithResults[*ClientWriteResponse]().WithContext(request.GetContext()).WithMaxGoroutines(int(maxParallelReqs)) writeResponses := make([]ClientWriteResponse, len(writeChunks)) for index, writeBody := range writeChunks { index, writeBody := index, writeBody - writeGroup.Go(func() error { + writePool.Go(func(ctx _context.Context) (*ClientWriteResponse, error) { singleResponse, err := client.WriteExecute(&SdkClientWriteRequest{ ctx: ctx, Client: client, @@ -1809,19 +1807,19 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface Conflict: options.Conflict, }, }) - - if _, ok := err.(fgaSdk.FgaApiAuthenticationError); ok { - return err + var authErr fgaSdk.FgaApiAuthenticationError + if errors.As(err, &authErr) { + return nil, err } - writeResponses[index] = *singleResponse + if singleResponse != nil { + writeResponses[index] = *singleResponse + } - return nil + return singleResponse, nil }) } - - err = writeGroup.Wait() - // If an error was returned then it will be an authentication error so we want to return + _, err = writePool.Wait() if err != nil { return &response, err } @@ -1836,12 +1834,11 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface } } - deleteGroup, ctx := errgroup.WithContext(request.GetContext()) - deleteGroup.SetLimit(int(maxParallelReqs)) + deletePool := pool.NewWithResults[*ClientWriteResponse]().WithContext(request.GetContext()).WithMaxGoroutines(int(maxParallelReqs)) deleteResponses := make([]ClientWriteResponse, len(deleteChunks)) for index, deleteBody := range deleteChunks { index, deleteBody := index, deleteBody - deleteGroup.Go(func() error { + deletePool.Go(func(ctx _context.Context) (*ClientWriteResponse, error) { singleResponse, err := client.WriteExecute(&SdkClientWriteRequest{ ctx: ctx, Client: client, @@ -1856,19 +1853,21 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface }, }) - if _, ok := err.(fgaSdk.FgaApiAuthenticationError); ok { - return err + var authErr fgaSdk.FgaApiAuthenticationError + if errors.As(err, &authErr) { + return nil, err } - deleteResponses[index] = *singleResponse + if singleResponse != nil { + deleteResponses[index] = *singleResponse + } - return nil + return singleResponse, nil }) } - err = deleteGroup.Wait() + _, err = deletePool.Wait() if err != nil { - // If an error was returned then it will be an authentication error so we want to return return &response, err } diff --git a/client/client_test.go b/client/client_test.go index f051e50..c4924c3 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -15,6 +15,7 @@ package client_test import ( "context" "encoding/json" + "errors" "fmt" "net/http" "sync" @@ -1775,7 +1776,8 @@ func TestOpenFgaClient(t *testing.T) { t.Fatalf("Expect error with invalid auth but there is none") } - if _, ok := err.(openfga.FgaApiAuthenticationError); !ok { + var authErr openfga.FgaApiAuthenticationError + if !errors.As(err, &authErr) { t.Fatalf("Expected an api auth error") } @@ -1793,7 +1795,7 @@ func TestOpenFgaClient(t *testing.T) { t.Fatalf("Expect error with invalid auth but there is none") } - if _, ok := err.(openfga.FgaApiAuthenticationError); !ok { + if !errors.As(err, &authErr) { t.Fatalf("Expected an api auth error") } }) From 0517d04237a0aab8e6b24fe4dd09ff5b6e72551f Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Fri, 17 Oct 2025 17:38:35 -0700 Subject: [PATCH 2/7] simplify WriteExecute concurrency using conc pool --- client/client.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/client/client.go b/client/client.go index 09b80ae..25baf19 100644 --- a/client/client.go +++ b/client/client.go @@ -1790,9 +1790,8 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface } writePool := pool.NewWithResults[*ClientWriteResponse]().WithContext(request.GetContext()).WithMaxGoroutines(int(maxParallelReqs)) - writeResponses := make([]ClientWriteResponse, len(writeChunks)) - for index, writeBody := range writeChunks { - index, writeBody := index, writeBody + for _, writeBody := range writeChunks { + writeBody := writeBody writePool.Go(func(ctx _context.Context) (*ClientWriteResponse, error) { singleResponse, err := client.WriteExecute(&SdkClientWriteRequest{ ctx: ctx, @@ -1812,14 +1811,10 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface return nil, err } - if singleResponse != nil { - writeResponses[index] = *singleResponse - } - return singleResponse, nil }) } - _, err = writePool.Wait() + writeResponses, err := writePool.Wait() if err != nil { return &response, err } @@ -1835,9 +1830,8 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface } deletePool := pool.NewWithResults[*ClientWriteResponse]().WithContext(request.GetContext()).WithMaxGoroutines(int(maxParallelReqs)) - deleteResponses := make([]ClientWriteResponse, len(deleteChunks)) - for index, deleteBody := range deleteChunks { - index, deleteBody := index, deleteBody + for _, deleteBody := range deleteChunks { + deleteBody := deleteBody deletePool.Go(func(ctx _context.Context) (*ClientWriteResponse, error) { singleResponse, err := client.WriteExecute(&SdkClientWriteRequest{ ctx: ctx, @@ -1857,16 +1851,11 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface if errors.As(err, &authErr) { return nil, err } - - if singleResponse != nil { - deleteResponses[index] = *singleResponse - } - return singleResponse, nil }) } - _, err = deletePool.Wait() + deleteResponses, err := deletePool.Wait() if err != nil { return &response, err } From dfeff0b3d2d025d0c25ff854939376f92138ce96 Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Sun, 2 Nov 2025 22:41:43 -0700 Subject: [PATCH 3/7] Added the error related comment --- client/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/client.go b/client/client.go index 25baf19..3134285 100644 --- a/client/client.go +++ b/client/client.go @@ -1807,6 +1807,7 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface }, }) var authErr fgaSdk.FgaApiAuthenticationError + // If an error was returned then it will be an authentication error so we want to return if errors.As(err, &authErr) { return nil, err } @@ -1856,6 +1857,7 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface } deleteResponses, err := deletePool.Wait() + // If an error was returned then it will be an authentication error so we want to return if err != nil { return &response, err } From 2e1595967026c64c9299fa5bd161aab817754a1e Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Thu, 11 Dec 2025 18:04:56 -0700 Subject: [PATCH 4/7] move ClientBatchCheckExecute to use conc pool --- client/client.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index 0415059..cb296d5 100644 --- a/client/client.go +++ b/client/client.go @@ -10,7 +10,6 @@ import ( "time" "github.com/sourcegraph/conc/pool" - "golang.org/x/sync/errgroup" fgaSdk "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/credentials" @@ -2215,7 +2214,7 @@ func (request *SdkClientBatchCheckClientRequest) GetOptions() *ClientBatchCheckC } func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheckClientRequestInterface) (*ClientBatchCheckClientResponse, error) { - group, ctx := errgroup.WithContext(request.GetContext()) + ctx := request.GetContext() requestOptions := RequestOptions{} maxParallelReqs := int(DEFAULT_MAX_METHOD_PARALLEL_REQS) if request.GetOptions() != nil { @@ -2225,7 +2224,6 @@ func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheck } } - group.SetLimit(maxParallelReqs) var numOfChecks = len(*request.GetBody()) response := make(ClientBatchCheckClientResponse, numOfChecks) authorizationModelId, err := client.getAuthorizationModelId(request.GetAuthorizationModelIdOverride()) @@ -2249,9 +2247,15 @@ func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheck checkOptions.Consistency = request.GetOptions().Consistency } + type batchCheckResult struct { + Index int + Response ClientBatchCheckClientSingleResponse + } + + checkPool := pool.NewWithResults[*batchCheckResult]().WithContext(ctx).WithMaxGoroutines(maxParallelReqs) for index, checkBody := range *request.GetBody() { index, checkBody := index, checkBody - group.Go(func() error { + checkPool.Go(func(ctx _context.Context) (*batchCheckResult, error) { singleResponse, err := client.CheckExecute(&SdkClientCheckRequest{ ctx: ctx, Client: client, @@ -2259,24 +2263,32 @@ func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheck options: checkOptions, }) - if _, ok := err.(fgaSdk.FgaApiAuthenticationError); ok { - return err - } - - response[index] = ClientBatchCheckClientSingleResponse{ - Request: checkBody, - ClientCheckResponse: *singleResponse, - Error: err, + var authErr fgaSdk.FgaApiAuthenticationError + // If an error was returned then it will be an authentication error so we want to return + if errors.As(err, &authErr) { + return nil, err } - return nil + return &batchCheckResult{ + Index: index, + Response: ClientBatchCheckClientSingleResponse{ + Request: checkBody, + ClientCheckResponse: *singleResponse, + Error: err, + }, + }, nil }) } - if err := group.Wait(); err != nil { + results, err := checkPool.Wait() + if err != nil { return nil, err } + for _, result := range results { + response[result.Index] = result.Response + } + return &response, nil } From aeb2632cd7cab995fd39e53d2ae080f614cb4417 Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Sun, 21 Dec 2025 12:08:19 -0800 Subject: [PATCH 5/7] added nil checks for the ClientBatchExecute --- client/client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 96f6587..cfb0bed 100644 --- a/client/client.go +++ b/client/client.go @@ -2269,11 +2269,17 @@ func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheck return nil, err } + // Handle nil response + var checkResponse ClientCheckResponse + if singleResponse != nil { + checkResponse = *singleResponse + } + return &batchCheckResult{ Index: index, Response: ClientBatchCheckClientSingleResponse{ Request: checkBody, - ClientCheckResponse: *singleResponse, + ClientCheckResponse: checkResponse, Error: err, }, }, nil From e4adb7ada3d14ddf7c4fd2d549112587bbe68299 Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Mon, 22 Dec 2025 12:47:46 -0800 Subject: [PATCH 6/7] added nil checks i n writeExecute --- client/client.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index cfb0bed..15f4223 100644 --- a/client/client.go +++ b/client/client.go @@ -1795,11 +1795,19 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface }, }) var authErr fgaSdk.FgaApiAuthenticationError - // If an error was returned then it will be an authentication error so we want to return + // If an authentication error was returned, we want to return it immediately if errors.As(err, &authErr) { return nil, err } + // Handle nil response - create zero value if singleResponse is nil + if singleResponse == nil { + return &ClientWriteResponse{ + Writes: []ClientWriteRequestWriteResponse{}, + Deletes: []ClientWriteRequestDeleteResponse{}, + }, nil + } + return singleResponse, nil }) } @@ -1837,25 +1845,39 @@ func (client *OpenFgaClient) WriteExecute(request SdkClientWriteRequestInterface }) var authErr fgaSdk.FgaApiAuthenticationError + // If an authentication error was returned, we want to return it immediately if errors.As(err, &authErr) { return nil, err } + + // Handle nil response - create zero value if singleResponse is nil + if singleResponse == nil { + return &ClientWriteResponse{ + Writes: []ClientWriteRequestWriteResponse{}, + Deletes: []ClientWriteRequestDeleteResponse{}, + }, nil + } + return singleResponse, nil }) } deleteResponses, err := deletePool.Wait() - // If an error was returned then it will be an authentication error so we want to return + // If authencication error was returned, we want to return it immediately if err != nil { return &response, err } for _, writeResponse := range writeResponses { - response.Writes = append(response.Writes, writeResponse.Writes...) + if writeResponse != nil { + response.Writes = append(response.Writes, writeResponse.Writes...) + } } for _, deleteResponse := range deleteResponses { - response.Deletes = append(response.Deletes, deleteResponse.Deletes...) + if deleteResponse != nil { + response.Deletes = append(response.Deletes, deleteResponse.Deletes...) + } } return &response, nil From 5ce3bea5e3db243a02db4fb666b324b6df041408 Mon Sep 17 00:00:00 2001 From: Lalit Sudhir Date: Tue, 6 Jan 2026 11:11:56 -0800 Subject: [PATCH 7/7] Implemented suggested changes --- client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 15f4223..27c561c 100644 --- a/client/client.go +++ b/client/client.go @@ -2286,7 +2286,8 @@ func (client *OpenFgaClient) ClientBatchCheckExecute(request SdkClientBatchCheck }) var authErr fgaSdk.FgaApiAuthenticationError - // If an error was returned then it will be an authentication error so we want to return + // If the error is an authentication error, propagate it so the batch fails fast. + // Non-authentication errors are captured in the per-request response below. if errors.As(err, &authErr) { return nil, err }