From 10ccb9a5f20c652cdd26b0f379a458550cea6d14 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 30 Oct 2025 09:59:47 +0700 Subject: [PATCH 1/3] [push] support multiple events per token per batch --- .../sms-gateway/modules/push/fcm/client.go | 18 ++--- internal/sms-gateway/modules/push/metrics.go | 17 ++--- internal/sms-gateway/modules/push/service.go | 65 +++++++++++-------- internal/sms-gateway/modules/push/types.go | 2 +- .../sms-gateway/modules/push/types/types.go | 5 ++ .../modules/push/upstream/client.go | 40 +++++++----- 6 files changed, 81 insertions(+), 66 deletions(-) diff --git a/internal/sms-gateway/modules/push/fcm/client.go b/internal/sms-gateway/modules/push/fcm/client.go index 3f3e294c..44649544 100644 --- a/internal/sms-gateway/modules/push/fcm/client.go +++ b/internal/sms-gateway/modules/push/fcm/client.go @@ -52,25 +52,25 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) { - errs := make(map[string]error, len(messages)) - for address, payload := range messages { - eventMap, err := eventToMap(payload) +func (c *Client) Send(ctx context.Context, messages []types.Message) ([]error, error) { + errs := make([]error, len(messages)) + + for i, message := range messages { + data, err := eventToMap(message.Event) if err != nil { - errs[address] = fmt.Errorf("can't marshal event: %w", err) + errs[i] = fmt.Errorf("can't marshal event: %w", err) continue } _, err = c.client.Send(ctx, &messaging.Message{ - Data: eventMap, + Data: data, Android: &messaging.AndroidConfig{ Priority: "high", }, - Token: address, + Token: message.Token, }) - if err != nil { - errs[address] = fmt.Errorf("can't send message to %s: %w", address, err) + errs[i] = fmt.Errorf("can't send message: %w", err) } } diff --git a/internal/sms-gateway/modules/push/metrics.go b/internal/sms-gateway/modules/push/metrics.go index a4d8d5d7..aae9cdf9 100644 --- a/internal/sms-gateway/modules/push/metrics.go +++ b/internal/sms-gateway/modules/push/metrics.go @@ -5,13 +5,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -type RetryOutcome string - -const ( - RetryOutcomeRetried RetryOutcome = "retried" - RetryOutcomeMaxAttempts RetryOutcome = "max_attempts" -) - type BlacklistOperation string const ( @@ -21,7 +14,7 @@ const ( type metrics struct { enqueuedCounter *prometheus.CounterVec - retriesCounter *prometheus.CounterVec + retriesCounter prometheus.Counter blacklistCounter *prometheus.CounterVec errorsCounter *prometheus.CounterVec } @@ -35,12 +28,12 @@ func newMetrics() *metrics { Help: "Total number of messages enqueued", }, []string{"event"}), - retriesCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + retriesCounter: promauto.NewCounter(prometheus.CounterOpts{ Namespace: "sms", Subsystem: "push", Name: "retries_total", Help: "Total retry attempts", - }, []string{"outcome"}), + }), blacklistCounter: promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "sms", @@ -62,8 +55,8 @@ func (m *metrics) IncEnqueued(event string) { m.enqueuedCounter.WithLabelValues(event).Inc() } -func (m *metrics) IncRetry(outcome RetryOutcome) { - m.retriesCounter.WithLabelValues(string(outcome)).Inc() +func (m *metrics) IncRetry() { + m.retriesCounter.Inc() } func (m *metrics) IncBlacklist(operation BlacklistOperation) { diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index c5fca738..8150c08a 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -130,77 +130,90 @@ func (s *Service) sendAll(ctx context.Context) { return } - wrappers := lo.MapEntries( - rawEvents, - func(key string, value []byte) (string, *eventWrapper) { + wrappers := lo.FilterMap( + lo.Values(rawEvents), + func(value []byte, _ int) (*eventWrapper, bool) { wrapper := new(eventWrapper) if err := wrapper.deserialize(value); err != nil { s.metrics.IncError(1) - s.logger.Error("Failed to deserialize event wrapper", zap.String("key", key), zap.Binary("value", value), zap.Error(err)) - return "", nil + s.logger.Error("Failed to deserialize event wrapper", zap.Binary("value", value), zap.Error(err)) + return nil, false } - return wrapper.Token, wrapper + return wrapper, true }, ) - delete(wrappers, "") - messages := lo.MapValues( + messages := lo.Map( wrappers, - func(value *eventWrapper, key string) Event { - return value.Event + func(wrapper *eventWrapper, _ int) types.Message { + return types.Message{ + Token: wrapper.Token, + Event: wrapper.Event, + } }, ) - s.logger.Info("Sending messages", zap.Int("count", len(messages))) + totalMessages := len(messages) + if totalMessages == 0 { + return + } + + s.logger.Info("sending messages", zap.Int("total", totalMessages)) + sendCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - errs, err := s.client.Send(sendCtx, messages) if len(errs) == 0 && err == nil { - s.logger.Info("Messages sent successfully", zap.Int("count", len(messages))) + s.logger.Info("messages sent successfully", zap.Int("total", totalMessages)) return } if err != nil { - s.metrics.IncError(len(messages)) - s.logger.Error("Can't send messages", zap.Error(err)) + s.metrics.IncError(totalMessages) + s.logger.Error("failed to send messages", zap.Int("total", totalMessages), zap.Error(err)) return } - s.metrics.IncError(len(errs)) + totalErrors := lo.CountBy(errs, func(err error) bool { return err != nil }) + s.metrics.IncError(totalErrors) + + for i, err := range errs { + if err == nil { + continue + } - for token, sendErr := range errs { - s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token)) + wrapper := wrappers[i] + token := wrapper.Token - wrapper := wrappers[token] wrapper.Retries++ if wrapper.Retries >= maxRetries { if err := s.blacklist.Set(ctx, token, []byte{}, cacheImpl.WithTTL(blacklistTimeout)); err != nil { - s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err)) + s.logger.Warn("failed to blacklist", zap.String("token", token), zap.Error(err)) + continue } s.metrics.IncBlacklist(BlacklistOperationAdded) - s.metrics.IncRetry(RetryOutcomeMaxAttempts) - s.logger.Warn("Retries exceeded, blacklisting token", + s.logger.Warn("retries exceeded, blacklisting token", zap.String("token", token), - zap.Duration("ttl", blacklistTimeout)) + zap.Duration("ttl", blacklistTimeout), + ) continue } wrapperData, err := wrapper.serialize() if err != nil { s.metrics.IncError(1) - s.logger.Error("Can't serialize event wrapper", zap.Error(err)) + s.logger.Error("failed to serialize event wrapper", zap.Error(err)) continue } if setErr := s.events.SetOrFail(ctx, wrapper.key(), wrapperData); setErr != nil { - s.logger.Warn("Can't set message to cache", zap.Error(setErr)) + s.logger.Warn("failed to set message to cache", zap.String("key", wrapper.key()), zap.Error(setErr)) continue } - s.metrics.IncRetry(RetryOutcomeRetried) + s.metrics.IncRetry() } } diff --git a/internal/sms-gateway/modules/push/types.go b/internal/sms-gateway/modules/push/types.go index eea6a518..72ef8da0 100644 --- a/internal/sms-gateway/modules/push/types.go +++ b/internal/sms-gateway/modules/push/types.go @@ -18,7 +18,7 @@ type Event = types.Event type client interface { Open(ctx context.Context) error - Send(ctx context.Context, messages map[string]Event) (map[string]error, error) + Send(ctx context.Context, messages []types.Message) ([]error, error) Close(ctx context.Context) error } diff --git a/internal/sms-gateway/modules/push/types/types.go b/internal/sms-gateway/modules/push/types/types.go index cdfb6d61..48312a59 100644 --- a/internal/sms-gateway/modules/push/types/types.go +++ b/internal/sms-gateway/modules/push/types/types.go @@ -4,6 +4,11 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" ) +type Message struct { + Token string + Event Event +} + type Event struct { Type smsgateway.PushEventType `json:"type"` Data map[string]string `json:"data"` diff --git a/internal/sms-gateway/modules/push/upstream/client.go b/internal/sms-gateway/modules/push/upstream/client.go index 8793508b..bb81bd18 100644 --- a/internal/sms-gateway/modules/push/upstream/client.go +++ b/internal/sms-gateway/modules/push/upstream/client.go @@ -11,7 +11,7 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" - "github.com/capcom6/go-helpers/maps" + "github.com/samber/lo" ) const BASE_URL = "https://api.sms-gate.app/upstream/v1" @@ -42,18 +42,19 @@ func (c *Client) Open(ctx context.Context) error { return nil } -func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) { - payload := make(smsgateway.UpstreamPushRequest, 0, len(messages)) +func (c *Client) Send(ctx context.Context, messages []types.Message) ([]error, error) { + payload := lo.Map( + messages, + func(item types.Message, _ int) smsgateway.PushNotification { + return smsgateway.PushNotification{ + Token: item.Token, + Event: item.Event.Type, + Data: item.Event.Data, + } + }, + ) - for address, data := range messages { - payload = append(payload, smsgateway.PushNotification{ - Token: address, - Event: data.Type, - Data: data.Data, - }) - } - - payloadBytes, err := json.Marshal(payload) + payloadBytes, err := json.Marshal(smsgateway.UpstreamPushRequest(payload)) if err != nil { return nil, fmt.Errorf("can't marshal payload: %w", err) @@ -65,11 +66,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map } req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", "android-sms-gateway/1.x (server; golang)") + req.Header.Set("User-Agent", "sms-gate/1.x (server; golang)") resp, err := c.client.Do(req) if err != nil { - return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil + return nil, fmt.Errorf("can't send request: %w", err) } defer func() { @@ -84,10 +85,13 @@ func (c *Client) Send(ctx context.Context, messages map[string]types.Event) (map return nil, nil } -func (c *Client) mapErrors(messages map[string]types.Event, err error) map[string]error { - return maps.MapValues(messages, func(e types.Event) error { - return err - }) +func (c *Client) mapErrors(messages []types.Message, err error) []error { + return lo.Map( + messages, + func(_ types.Message, _ int) error { + return err + }, + ) } func (c *Client) Close(ctx context.Context) error { From 3ee510300590e1313448e27553457ae0623e1419 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 30 Oct 2025 14:39:17 +0700 Subject: [PATCH 2/3] [push] retry on infra error --- internal/sms-gateway/modules/push/service.go | 30 +++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index 8150c08a..207b1b34 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -172,18 +172,34 @@ func (s *Service) sendAll(ctx context.Context) { if err != nil { s.metrics.IncError(totalMessages) s.logger.Error("failed to send messages", zap.Int("total", totalMessages), zap.Error(err)) + s.retry(ctx, wrappers) return } - totalErrors := lo.CountBy(errs, func(err error) bool { return err != nil }) - s.metrics.IncError(totalErrors) + failed := lo.Filter( + wrappers, + func(item *eventWrapper, index int) bool { + if err := errs[index]; err != nil { + s.logger.Error("failed to send message", zap.String("token", item.Token), zap.Error(err)) + return true + } - for i, err := range errs { - if err == nil { - continue - } + return false + }, + ) + + if len(failed) == 0 { + return + } + + s.metrics.IncError(len(failed)) + s.logger.Error("failed to send messages", zap.Int("total", totalMessages), zap.Int("failed", len(failed))) + + s.retry(ctx, failed) +} - wrapper := wrappers[i] +func (s *Service) retry(ctx context.Context, events []*eventWrapper) { + for _, wrapper := range events { token := wrapper.Token wrapper.Retries++ From 117b9ae070c2e987f0f05e2d44f7bdcc72fb6a05 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Sat, 1 Nov 2025 14:42:57 +0700 Subject: [PATCH 3/3] [push] update Grafana Dashboard --- deployments/grafana/dashboards/push.json | 58 +++++++++++++++--------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/deployments/grafana/dashboards/push.json b/deployments/grafana/dashboards/push.json index ae37bbda..d2785a54 100644 --- a/deployments/grafana/dashboards/push.json +++ b/deployments/grafana/dashboards/push.json @@ -30,7 +30,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 2, - "id": 9, + "id": 0, "links": [], "panels": [ { @@ -82,7 +82,7 @@ "textMode": "auto", "wideLayout": true }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { @@ -90,7 +90,7 @@ "uid": "edqp0a73uh2bka" }, "editorMode": "code", - "expr": "1 - (sum(rate(sms_push_retries_total{outcome='max_attempts'}[$__rate_interval])) / (sum(rate(sms_push_enqueued_total[$__rate_interval])) OR vector(0)))", + "expr": "1 - (sum(rate(sms_push_retries_total[$__rate_interval])) / (sum(rate(sms_push_enqueued_total[$__rate_interval])) OR vector(0)))", "range": true, "refId": "A", "unit": "percentunit" @@ -133,6 +133,7 @@ "type": "linear" }, "showPoints": "auto", + "showValues": false, "spanNulls": false, "stacking": { "group": "A", @@ -180,15 +181,17 @@ "sort": "none" } }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { "type": "prometheus", "uid": "edqp0a73uh2bka" }, - "expr": "rate(sms_push_enqueued_total{event=~\"$event_type\"}[$__rate_interval])", + "editorMode": "code", + "expr": "sum by (event) (rate(sms_push_enqueued_total{event=~\"$event_type\"}[$__rate_interval]))", "legendFormat": "{{event}}", + "range": true, "refId": "A" } ], @@ -229,6 +232,7 @@ "type": "linear" }, "showPoints": "auto", + "showValues": false, "spanNulls": false, "stacking": { "group": "A", @@ -276,15 +280,18 @@ "sort": "none" } }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { "type": "prometheus", "uid": "edqp0a73uh2bka" }, + "editorMode": "code", "expr": "sum(rate(sms_push_blacklist_total{operation=~\"added|skipped\"}[$__rate_interval])) by (operation)", "legend": "{{operation}}", + "legendFormat": "__auto", + "range": true, "refId": "A" } ], @@ -322,9 +329,12 @@ "id": 3, "options": { "legend": { - "displayMode": "list", + "displayMode": "table", "placement": "bottom", - "showLegend": true + "showLegend": true, + "values": [ + "percent" + ] }, "pieType": "pie", "reduceOptions": { @@ -334,13 +344,14 @@ "fields": "", "values": false }, + "sort": "desc", "tooltip": { "hideZeros": false, "mode": "single", "sort": "none" } }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { @@ -348,7 +359,10 @@ "uid": "edqp0a73uh2bka" }, "editorMode": "code", + "exemplar": false, "expr": "sum by (event) (sms_push_enqueued_total)", + "instant": false, + "legendFormat": "__auto", "range": true, "refId": "A" } @@ -390,6 +404,7 @@ "type": "linear" }, "showPoints": "auto", + "showValues": false, "spanNulls": false, "stacking": { "group": "A", @@ -429,7 +444,7 @@ "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "hideZeros": false, @@ -437,15 +452,17 @@ "sort": "none" } }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { "type": "prometheus", "uid": "edqp0a73uh2bka" }, - "expr": "rate(sms_push_retries_total[$__rate_interval])", + "editorMode": "code", + "expr": "sum(rate(sms_push_retries_total[$__rate_interval]))", "legendFormat": "{{outcome}}", + "range": true, "refId": "A" } ], @@ -486,6 +503,7 @@ "type": "linear" }, "showPoints": "auto", + "showValues": false, "spanNulls": false, "stacking": { "group": "A", @@ -525,7 +543,7 @@ "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "hideZeros": false, @@ -533,7 +551,7 @@ "sort": "none" } }, - "pluginVersion": "12.2.0-16711121739", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { @@ -541,7 +559,7 @@ "uid": "edqp0a73uh2bka" }, "editorMode": "code", - "expr": "rate(sms_push_errors_total[$__rate_interval])", + "expr": "sum(rate(sms_push_errors_total[$__rate_interval]))", "legendFormat": "{{error}}", "range": true, "refId": "A" @@ -553,7 +571,7 @@ ], "preload": false, "refresh": "auto", - "schemaVersion": 41, + "schemaVersion": 42, "tags": [ "sms", "push" @@ -563,9 +581,7 @@ { "current": { "text": "All", - "value": [ - "$__all" - ] + "value": "$__all" }, "datasource": { "type": "prometheus", @@ -582,7 +598,7 @@ "query": "label_values(sms_push_enqueued_total, event)", "refId": "PrometheusVariableQueryEditor-VariableQuery" }, - "refresh": 1, + "refresh": 2, "regex": "", "type": "query" } @@ -596,5 +612,5 @@ "timezone": "", "title": "SMS Push Throughput", "uid": "17c2b3f3-e85b-4aaf-bba4-8aaa85b46569", - "version": 16 + "version": 22 } \ No newline at end of file