From a9c2ac4d6b3db7786a56e42868f26da0397a3dc0 Mon Sep 17 00:00:00 2001 From: lakshayman Date: Mon, 10 Nov 2025 13:41:27 +0530 Subject: [PATCH 1/2] refactor: all lambdas --- call-profile/main.go | 117 +++++++++----------- call-profile/main_test.go | 18 ++-- call-profiles/main.go | 59 +++++----- call-profiles/main_test.go | 8 +- health-check/main.go | 77 ++++++------- health-check/main_test.go | 9 +- layer/utils/auth.go | 6 +- layer/utils/concurrency.go | 82 ++++++++++++++ layer/utils/constants.go | 8 ++ layer/utils/errors.go | 70 ++++++++++++ layer/utils/firestore.go | 214 +++++++++++++++++++++++-------------- layer/utils/handler.go | 41 +++++++ layer/utils/http.go | 58 ++++++++++ layer/utils/logger.go | 139 ++++++++++++++++++++++++ verify/helpers_test.go | 2 +- verify/main.go | 52 ++------- verify/main_test.go | 53 ++++----- 17 files changed, 706 insertions(+), 307 deletions(-) create mode 100644 layer/utils/concurrency.go create mode 100644 layer/utils/errors.go create mode 100644 layer/utils/handler.go create mode 100644 layer/utils/http.go create mode 100644 layer/utils/logger.go diff --git a/call-profile/main.go b/call-profile/main.go index cf70729..5d0ae61 100644 --- a/call-profile/main.go +++ b/call-profile/main.go @@ -1,25 +1,14 @@ package main import ( - "context" "fmt" "identity-service/layer/utils" - "log" - "net/http" "time" - "cloud.google.com/go/firestore" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" ) -type deps struct { - client *firestore.Client - ctx context.Context -} - -func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { +func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { var userId, sessionId string = utils.GetDataFromBody([]byte(request.Body)) if userId == "" { return events.APIGatewayProxyResponse{ @@ -28,52 +17,63 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway }, nil } - dsnap, err := d.client.Collection("users").Doc(userId).Get(d.ctx) - - var userUrl string - var chaincode string - var discordId string + dsnap, err := d.Client.Collection("users").Doc(userId).Get(d.Ctx) + if err != nil { + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Error retrieving user: %v", err), + StatusCode: 500, + }, nil + } - if str, ok := dsnap.Data()["discordId"].(string); ok { - discordId = str - } else { - discordId = "" + data := dsnap.Data() + + var user utils.User + err = dsnap.DataTo(&user) + if err != nil { + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "UserData Type Error: "+fmt.Sprintln(err), sessionId) + return events.APIGatewayProxyResponse{ + Body: "Profile Skipped No User Data", + StatusCode: 200, + }, nil } - if str, ok := dsnap.Data()["profileURL"].(string); ok { - userUrl = str - } else { - utils.LogProfileSkipped(d.client, d.ctx, userId, "Profile URL not available", sessionId) - utils.SetProfileStatusBlocked(d.client, d.ctx, userId, "Profile URL not available", sessionId, discordId) + discordId := user.DiscordID + + if user.ProfileURL == "" { + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile URL not available", sessionId) + utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile URL not available", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped No Profile URL", StatusCode: 200, }, nil } - if str, ok := dsnap.Data()["chaincode"].(string); ok { - if str == "" { - utils.LogProfileSkipped(d.client, d.ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId) - utils.SetProfileStatusBlocked(d.client, d.ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId, discordId) - return events.APIGatewayProxyResponse{ - Body: "Profile Skipped Profile Service Blocked", - StatusCode: 200, - }, nil - } - chaincode = str - } else { - utils.LogProfileSkipped(d.client, d.ctx, userId, "Chaincode Not Found", sessionId) - utils.SetProfileStatusBlocked(d.client, d.ctx, userId, "Chaincode Not Found", sessionId, discordId) + _, chaincodeExists := data["chaincode"] + if !chaincodeExists { + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Chaincode Not Found", sessionId) + utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Chaincode Not Found", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped Chaincode Not Found", StatusCode: 200, }, nil } + if user.Chaincode == "" { + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId) + utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId, discordId) + return events.APIGatewayProxyResponse{ + Body: "Profile Skipped Profile Service Blocked", + StatusCode: 200, + }, nil + } + + userUrl := user.ProfileURL + chaincode := user.Chaincode + var userData utils.Diff err = dsnap.DataTo(&userData) if err != nil { - utils.LogProfileSkipped(d.client, d.ctx, userId, "UserData Type Error: "+fmt.Sprintln(err), sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "UserData Type Error: "+fmt.Sprintln(err), sessionId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped No User Data", StatusCode: 200, @@ -83,33 +83,31 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway if userUrl[len(userUrl)-1] != '/' { userUrl = userUrl + "/" } + + _, serviceErr := utils.GetWithContext(d.Ctx, userUrl+"health", 5*time.Second) var isServiceRunning bool - c := &http.Client{ - Timeout: 5 * time.Second, - } - _, serviceErr := c.Get(userUrl + "health") if serviceErr != nil { isServiceRunning = false } else { isServiceRunning = true } - utils.LogHealth(d.client, d.ctx, userId, isServiceRunning, sessionId) + utils.LogHealth(d.Client, d.Ctx, userId, isServiceRunning, sessionId) if !isServiceRunning { - utils.LogProfileSkipped(d.client, d.ctx, userId, "Profile Service Down", sessionId) - utils.SetProfileStatusBlocked(d.client, d.ctx, userId, "Profile Service Down", sessionId, discordId) + utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile Service Down", sessionId) + utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile Service Down", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped Service Down", StatusCode: 200, }, nil } - dataErr := utils.Getdata(d.client, d.ctx, userId, userUrl, chaincode, utils.DiffToRes(userData), sessionId, discordId) - if dataErr != "" { - return events.APIGatewayProxyResponse{ - Body: "Profile Skipped " + dataErr, - StatusCode: 200, - }, nil + err = utils.Getdata(d.Client, d.Ctx, userId, userUrl, chaincode, utils.DiffToRes(userData), sessionId, discordId) + if err != nil { + if profileErr, ok := err.(*utils.ProfileError); ok { + return utils.HandleProfileSkippedError(profileErr.Message), nil + } + return utils.HandleProfileSkippedError(err.Error()), nil } return events.APIGatewayProxyResponse{ @@ -119,16 +117,5 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway } func main() { - ctx := context.Background() - client, err := utils.InitializeFirestoreClient(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } - - d := deps{ - client: client, - ctx: ctx, - } - - lambda.Start(d.handler) + utils.InitializeLambdaWithFirestore("call-profile", handler) } diff --git a/call-profile/main_test.go b/call-profile/main_test.go index 55ea80f..edc2fe3 100644 --- a/call-profile/main_test.go +++ b/call-profile/main_test.go @@ -364,8 +364,8 @@ func TestHandlerIntegration(t *testing.T) { }, userData: nil, mockServer: nil, - expectedBody: "Profile Skipped No Profile URL", - expectedStatus: 200, + expectedBody: "Error retrieving user: rpc error: code = NotFound desc = \"projects/test-project/databases/(default)/documents/users/non-existent-user\" not found", + expectedStatus: 500, expectedError: false, }, { @@ -435,7 +435,7 @@ func TestHandlerIntegration(t *testing.T) { w.Write([]byte("Service Unavailable")) })) }, - expectedBody: "Profile Skipped error in getting profile data", + expectedBody: "Profile Skipped: error in getting profile data: status code 500", expectedStatus: 200, expectedError: false, }, @@ -590,7 +590,7 @@ func TestHandlerEdgeCases(t *testing.T) { "firstName": 123, // Invalid type "lastName": "Doe", }, - expectedBody: "Profile Skipped error in getting profile data", + expectedBody: "Profile Skipped: error in getting profile data: status code 404", expectedStatus: 200, }, { @@ -604,7 +604,7 @@ func TestHandlerEdgeCases(t *testing.T) { "chaincode": "TESTCHAIN", "profileStatus": "PENDING", }, - expectedBody: "Profile Skipped error in getting profile data", // Will fail health check + expectedBody: "Profile Skipped: error in getting profile data: status code 404", // Will fail health check expectedStatus: 200, }, } @@ -635,9 +635,9 @@ func newFirestoreMockClient(ctx context.Context) *firestore.Client { func handlerWithClient(request events.APIGatewayProxyRequest, client *firestore.Client) (events.APIGatewayProxyResponse, error) { ctx := context.Background() - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } - return d.handler(request) + return handler(d, request) } \ No newline at end of file diff --git a/call-profiles/main.go b/call-profiles/main.go index 57a912e..9d93b85 100644 --- a/call-profiles/main.go +++ b/call-profiles/main.go @@ -1,30 +1,17 @@ package main import ( - "context" "fmt" "identity-service/layer/utils" - "log" - "sync" "time" - "cloud.google.com/go/firestore" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" "google.golang.org/api/iterator" ) -var wg sync.WaitGroup - -type deps struct { - client *firestore.Client - ctx context.Context -} - func callProfile(userId string, sessionId string) { - defer wg.Done() + logger := utils.GetLogger() payload := utils.ProfileLambdaCallPayload{ UserId: userId, @@ -33,12 +20,16 @@ func callProfile(userId string, sessionId string) { err := utils.InvokeProfileLambda(payload) if err != nil { - log.Println("error calling profile lambda", err) + logger.Error("Error calling profile lambda", err, map[string]interface{}{ + "function": "callProfile", + "userId": userId, + "sessionId": sessionId, + }) } } -func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { - docRef, _, sessionIdErr := d.client.Collection("identitySessionIds").Add(d.ctx, map[string]interface{}{ +func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { + docRef, _, sessionIdErr := d.Client.Collection("identitySessionIds").Add(d.Ctx, map[string]interface{}{ "Timestamp": time.Now(), }) @@ -48,21 +39,32 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway totalProfilesCalled := 0 - iter := d.client.Collection("users").Where("profileStatus", "==", "VERIFIED").Documents(d.ctx) + workerPool := utils.NewWorkerPool(10, 100) + defer workerPool.Close() + + iter := d.Client.Collection("users").Where("profileStatus", "==", "VERIFIED").Documents(d.Ctx) for { doc, err := iter.Next() if err == iterator.Done { break } if err != nil { - log.Fatalf("Failed to iterate: %v", err) + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Failed to iterate users: %v", err), + StatusCode: 500, + }, nil } + + userId := doc.Ref.ID + sessionId := docRef.ID + totalProfilesCalled += 1 - wg.Add(1) - go callProfile(doc.Ref.ID, docRef.ID) + workerPool.Submit(func() { + callProfile(userId, sessionId) + }) } - wg.Wait() + workerPool.Wait() return events.APIGatewayProxyResponse{ Body: fmt.Sprintf("Total Profiles called in session is %d", totalProfilesCalled), @@ -71,16 +73,5 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway } func main() { - ctx := context.Background() - client, err := utils.InitializeFirestoreClient(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } - - d := deps{ - client: client, - ctx: ctx, - } - - lambda.Start(d.handler) + utils.InitializeLambdaWithFirestore("call-profiles", handler) } diff --git a/call-profiles/main_test.go b/call-profiles/main_test.go index d1cba8b..328505a 100644 --- a/call-profiles/main_test.go +++ b/call-profiles/main_test.go @@ -302,11 +302,11 @@ func newFirestoreMockClient(ctx context.Context) *firestore.Client { func handlerWithClient(request events.APIGatewayProxyRequest, client *firestore.Client) (events.APIGatewayProxyResponse, error) { ctx := context.Background() - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } - return d.handler(request) + return handler(d, request) } func TestHandlerIntegration(t *testing.T) { diff --git a/health-check/main.go b/health-check/main.go index 6aa895d..1e6cd00 100644 --- a/health-check/main.go +++ b/health-check/main.go @@ -4,72 +4,74 @@ import ( "context" "fmt" "identity-service/layer/utils" - "log" - "net/http" - "sync" "time" - "cloud.google.com/go/firestore" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" "google.golang.org/api/iterator" ) -var wg sync.WaitGroup - -type deps struct { - client *firestore.Client - ctx context.Context -} - func callProfileHealth(userUrl string) { - - defer wg.Done() + logger := utils.GetLogger() // Skip if URL is empty if userUrl == "" { - fmt.Println("Empty profile URL, skipping health check") + logger.Warn("Empty profile URL, skipping health check", map[string]interface{}{ + "function": "callProfileHealth", + }) return } - httpClient := &http.Client{ - Timeout: 2 * time.Second, - } if userUrl[len(userUrl)-1] != '/' { userUrl = userUrl + "/" } requestURL := fmt.Sprintf("%shealth", userUrl) - req, _ := http.NewRequest("GET", requestURL, nil) - _, err1 := httpClient.Do(req) + _, err1 := utils.GetWithContext(context.Background(), requestURL, 2*time.Second) if err1 != nil { - fmt.Println("Service not running", err1) + logger.WarnWithError("Service not running", map[string]interface{}{ + "function": "callProfileHealth", + "profileURL": userUrl, + }, err1) } } -func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { +func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { totalProfilesCalled := 0 - iter := d.client.Collection("users").Where("profileStatus", "==", "VERIFIED").Documents(d.ctx) + workerPool := utils.NewWorkerPool(20, 200) + defer workerPool.Close() + + iter := d.Client.Collection("users").Where("profileStatus", "==", "VERIFIED").Documents(d.Ctx) for { doc, err := iter.Next() if err == iterator.Done { break } if err != nil { - log.Fatalf("Failed to iterate: %v", err) + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Failed to iterate users: %v", err), + StatusCode: 500, + }, nil } - if str, ok := doc.Data()["profileURL"].(string); ok { - fmt.Println(str) - totalProfilesCalled += 1 - wg.Add(1) - go callProfileHealth(str) + + var user utils.User + err = doc.DataTo(&user) + if err == nil { + data := doc.Data() + profileURLVal, profileURLExists := data["profileURL"] + if profileURLExists { + totalProfilesCalled += 1 + if profileURL, ok := profileURLVal.(string); ok && profileURL != "" { + workerPool.Submit(func() { + callProfileHealth(profileURL) + }) + } + } } } - wg.Wait() + workerPool.Wait() return events.APIGatewayProxyResponse{ Body: fmt.Sprintf("Total Profiles called in session is %d", totalProfilesCalled), @@ -78,16 +80,5 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway } func main() { - ctx := context.Background() - client, err := utils.InitializeFirestoreClient(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } - - d := deps{ - client: client, - ctx: ctx, - } - - lambda.Start(d.handler) + utils.InitializeLambdaWithFirestore("health-check", handler) } diff --git a/health-check/main_test.go b/health-check/main_test.go index 0d577e9..ca72893 100644 --- a/health-check/main_test.go +++ b/health-check/main_test.go @@ -18,6 +18,7 @@ import ( "github.com/aws/aws-lambda-go/events" "github.com/stretchr/testify/assert" + "identity-service/layer/utils" ) func TestCallProfileHealth(t *testing.T) { @@ -396,11 +397,11 @@ func newFirestoreMockClient(ctx context.Context) *firestore.Client { func handlerWithClient(request events.APIGatewayProxyRequest, client *firestore.Client) (events.APIGatewayProxyResponse, error) { ctx := context.Background() - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } - return d.handler(request) + return handler(d, request) } func TestHandlerIntegration(t *testing.T) { diff --git a/layer/utils/auth.go b/layer/utils/auth.go index 69a770f..4a1a248 100644 --- a/layer/utils/auth.go +++ b/layer/utils/auth.go @@ -1,7 +1,6 @@ package utils import ( - "log" "os" "time" @@ -47,7 +46,10 @@ func getParameter(parameter string) string { Name: ¶meterName, }) if err != nil { - log.Print(err.Error()) + LogError("Failed to get parameter from SSM", err, map[string]interface{}{ + "function": "getParameter", + "parameterName": parameterName, + }) } return *results.Parameter.Value diff --git a/layer/utils/concurrency.go b/layer/utils/concurrency.go new file mode 100644 index 0000000..90a2847 --- /dev/null +++ b/layer/utils/concurrency.go @@ -0,0 +1,82 @@ +package utils + +import ( + "sync" +) + +func SafeGoroutine(wg *sync.WaitGroup, fn func()) { + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + logger := GetLogger() + logger.Error("Panic recovered in goroutine", nil, map[string]interface{}{ + "panic": r, + }) + } + }() + fn() + }() +} + +type WorkerPool struct { + workerCount int + jobChan chan func() + wg sync.WaitGroup + once sync.Once +} + +func NewWorkerPool(workerCount int, queueSize int) *WorkerPool { + if workerCount <= 0 { + workerCount = 10 + } + if queueSize <= 0 { + queueSize = 100 + } + + wp := &WorkerPool{ + workerCount: workerCount, + jobChan: make(chan func(), queueSize), + } + + // Start workers + for i := 0; i < workerCount; i++ { + wp.wg.Add(1) + go wp.worker() + } + + return wp +} + +func (wp *WorkerPool) worker() { + defer wp.wg.Done() + for job := range wp.jobChan { + func() { + defer func() { + if r := recover(); r != nil { + logger := GetLogger() + logger.Error("Panic recovered in worker pool", nil, map[string]interface{}{ + "panic": r, + }) + } + }() + job() + }() + } +} + +func (wp *WorkerPool) Submit(job func()) { + wp.jobChan <- job +} + +func (wp *WorkerPool) Wait() { + wp.once.Do(func() { + close(wp.jobChan) + }) + wp.wg.Wait() +} + +func (wp *WorkerPool) Close() { + wp.Wait() +} diff --git a/layer/utils/constants.go b/layer/utils/constants.go index 819e7a6..c1552a0 100644 --- a/layer/utils/constants.go +++ b/layer/utils/constants.go @@ -52,6 +52,14 @@ type Claims struct { jwt.RegisteredClaims } +type User struct { + ProfileURL string `firestore:"profileURL,omitempty"` + ProfileStatus string `firestore:"profileStatus,omitempty"` + Chaincode string `firestore:"chaincode,omitempty"` + DiscordID string `firestore:"discordId,omitempty"` + UpdatedAt int64 `firestore:"updated_at,omitempty"` +} + var Constants = map[string]string{ "ENV_DEVELOPMENT": "DEVELOPMENT", "ENV_PRODUCTION": "PRODUCTION", diff --git a/layer/utils/errors.go b/layer/utils/errors.go new file mode 100644 index 0000000..b7037ff --- /dev/null +++ b/layer/utils/errors.go @@ -0,0 +1,70 @@ +package utils + +import ( + "errors" + "fmt" + + "github.com/aws/aws-lambda-go/events" +) + +var ( + ErrInvalidUserID = errors.New("invalid user ID") + ErrProfileURLNotFound = errors.New("profile URL not found") + ErrChaincodeNotFound = errors.New("chaincode not found") + ErrChaincodeEmpty = errors.New("chaincode is empty") + ErrUserDataNotFound = errors.New("user data not found") + ErrServiceDown = errors.New("profile service is down") +) + +type ProfileError struct { + Code string + Message string + Err error +} + +func (e *ProfileError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s: %v", e.Message, e.Err) + } + return e.Message +} + +func (e *ProfileError) Unwrap() error { + return e.Err +} + +func NewProfileError(code, message string, err error) *ProfileError { + return &ProfileError{ + Code: code, + Message: message, + Err: err, + } +} + +func HandleLambdaError(err error) (events.APIGatewayProxyResponse, error) { + if err == nil { + return events.APIGatewayProxyResponse{ + Body: "Internal server error", + StatusCode: 500, + }, nil + } + + if profileErr, ok := err.(*ProfileError); ok { + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Profile Error: %s", profileErr.Message), + StatusCode: 400, + }, nil + } + + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Error: %v", err), + StatusCode: 500, + }, nil +} + +func HandleProfileSkippedError(reason string) events.APIGatewayProxyResponse { + return events.APIGatewayProxyResponse{ + Body: fmt.Sprintf("Profile Skipped: %s", reason), + StatusCode: 200, + } +} diff --git a/layer/utils/firestore.go b/layer/utils/firestore.go index 3a2e60a..73da719 100644 --- a/layer/utils/firestore.go +++ b/layer/utils/firestore.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "log" "net/http" "os" "time" @@ -37,7 +36,7 @@ func InitializeFirestoreClient(ctx context.Context) (*firestore.Client, error) { return client, nil } -func getLastDiff(client *firestore.Client, ctx context.Context, userId string, approval string) (Res, string) { +func getLastDiff(client *firestore.Client, ctx context.Context, userId string, approval string) (Res, string, error) { query := client.Collection("profileDiffs").Where("userId", "==", userId).Where("approval", "==", approval).OrderBy("timestamp", firestore.Desc).Limit(1).Documents(ctx) var lastdiff Diff var lastdiffId string @@ -47,15 +46,15 @@ func getLastDiff(client *firestore.Client, ctx context.Context, userId string, a break } if err != nil { - log.Fatal(err) + return Res{}, "", fmt.Errorf("failed to iterate profile diffs: %w", err) } err = Doc.DataTo(&lastdiff) if err != nil { - log.Fatal(err) + return Res{}, "", fmt.Errorf("failed to convert diff data: %w", err) } lastdiffId = Doc.Ref.ID } - return DiffToRes(lastdiff), lastdiffId + return DiffToRes(lastdiff), lastdiffId, nil } func generateAndStoreDiff(client *firestore.Client, ctx context.Context, res Res, userId string, sessionId string) error { @@ -90,11 +89,26 @@ func SetProfileStatusBlocked(client *firestore.Client, ctx context.Context, user responseBody := bytes.NewBuffer(postBody) - httpClient := &http.Client{} - req, _ := http.NewRequest("POST", os.Getenv(Constants["DISCORD_BOT_URL"])+"/profile/blocked", responseBody) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", tokenString)) - httpClient.Do(req) + discordURL := os.Getenv(Constants["DISCORD_BOT_URL"]) + "/profile/blocked" + req, err := http.NewRequestWithContext(ctx, "POST", discordURL, responseBody) + if err != nil { + LogWarnWithError("Failed to create Discord bot request", map[string]interface{}{ + "function": "SetProfileStatusBlocked", + "userId": userId, + }, err) + } else { + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", tokenString)) + + httpClient := CreateHTTPClient(10 * time.Second) + _, err = DoRequestWithContext(ctx, httpClient, req) + if err != nil { + LogWarnWithError("Failed to notify Discord bot", map[string]interface{}{ + "function": "SetProfileStatusBlocked", + "userId": userId, + }, err) + } + } } newLog := Log{ @@ -112,85 +126,118 @@ func SetProfileStatusBlocked(client *firestore.Client, ctx context.Context, user client.Collection("logs").Add(ctx, newLog) } -func Getdata(client *firestore.Client, ctx context.Context, userId string, userUrl string, chaincode string, userData Res, sessionId string, discordId string) string { - var status string = "" +// Getdata retrieves and processes profile data from user service +// Returns an error if there's a problem, or nil if successful +// If the profile should be skipped (same data, etc.), it returns nil but logs the skip reason +func Getdata(client *firestore.Client, ctx context.Context, userId string, userUrl string, chaincode string, userData Res, sessionId string, discordId string) error { userUrl = userUrl + "profile" hashedChaincode, err := bcrypt.GenerateFromPassword([]byte(chaincode), bcrypt.DefaultCost) if err != nil { - LogProfileSkipped(client, ctx, userId, fmt.Sprintln(err), sessionId) - SetProfileStatusBlocked(client, ctx, userId, fmt.Sprintln(err), sessionId, discordId) - return "chaincode not encrypted" + errMsg := fmt.Sprintf("chaincode encryption failed: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("chaincode not encrypted: %w", err) } - httpClient := &http.Client{} - req, _ := http.NewRequest("GET", userUrl, nil) + httpClient := CreateHTTPClient(30 * time.Second) + req, err := http.NewRequestWithContext(ctx, "GET", userUrl, nil) + if err != nil { + errMsg := fmt.Sprintf("failed to create request: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error creating request: %w", err) + } req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", string(hashedChaincode))) - resp, err := httpClient.Do(req) + resp, err := DoRequestWithContext(ctx, httpClient, req) if err != nil { - LogProfileSkipped(client, ctx, userId, fmt.Sprintln(err), sessionId) - SetProfileStatusBlocked(client, ctx, userId, fmt.Sprintln(err), sessionId, discordId) - return "error getting profile data" + errMsg := fmt.Sprintf("failed to get profile data: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error getting profile data: %w", err) } + defer resp.Body.Close() + if resp.StatusCode == 401 { - LogProfileSkipped(client, ctx, userId, "Unauthenticated Access to Profile Data", sessionId) - SetProfileStatusBlocked(client, ctx, userId, "Unauthenticated Access to Profile Data", sessionId, discordId) - resp.Body.Close() - return "unauthenticated access to profile data" + errMsg := "Unauthenticated Access to Profile Data" + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return NewProfileError("UNAUTHENTICATED", errMsg, nil) } if resp.StatusCode != 200 { - LogProfileSkipped(client, ctx, userId, "Error in getting Profile Data", sessionId) - SetProfileStatusBlocked(client, ctx, userId, "Error in getting Profile Data", sessionId, discordId) - resp.Body.Close() - return "error in getting profile data" + errMsg := "Error in getting Profile Data" + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error in getting profile data: status code %d", resp.StatusCode) } - defer resp.Body.Close() - r, err := io.ReadAll(resp.Body) if err != nil { - LogProfileSkipped(client, ctx, userId, fmt.Sprintln(err), sessionId) - SetProfileStatusBlocked(client, ctx, userId, fmt.Sprintln(err), sessionId, discordId) - return "error reading profile data" + errMsg := fmt.Sprintf("failed to read response: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error reading profile data: %w", err) } var res Res err = json.Unmarshal([]byte(r), &res) if err != nil { - LogProfileSkipped(client, ctx, userId, fmt.Sprintln(err), sessionId) - SetProfileStatusBlocked(client, ctx, userId, fmt.Sprintln(err), sessionId, discordId) - return "error converting data to json" + errMsg := fmt.Sprintf("failed to unmarshal JSON: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error converting data to json: %w", err) } err = res.Validate() + if err != nil { + errMsg := fmt.Sprintf("validation failed: %v", err) + LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) + return fmt.Errorf("error in validation: %w", err) + } + lastPendingDiff, lastPendingDiffId, err := getLastDiff(client, ctx, userId, "PENDING") if err != nil { - LogProfileSkipped(client, ctx, userId, fmt.Sprintln(err), sessionId) - SetProfileStatusBlocked(client, ctx, userId, fmt.Sprintln(err), sessionId, discordId) - return fmt.Sprintf("error in validation: ", err) + // Log error but continue processing + LogWarnWithError("Failed to get last pending diff", map[string]interface{}{ + "function": "Getdata", + "userId": userId, + }, err) } - lastPendingDiff, lastPendingDiffId := getLastDiff(client, ctx, userId, "PENDING") if lastPendingDiff != res && userData != res { if lastPendingDiffId != "" { SetNotApproved(client, ctx, lastPendingDiffId) } - lastRejectedDiff, lastRejectedDiffId := getLastDiff(client, ctx, userId, Constants["NOT_APPROVED"]) + lastRejectedDiff, lastRejectedDiffId, err := getLastDiff(client, ctx, userId, Constants["NOT_APPROVED"]) + if err != nil { + LogWarnWithError("Failed to get last rejected diff", map[string]interface{}{ + "function": "Getdata", + "userId": userId, + }, err) + } if lastRejectedDiff != res { - generateAndStoreDiff(client, ctx, res, userId, sessionId) + err = generateAndStoreDiff(client, ctx, res, userId, sessionId) + if err != nil { + return fmt.Errorf("failed to generate and store diff: %w", err) + } } else { - status = "same last rejected diff " + lastRejectedDiffId LogProfileSkipped(client, ctx, userId, "Last Rejected Diff is same as New Profile Data. Rejected Diff Id: "+lastRejectedDiffId, sessionId) + // This is not an error, just a skip reason + return nil } } else if userData == res { - status = "same data exists" LogProfileSkipped(client, ctx, userId, "Current User Data is same as New Profile Data", sessionId) if lastPendingDiffId != "" { SetNotApproved(client, ctx, lastPendingDiffId) } + // This is not an error, just a skip reason + return nil } else { - status = "same last pending diff" LogProfileSkipped(client, ctx, userId, "Last Pending Diff is same as New Profile Data", sessionId) + // This is not an error, just a skip reason + return nil } - return status + + return nil } func GetDataFromBody(body []byte) (string, string) { @@ -228,46 +275,57 @@ Function to get the userData using userId func GetUserData(client *firestore.Client, ctx context.Context, userId string) (string, string, string, error) { dsnap, err := client.Collection("users").Doc(userId).Get(ctx) - var profileURL string - var profileStatus string - var chaincode string if err != nil { return "", "", "", err } - if str, ok := dsnap.Data()["profileURL"].(string); ok { - profileURL = str - } else { - return "", "", "", errors.New("profile url is not a string") - } - if str, ok := dsnap.Data()["profileStatus"].(string); ok { - profileStatus = str - } else { - profileStatus = "" - } - if str, ok := dsnap.Data()["chaincode"].(string); ok { - if str != "" { - chaincode = str + var user User + err = dsnap.DataTo(&user) + if err != nil { + data := dsnap.Data() + + if profileURLVal, exists := data["profileURL"]; exists && profileURLVal != nil { + if _, ok := profileURLVal.(string); !ok { + return "", "", "", errors.New("profile url is not a string") + } } else { - newLog := Log{ - Type: "VERIFICATION_BLOCKED", - Timestamp: time.Now(), - Meta: map[string]interface{}{ - "userId": userId, - }, - Body: map[string]interface{}{ - "userId": userId, - "reason": "Chaincode is empty. Generate new one.", - }, + return "", "", "", errors.New("profile url is not a string") + } + + if chaincodeVal, exists := data["chaincode"]; exists && chaincodeVal != nil { + if _, ok := chaincodeVal.(string); !ok { + return "", "", "", errors.New("chaincode is not a string") } - client.Collection("logs").Add(ctx, newLog) - return "", "", "", errors.New("chaincode is blocked") + } else { + return "", "", "", errors.New("chaincode is not a string") } - } else { - return "", "", "", errors.New("chaincode is not a string") + + return "", "", "", fmt.Errorf("failed to convert user data: %w", err) + } + + if user.ProfileURL == "" { + return "", "", "", errors.New("profile url is not a string") + } + + if user.Chaincode == "" { + newLog := Log{ + Type: "VERIFICATION_BLOCKED", + Timestamp: time.Now(), + Meta: map[string]interface{}{ + "userId": userId, + }, + Body: map[string]interface{}{ + "userId": userId, + "reason": "Chaincode is empty. Generate new one.", + }, + } + logCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + _, _, _ = client.Collection("logs").Add(logCtx, newLog) + cancel() + return "", "", "", errors.New("chaincode is blocked") } - return profileURL, profileStatus, chaincode, nil + return user.ProfileURL, user.ProfileStatus, user.Chaincode, nil } /* diff --git a/layer/utils/handler.go b/layer/utils/handler.go new file mode 100644 index 0000000..554fd23 --- /dev/null +++ b/layer/utils/handler.go @@ -0,0 +1,41 @@ +package utils + +import ( + "context" + + "cloud.google.com/go/firestore" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" +) + +type Deps struct { + Client *firestore.Client + Ctx context.Context +} + +type HandlerFunc func(*Deps, events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) + +func InitializeLambdaWithFirestore(functionName string, handlerFunc HandlerFunc) { + ctx := context.Background() + logger := GetLogger() + + client, err := InitializeFirestoreClient(ctx) + if err != nil { + logger.Error("Failed to initialize Firestore client", err, map[string]interface{}{ + "function": functionName, + }) + return + } + + deps := &Deps{ + Client: client, + Ctx: ctx, + } + + wrappedHandler := func(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { + return handlerFunc(deps, request) + } + + lambda.Start(wrappedHandler) +} diff --git a/layer/utils/http.go b/layer/utils/http.go new file mode 100644 index 0000000..9ec3447 --- /dev/null +++ b/layer/utils/http.go @@ -0,0 +1,58 @@ +package utils + +import ( + "context" + "io" + "net/http" + "time" +) + +const DefaultHTTPTimeout = 30 * time.Second + +func CreateHTTPClient(timeout time.Duration) *http.Client { + if timeout == 0 { + timeout = DefaultHTTPTimeout + } + return &http.Client{ + Timeout: timeout, + } +} + +func DoRequestWithContext(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { + return client.Do(req) +} + +func PostWithContext(ctx context.Context, url string, contentType string, body io.Reader, timeout time.Duration) (*http.Response, error) { + if timeout == 0 { + timeout = DefaultHTTPTimeout + } + + reqCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + client := CreateHTTPClient(timeout) + req, err := http.NewRequestWithContext(reqCtx, "POST", url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentType) + + return client.Do(req) +} + +func GetWithContext(ctx context.Context, url string, timeout time.Duration) (*http.Response, error) { + if timeout == 0 { + timeout = DefaultHTTPTimeout + } + + reqCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + client := CreateHTTPClient(timeout) + req, err := http.NewRequestWithContext(reqCtx, "GET", url, nil) + if err != nil { + return nil, err + } + + return client.Do(req) +} diff --git a/layer/utils/logger.go b/layer/utils/logger.go new file mode 100644 index 0000000..ae11945 --- /dev/null +++ b/layer/utils/logger.go @@ -0,0 +1,139 @@ +package utils + +import ( + "encoding/json" + "fmt" + "os" + "time" +) + +type LogLevel string + +const ( + LogLevelDebug LogLevel = "DEBUG" + LogLevelInfo LogLevel = "INFO" + LogLevelWarn LogLevel = "WARN" + LogLevelError LogLevel = "ERROR" +) + +type Logger struct { + service string + env string +} + +type LogEntry struct { + Timestamp string `json:"timestamp"` + Level string `json:"level"` + Service string `json:"service"` + Message string `json:"message"` + Fields map[string]interface{} `json:"fields,omitempty"` + Error string `json:"error,omitempty"` +} + +func NewLogger(service string) *Logger { + env := os.Getenv("environment") + if env == "" { + env = "DEVELOPMENT" + } + return &Logger{ + service: service, + env: env, + } +} + +func (l *Logger) log(level LogLevel, message string, fields map[string]interface{}, err error) { + entry := LogEntry{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Level: string(level), + Service: l.service, + Message: message, + Fields: fields, + } + + if err != nil { + entry.Error = err.Error() + } + + jsonBytes, marshalErr := json.Marshal(entry) + if marshalErr != nil { + fmt.Printf("[%s] %s: %s", level, l.service, message) + if err != nil { + fmt.Printf(" - Error: %v", err) + } + if len(fields) > 0 { + fmt.Printf(" - Fields: %+v", fields) + } + fmt.Println() + return + } + + fmt.Println(string(jsonBytes)) +} + +func (l *Logger) Debug(message string, fields ...map[string]interface{}) { + mergedFields := mergeFields(fields...) + l.log(LogLevelDebug, message, mergedFields, nil) +} + +func (l *Logger) Info(message string, fields ...map[string]interface{}) { + mergedFields := mergeFields(fields...) + l.log(LogLevelInfo, message, mergedFields, nil) +} + +func (l *Logger) Warn(message string, fields ...map[string]interface{}) { + mergedFields := mergeFields(fields...) + l.log(LogLevelWarn, message, mergedFields, nil) +} + +func (l *Logger) WarnWithError(message string, fields map[string]interface{}, err error) { + l.log(LogLevelWarn, message, fields, err) +} + +func (l *Logger) Error(message string, err error, fields ...map[string]interface{}) { + mergedFields := mergeFields(fields...) + l.log(LogLevelError, message, mergedFields, err) +} + +func (l *Logger) WithFields(fields map[string]interface{}) *Logger { + return l +} + +func mergeFields(fields ...map[string]interface{}) map[string]interface{} { + result := make(map[string]interface{}) + for _, f := range fields { + for k, v := range f { + result[k] = v + } + } + return result +} + +var defaultLogger *Logger + +func init() { + defaultLogger = NewLogger("identity-service") +} + +func GetLogger() *Logger { + return defaultLogger +} + +func LogDebug(message string, fields ...map[string]interface{}) { + defaultLogger.Debug(message, fields...) +} + +func LogInfo(message string, fields ...map[string]interface{}) { + defaultLogger.Info(message, fields...) +} + +func LogWarn(message string, fields ...map[string]interface{}) { + defaultLogger.Warn(message, fields...) +} + +func LogWarnWithError(message string, fields map[string]interface{}, err error) { + defaultLogger.WarnWithError(message, fields, err) +} + +func LogError(message string, err error, fields ...map[string]interface{}) { + defaultLogger.Error(message, err, fields...) +} diff --git a/verify/helpers_test.go b/verify/helpers_test.go index 392a5e0..a79f084 100644 --- a/verify/helpers_test.go +++ b/verify/helpers_test.go @@ -210,7 +210,7 @@ func TestVerify(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - status, err := verify(server.URL+testCase.path, testCase.chaincode, testCase.salt) + status, err := verify(context.Background(), server.URL+testCase.path, testCase.chaincode, testCase.salt) assert.Equal(t, testCase.expectedStatus, status) assert.Equal(t, testCase.expectedErr, err) diff --git a/verify/main.go b/verify/main.go index 28b2025..512b3dd 100644 --- a/verify/main.go +++ b/verify/main.go @@ -8,35 +8,21 @@ import ( "fmt" "identity-service/layer/utils" "io" - "log" "math/rand" - "net/http" "time" "crypto/sha512" - "cloud.google.com/go/firestore" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" ) -/* -Structures -*/ - -type deps struct { - client *firestore.Client - ctx context.Context -} - /* Controller */ /* Function to verify the user */ -func verify(profileURL string, chaincode string, salt string) (string, error) { +func verify(ctx context.Context, profileURL string, chaincode string, salt string) (string, error) { type res struct { Hash string `json:"hash"` } @@ -46,7 +32,8 @@ func verify(profileURL string, chaincode string, salt string) (string, error) { }) responseBody := bytes.NewBuffer(postBody) - resp, err := http.Post(profileURL, "application/json", responseBody) + + resp, err := utils.PostWithContext(ctx, profileURL, "application/json", responseBody, 10*time.Second) if err != nil { return "BLOCKED", err } @@ -66,16 +53,13 @@ func verify(profileURL string, chaincode string, salt string) (string, error) { } } -/* -Main Handler Function -*/ -func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { +func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { var userId string = utils.GetUserIdFromBody([]byte(request.Body)) if userId == "" { return events.APIGatewayProxyResponse{}, errors.New("no userId provided") } - profileURL, profileStatus, chaincode, err := utils.GetUserData(d.client, d.ctx, userId) + profileURL, profileStatus, chaincode, err := utils.GetUserData(d.Client, d.Ctx, userId) if err != nil { return events.APIGatewayProxyResponse{}, err } @@ -103,14 +87,14 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway } var salt string = string(b) - status, err := verify(profileURL, chaincode, salt) + status, err := verify(d.Ctx, profileURL, chaincode, salt) if err != nil { - utils.LogVerification(d.client, d.ctx, status, profileURL, userId) - utils.SetProfileStatus(d.client, d.ctx, userId, status) + utils.LogVerification(d.Client, d.Ctx, status, profileURL, userId) + utils.SetProfileStatus(d.Client, d.Ctx, userId, status) return events.APIGatewayProxyResponse{}, err } - utils.LogVerification(d.client, d.ctx, status, profileURL, userId) - utils.SetProfileStatus(d.client, d.ctx, userId, status) + utils.LogVerification(d.Client, d.Ctx, status, profileURL, userId) + utils.SetProfileStatus(d.Client, d.Ctx, userId, status) return events.APIGatewayProxyResponse{ Body: "Verification Process Done", @@ -118,20 +102,6 @@ func (d *deps) handler(request events.APIGatewayProxyRequest) (events.APIGateway }, nil } -/* -Starts the lambda (Entry Point) -*/ func main() { - ctx := context.Background() - client, err := utils.InitializeFirestoreClient(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } - - d := deps{ - client: client, - ctx: ctx, - } - - lambda.Start(d.handler) + utils.InitializeLambdaWithFirestore("verify", handler) } diff --git a/verify/main_test.go b/verify/main_test.go index 4e9b152..65cf259 100644 --- a/verify/main_test.go +++ b/verify/main_test.go @@ -19,6 +19,7 @@ import ( "github.com/aws/aws-lambda-go/events" "github.com/stretchr/testify/assert" + "identity-service/layer/utils" ) func newFirestoreMockClient(ctx context.Context) *firestore.Client { @@ -141,11 +142,11 @@ func TestVerifyFunction(t *testing.T) { defer server.Close() if testCase.name == "network timeout" { - status, err := verify(server.URL+testCase.profileURL, testCase.chaincode, testCase.salt) + status, err := verify(context.Background(), server.URL+testCase.profileURL, testCase.chaincode, testCase.salt) assert.Equal(t, testCase.expectedStatus, status) assert.True(t, testCase.expectedError == (err != nil)) } else { - status, err := verify(server.URL+testCase.profileURL, testCase.chaincode, testCase.salt) + status, err := verify(context.Background(), server.URL+testCase.profileURL, testCase.chaincode, testCase.salt) assert.Equal(t, testCase.expectedStatus, status) assert.True(t, testCase.expectedError == (err != nil)) } @@ -255,14 +256,14 @@ func TestHandler(t *testing.T) { }, } - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - response, err := d.handler(testCase.request) + response, err := handler(d, testCase.request) if testCase.name == "non-existent user" { assert.Error(t, err) } else { @@ -346,16 +347,16 @@ func TestURLFormatting(t *testing.T) { {Path: "profileURL", Value: server.URL}, }) - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } request := events.APIGatewayProxyRequest{ Body: fmt.Sprintf(`{ "userId": "%s" }`, userId), } - response, err := d.handler(request) + response, err := handler(d, request) assert.NoError(t, err) assert.Equal(t, 200, response.StatusCode) @@ -401,16 +402,16 @@ func TestSaltGeneration(t *testing.T) { {Path: "profileURL", Value: server.URL}, }) - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } request := events.APIGatewayProxyRequest{ Body: fmt.Sprintf(`{ "userId": "%s" }`, userId), } - response, err := d.handler(request) + response, err := handler(d, request) assert.NoError(t, err) assert.Equal(t, 200, response.StatusCode) @@ -503,16 +504,16 @@ func TestHandlerEdgeCases(t *testing.T) { t.Fatalf("failed to add user: %v", err) } - d := deps{ - client: client, - ctx: ctx, + d := &utils.Deps{ + Client: client, + Ctx: ctx, } request := events.APIGatewayProxyRequest{ Body: testCase.requestBody, } - response, err := d.handler(request) + response, err := handler(d, request) assert.Error(t, err) assert.Contains(t, err.Error(), testCase.expectedError) @@ -528,7 +529,7 @@ func TestVerifyFunctionCompleteCoverage(t *testing.T) { })) defer server.Close() - status, err := verify(server.URL+"/verify", "testchaincode", "testsalt") + status, err := verify(context.Background(), server.URL+"/verify", "testchaincode", "testsalt") assert.Equal(t, "BLOCKED", status) assert.NoError(t, err) @@ -538,11 +539,11 @@ func TestVerifyFunctionCompleteCoverage(t *testing.T) { })) defer server2.Close() - status, err = verify(server2.URL+"/verify", "testchaincode", "testsalt") + status, err = verify(context.Background(), server2.URL+"/verify", "testchaincode", "testsalt") assert.Equal(t, "BLOCKED", status) assert.NoError(t, err) - status, err = verify("http://192.168.1.1:99999/verify", "testchaincode", "testsalt") + status, err = verify(context.Background(), "http://192.168.1.1:99999/verify", "testchaincode", "testsalt") assert.Equal(t, "BLOCKED", status) assert.Error(t, err) @@ -551,7 +552,7 @@ func TestVerifyFunctionCompleteCoverage(t *testing.T) { })) server3.Close() - status, err = verify(server3.URL+"/verify", "testchaincode", "testsalt") + status, err = verify(context.Background(), server3.URL+"/verify", "testchaincode", "testsalt") assert.Equal(t, "BLOCKED", status) assert.Error(t, err) } @@ -560,10 +561,10 @@ func TestMainFunctionComponents(t *testing.T) { ctx := context.Background() assert.NotNil(t, ctx) - d := deps{ - client: nil, - ctx: ctx, + d := &utils.Deps{ + Client: nil, + Ctx: ctx, } assert.NotNil(t, d) - assert.Equal(t, ctx, d.ctx) + assert.Equal(t, ctx, d.Ctx) } From 8493dcd2017c49b767a4d0f5c970e1f14f2ae8ae Mon Sep 17 00:00:00 2001 From: lakshayman Date: Mon, 10 Nov 2025 13:45:02 +0530 Subject: [PATCH 2/2] fix: coderrabit suggestions --- call-profile/main.go | 12 +++--- health-check/main.go | 4 +- layer/utils/errors.go | 15 +++++--- layer/utils/firestore.go | 60 +++++++++++++++++------------- layer/utils/handler.go | 24 ++++++++---- layer/utils/http.go | 79 +++++++++++++++++++++++++++++++--------- layer/utils/logger.go | 42 ++++++++++++++++----- 7 files changed, 162 insertions(+), 74 deletions(-) diff --git a/call-profile/main.go b/call-profile/main.go index 5d0ae61..a1f9da1 100644 --- a/call-profile/main.go +++ b/call-profile/main.go @@ -30,7 +30,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa var user utils.User err = dsnap.DataTo(&user) if err != nil { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "UserData Type Error: "+fmt.Sprintln(err), sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "UserData Type Error: "+fmt.Sprintln(err), userId, sessionId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped No User Data", StatusCode: 200, @@ -40,7 +40,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa discordId := user.DiscordID if user.ProfileURL == "" { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile URL not available", sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "Profile URL not available", userId, sessionId) utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile URL not available", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped No Profile URL", @@ -50,7 +50,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa _, chaincodeExists := data["chaincode"] if !chaincodeExists { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Chaincode Not Found", sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "Chaincode Not Found", userId, sessionId) utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Chaincode Not Found", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped Chaincode Not Found", @@ -59,7 +59,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa } if user.Chaincode == "" { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "Profile Service Blocked or Chaincode is empty", userId, sessionId) utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile Service Blocked or Chaincode is empty", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped Profile Service Blocked", @@ -73,7 +73,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa var userData utils.Diff err = dsnap.DataTo(&userData) if err != nil { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "UserData Type Error: "+fmt.Sprintln(err), sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "UserData Type Error: "+fmt.Sprintln(err), userId, sessionId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped No User Data", StatusCode: 200, @@ -94,7 +94,7 @@ func handler(d *utils.Deps, request events.APIGatewayProxyRequest) (events.APIGa utils.LogHealth(d.Client, d.Ctx, userId, isServiceRunning, sessionId) if !isServiceRunning { - utils.LogProfileSkipped(d.Client, d.Ctx, userId, "Profile Service Down", sessionId) + utils.LogProfileSkipped(d.Client, d.Ctx, "Profile Service Down", userId, sessionId) utils.SetProfileStatusBlocked(d.Client, d.Ctx, userId, "Profile Service Down", sessionId, discordId) return events.APIGatewayProxyResponse{ Body: "Profile Skipped Service Down", diff --git a/health-check/main.go b/health-check/main.go index 1e6cd00..382a885 100644 --- a/health-check/main.go +++ b/health-check/main.go @@ -29,10 +29,10 @@ func callProfileHealth(userUrl string) { requestURL := fmt.Sprintf("%shealth", userUrl) _, err1 := utils.GetWithContext(context.Background(), requestURL, 2*time.Second) if err1 != nil { - logger.WarnWithError("Service not running", map[string]interface{}{ + logger.WarnWithError("Service not running", err1, map[string]interface{}{ "function": "callProfileHealth", "profileURL": userUrl, - }, err1) + }) } } diff --git a/layer/utils/errors.go b/layer/utils/errors.go index b7037ff..0aae88f 100644 --- a/layer/utils/errors.go +++ b/layer/utils/errors.go @@ -19,6 +19,7 @@ var ( type ProfileError struct { Code string Message string + Status int Err error } @@ -33,26 +34,28 @@ func (e *ProfileError) Unwrap() error { return e.Err } -func NewProfileError(code, message string, err error) *ProfileError { +func NewProfileError(code, message string, status int, err error) *ProfileError { return &ProfileError{ Code: code, Message: message, + Status: status, Err: err, } } func HandleLambdaError(err error) (events.APIGatewayProxyResponse, error) { if err == nil { - return events.APIGatewayProxyResponse{ - Body: "Internal server error", - StatusCode: 500, - }, nil + panic("HandleLambdaError called with nil error - this indicates a programming error") } if profileErr, ok := err.(*ProfileError); ok { + statusCode := profileErr.Status + if statusCode == 0 { + statusCode = 400 + } return events.APIGatewayProxyResponse{ Body: fmt.Sprintf("Profile Error: %s", profileErr.Message), - StatusCode: 400, + StatusCode: statusCode, }, nil } diff --git a/layer/utils/firestore.go b/layer/utils/firestore.go index 73da719..dc71b35 100644 --- a/layer/utils/firestore.go +++ b/layer/utils/firestore.go @@ -92,21 +92,28 @@ func SetProfileStatusBlocked(client *firestore.Client, ctx context.Context, user discordURL := os.Getenv(Constants["DISCORD_BOT_URL"]) + "/profile/blocked" req, err := http.NewRequestWithContext(ctx, "POST", discordURL, responseBody) if err != nil { - LogWarnWithError("Failed to create Discord bot request", map[string]interface{}{ + LogWarnWithError("Failed to create Discord bot request", err, map[string]interface{}{ "function": "SetProfileStatusBlocked", "userId": userId, - }, err) + }) } else { req.Header.Add("Content-Type", "application/json") req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", tokenString)) - httpClient := CreateHTTPClient(10 * time.Second) - _, err = DoRequestWithContext(ctx, httpClient, req) + // Create context with timeout and use client without timeout + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + req = req.WithContext(reqCtx) + httpClient := &http.Client{} // No timeout - rely on context + resp, err := httpClient.Do(req) + if resp != nil && resp.Body != nil { + resp.Body.Close() + } if err != nil { - LogWarnWithError("Failed to notify Discord bot", map[string]interface{}{ + LogWarnWithError("Failed to notify Discord bot", err, map[string]interface{}{ "function": "SetProfileStatusBlocked", "userId": userId, - }, err) + }) } } } @@ -134,24 +141,27 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU hashedChaincode, err := bcrypt.GenerateFromPassword([]byte(chaincode), bcrypt.DefaultCost) if err != nil { errMsg := fmt.Sprintf("chaincode encryption failed: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("chaincode not encrypted: %w", err) } - httpClient := CreateHTTPClient(30 * time.Second) - req, err := http.NewRequestWithContext(ctx, "GET", userUrl, nil) + // Create context with timeout and use client without timeout + reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, "GET", userUrl, nil) if err != nil { errMsg := fmt.Sprintf("failed to create request: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error creating request: %w", err) } req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", string(hashedChaincode))) - resp, err := DoRequestWithContext(ctx, httpClient, req) + httpClient := &http.Client{} // No timeout - rely on context + resp, err := httpClient.Do(req) if err != nil { errMsg := fmt.Sprintf("failed to get profile data: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error getting profile data: %w", err) } @@ -159,13 +169,13 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU if resp.StatusCode == 401 { errMsg := "Unauthenticated Access to Profile Data" - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) - return NewProfileError("UNAUTHENTICATED", errMsg, nil) + return NewProfileError("UNAUTHENTICATED", errMsg, 401, nil) } if resp.StatusCode != 200 { errMsg := "Error in getting Profile Data" - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error in getting profile data: status code %d", resp.StatusCode) } @@ -173,7 +183,7 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU r, err := io.ReadAll(resp.Body) if err != nil { errMsg := fmt.Sprintf("failed to read response: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error reading profile data: %w", err) } @@ -181,7 +191,7 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU err = json.Unmarshal([]byte(r), &res) if err != nil { errMsg := fmt.Sprintf("failed to unmarshal JSON: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error converting data to json: %w", err) } @@ -189,7 +199,7 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU err = res.Validate() if err != nil { errMsg := fmt.Sprintf("validation failed: %v", err) - LogProfileSkipped(client, ctx, userId, errMsg, sessionId) + LogProfileSkipped(client, ctx, errMsg, userId, sessionId) SetProfileStatusBlocked(client, ctx, userId, errMsg, sessionId, discordId) return fmt.Errorf("error in validation: %w", err) } @@ -197,10 +207,10 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU lastPendingDiff, lastPendingDiffId, err := getLastDiff(client, ctx, userId, "PENDING") if err != nil { // Log error but continue processing - LogWarnWithError("Failed to get last pending diff", map[string]interface{}{ + LogWarnWithError("Failed to get last pending diff", err, map[string]interface{}{ "function": "Getdata", "userId": userId, - }, err) + }) } if lastPendingDiff != res && userData != res { @@ -209,10 +219,10 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU } lastRejectedDiff, lastRejectedDiffId, err := getLastDiff(client, ctx, userId, Constants["NOT_APPROVED"]) if err != nil { - LogWarnWithError("Failed to get last rejected diff", map[string]interface{}{ + LogWarnWithError("Failed to get last rejected diff", err, map[string]interface{}{ "function": "Getdata", "userId": userId, - }, err) + }) } if lastRejectedDiff != res { err = generateAndStoreDiff(client, ctx, res, userId, sessionId) @@ -220,19 +230,19 @@ func Getdata(client *firestore.Client, ctx context.Context, userId string, userU return fmt.Errorf("failed to generate and store diff: %w", err) } } else { - LogProfileSkipped(client, ctx, userId, "Last Rejected Diff is same as New Profile Data. Rejected Diff Id: "+lastRejectedDiffId, sessionId) + LogProfileSkipped(client, ctx, "Last Rejected Diff is same as New Profile Data. Rejected Diff Id: "+lastRejectedDiffId, userId, sessionId) // This is not an error, just a skip reason return nil } } else if userData == res { - LogProfileSkipped(client, ctx, userId, "Current User Data is same as New Profile Data", sessionId) + LogProfileSkipped(client, ctx, "Current User Data is same as New Profile Data", userId, sessionId) if lastPendingDiffId != "" { SetNotApproved(client, ctx, lastPendingDiffId) } // This is not an error, just a skip reason return nil } else { - LogProfileSkipped(client, ctx, userId, "Last Pending Diff is same as New Profile Data", sessionId) + LogProfileSkipped(client, ctx, "Last Pending Diff is same as New Profile Data", userId, sessionId) // This is not an error, just a skip reason return nil } diff --git a/layer/utils/handler.go b/layer/utils/handler.go index 554fd23..79d9f99 100644 --- a/layer/utils/handler.go +++ b/layer/utils/handler.go @@ -2,6 +2,7 @@ package utils import ( "context" + "time" "cloud.google.com/go/firestore" @@ -17,23 +18,32 @@ type Deps struct { type HandlerFunc func(*Deps, events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) func InitializeLambdaWithFirestore(functionName string, handlerFunc HandlerFunc) { - ctx := context.Background() + initCtx := context.Background() logger := GetLogger() - client, err := InitializeFirestoreClient(ctx) + client, err := InitializeFirestoreClient(initCtx) if err != nil { logger.Error("Failed to initialize Firestore client", err, map[string]interface{}{ "function": functionName, }) + lambda.Start(func(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { + return events.APIGatewayProxyResponse{ + StatusCode: 500, + Body: "Service initialization failed", + }, nil + }) return } - deps := &Deps{ - Client: client, - Ctx: ctx, - } - wrappedHandler := func(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { + reqCtx, cancel := context.WithTimeout(context.Background(), 55*time.Second) + defer cancel() + + deps := &Deps{ + Client: client, + Ctx: reqCtx, + } + return handlerFunc(deps, request) } diff --git a/layer/utils/http.go b/layer/utils/http.go index 9ec3447..58721de 100644 --- a/layer/utils/http.go +++ b/layer/utils/http.go @@ -4,55 +4,98 @@ import ( "context" "io" "net/http" + "sync" "time" ) const DefaultHTTPTimeout = 30 * time.Second +var ( + defaultClient *http.Client + defaultClientOnce sync.Once + + clientCache = make(map[time.Duration]*http.Client) + clientCacheLock sync.RWMutex + + // Shared HTTP client without timeout - relies on context for timeout control + sharedHTTPClient = &http.Client{} +) + +func getDefaultClient() *http.Client { + defaultClientOnce.Do(func() { + defaultClient = &http.Client{ + Timeout: DefaultHTTPTimeout, + } + }) + return defaultClient +} + func CreateHTTPClient(timeout time.Duration) *http.Client { if timeout == 0 { timeout = DefaultHTTPTimeout } - return &http.Client{ + + if timeout == DefaultHTTPTimeout { + return getDefaultClient() + } + + clientCacheLock.RLock() + if client, exists := clientCache[timeout]; exists { + clientCacheLock.RUnlock() + return client + } + clientCacheLock.RUnlock() + + clientCacheLock.Lock() + defer clientCacheLock.Unlock() + + if client, exists := clientCache[timeout]; exists { + return client + } + + client := &http.Client{ Timeout: timeout, } + clientCache[timeout] = client + return client } func DoRequestWithContext(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { - return client.Do(req) + return client.Do(req.WithContext(ctx)) } -func PostWithContext(ctx context.Context, url string, contentType string, body io.Reader, timeout time.Duration) (*http.Response, error) { +func setupRequest(ctx context.Context, method string, url string, body io.Reader, timeout time.Duration) (context.CancelFunc, *http.Request, error) { if timeout == 0 { timeout = DefaultHTTPTimeout } reqCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - client := CreateHTTPClient(timeout) - req, err := http.NewRequestWithContext(reqCtx, "POST", url, body) + req, err := http.NewRequestWithContext(reqCtx, method, url, body) if err != nil { - return nil, err + cancel() + return nil, nil, err } - req.Header.Set("Content-Type", contentType) - return client.Do(req) + return cancel, req, nil } -func GetWithContext(ctx context.Context, url string, timeout time.Duration) (*http.Response, error) { - if timeout == 0 { - timeout = DefaultHTTPTimeout +func PostWithContext(ctx context.Context, url string, contentType string, body io.Reader, timeout time.Duration) (*http.Response, error) { + cancel, req, err := setupRequest(ctx, "POST", url, body, timeout) + if err != nil { + return nil, err } - - reqCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - client := CreateHTTPClient(timeout) - req, err := http.NewRequestWithContext(reqCtx, "GET", url, nil) + req.Header.Set("Content-Type", contentType) + return sharedHTTPClient.Do(req) +} + +func GetWithContext(ctx context.Context, url string, timeout time.Duration) (*http.Response, error) { + cancel, req, err := setupRequest(ctx, "GET", url, nil, timeout) if err != nil { return nil, err } + defer cancel() - return client.Do(req) + return sharedHTTPClient.Do(req) } diff --git a/layer/utils/logger.go b/layer/utils/logger.go index ae11945..f9c0f09 100644 --- a/layer/utils/logger.go +++ b/layer/utils/logger.go @@ -17,8 +17,9 @@ const ( ) type Logger struct { - service string - env string + service string + env string + contextFields map[string]interface{} } type LogEntry struct { @@ -36,18 +37,27 @@ func NewLogger(service string) *Logger { env = "DEVELOPMENT" } return &Logger{ - service: service, - env: env, + service: service, + env: env, + contextFields: make(map[string]interface{}), } } func (l *Logger) log(level LogLevel, message string, fields map[string]interface{}, err error) { + mergedFields := make(map[string]interface{}) + for k, v := range l.contextFields { + mergedFields[k] = v + } + for k, v := range fields { + mergedFields[k] = v + } + entry := LogEntry{ Timestamp: time.Now().UTC().Format(time.RFC3339), Level: string(level), Service: l.service, Message: message, - Fields: fields, + Fields: mergedFields, } if err != nil { @@ -85,8 +95,9 @@ func (l *Logger) Warn(message string, fields ...map[string]interface{}) { l.log(LogLevelWarn, message, mergedFields, nil) } -func (l *Logger) WarnWithError(message string, fields map[string]interface{}, err error) { - l.log(LogLevelWarn, message, fields, err) +func (l *Logger) WarnWithError(message string, err error, fields ...map[string]interface{}) { + mergedFields := mergeFields(fields...) + l.log(LogLevelWarn, message, mergedFields, err) } func (l *Logger) Error(message string, err error, fields ...map[string]interface{}) { @@ -95,7 +106,18 @@ func (l *Logger) Error(message string, err error, fields ...map[string]interface } func (l *Logger) WithFields(fields map[string]interface{}) *Logger { - return l + newLogger := &Logger{ + service: l.service, + env: l.env, + contextFields: make(map[string]interface{}), + } + for k, v := range l.contextFields { + newLogger.contextFields[k] = v + } + for k, v := range fields { + newLogger.contextFields[k] = v + } + return newLogger } func mergeFields(fields ...map[string]interface{}) map[string]interface{} { @@ -130,8 +152,8 @@ func LogWarn(message string, fields ...map[string]interface{}) { defaultLogger.Warn(message, fields...) } -func LogWarnWithError(message string, fields map[string]interface{}, err error) { - defaultLogger.WarnWithError(message, fields, err) +func LogWarnWithError(message string, err error, fields ...map[string]interface{}) { + defaultLogger.WarnWithError(message, err, fields...) } func LogError(message string, err error, fields ...map[string]interface{}) {