From 35e27185e2f8ccc29552c917426b93a86823ef02 Mon Sep 17 00:00:00 2001 From: Maluki Muthusi Date: Sun, 17 Aug 2025 20:00:22 +0300 Subject: [PATCH] Adds Redis pub/sub for multi-pod WebSocket support Implements a message broker system with Redis backend to enable WebSocket notifications across multiple Gotify server instances. Introduces a broker interface with Redis and no-op implementations, allowing deployments to scale horizontally while maintaining real-time message delivery. Falls back gracefully to local-only mode when Redis is disabled or unavailable. Includes configuration options for Redis connection and channel prefix customization. --- api/stream/stream.go | 40 ++++++++++ broker/broker.go | 15 ++++ broker/noop.go | 31 ++++++++ broker/redis.go | 155 +++++++++++++++++++++++++++++++++++++++ broker/redis_test.go | 43 +++++++++++ config.example.redis.yml | 48 ++++++++++++ config/config.go | 5 ++ go.mod | 3 + go.sum | 6 ++ router/router.go | 27 ++++++- runner/runner.go | 4 +- ui/package.json | 3 +- 12 files changed, 375 insertions(+), 5 deletions(-) create mode 100644 broker/broker.go create mode 100644 broker/noop.go create mode 100644 broker/redis.go create mode 100644 broker/redis_test.go create mode 100644 config.example.redis.yml diff --git a/api/stream/stream.go b/api/stream/stream.go index 9ad54ee0..8307a98e 100644 --- a/api/stream/stream.go +++ b/api/stream/stream.go @@ -1,6 +1,7 @@ package stream import ( + "log" "net/http" "net/url" "regexp" @@ -11,6 +12,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/gotify/server/v2/auth" + "github.com/gotify/server/v2/broker" "github.com/gotify/server/v2/mode" "github.com/gotify/server/v2/model" ) @@ -22,6 +24,7 @@ type API struct { pingPeriod time.Duration pongTimeout time.Duration upgrader *websocket.Upgrader + broker broker.MessageBroker } // New creates a new instance of API. @@ -34,6 +37,18 @@ func New(pingPeriod, pongTimeout time.Duration, allowedWebSocketOrigins []string pingPeriod: pingPeriod, pongTimeout: pingPeriod + pongTimeout, upgrader: newUpgrader(allowedWebSocketOrigins), + broker: broker.NewNoopBroker(), + } +} + +// NewWithBroker creates a new instance of API with a message broker. +func NewWithBroker(pingPeriod, pongTimeout time.Duration, allowedWebSocketOrigins []string, msgBroker broker.MessageBroker) *API { + return &API{ + clients: make(map[uint][]*client), + pingPeriod: pingPeriod, + pongTimeout: pingPeriod + pongTimeout, + upgrader: newUpgrader(allowedWebSocketOrigins), + broker: msgBroker, } } @@ -81,6 +96,17 @@ func (a *API) NotifyDeletedClient(userID uint, token string) { // Notify notifies the clients with the given userID that a new messages was created. func (a *API) Notify(userID uint, msg *model.MessageExternal) { + // Always notify local clients first for immediate delivery + a.notifyLocal(userID, msg) + + // Also publish to broker for distribution to other pods (if Redis is enabled) + if err := a.broker.PublishMessage(userID, msg); err != nil { + log.Printf("Failed to publish message to broker: %v", err) + } +} + +// notifyLocal notifies only the local WebSocket clients +func (a *API) notifyLocal(userID uint, msg *model.MessageExternal) { a.lock.RLock() defer a.lock.RUnlock() if clients, ok := a.clients[userID]; ok { @@ -153,11 +179,25 @@ func (a *API) Handle(ctx *gin.Context) { go client.startWriteHandler(a.pingPeriod) } +// StartRedisSubscriber starts the Redis message subscriber +func (a *API) StartRedisSubscriber() error { + return a.broker.Subscribe(func(userID uint, msg *model.MessageExternal) { + // This callback is called when a message is received from Redis + // Forward it to local WebSocket clients + a.notifyLocal(userID, msg) + }) +} + // Close closes all client connections and stops answering new connections. func (a *API) Close() { a.lock.Lock() defer a.lock.Unlock() + // Close the broker first + if err := a.broker.Close(); err != nil { + log.Printf("Error closing message broker: %v", err) + } + for _, clients := range a.clients { for _, client := range clients { client.Close() diff --git a/broker/broker.go b/broker/broker.go new file mode 100644 index 00000000..c9994640 --- /dev/null +++ b/broker/broker.go @@ -0,0 +1,15 @@ +package broker + +import ( + "github.com/gotify/server/v2/model" +) + +// MessageBroker defines the interface for distributing messages across multiple instances +type MessageBroker interface { + // PublishMessage publishes a message to all subscribers + PublishMessage(userID uint, message *model.MessageExternal) error + // Subscribe starts listening for messages and calls the callback for each received message + Subscribe(callback func(userID uint, message *model.MessageExternal)) error + // Close closes the broker connection + Close() error +} \ No newline at end of file diff --git a/broker/noop.go b/broker/noop.go new file mode 100644 index 00000000..d5716131 --- /dev/null +++ b/broker/noop.go @@ -0,0 +1,31 @@ +package broker + +import ( + "github.com/gotify/server/v2/model" +) + +// NoopBroker is a no-operation broker that does nothing +// Used when Redis is disabled and we fall back to local-only notifications +type NoopBroker struct{} + +// NewNoopBroker creates a new no-op broker +func NewNoopBroker() *NoopBroker { + return &NoopBroker{} +} + +// PublishMessage does nothing in the no-op broker +func (n *NoopBroker) PublishMessage(userID uint, message *model.MessageExternal) error { + // No-op: messages will be handled locally only + return nil +} + +// Subscribe does nothing in the no-op broker +func (n *NoopBroker) Subscribe(callback func(userID uint, message *model.MessageExternal)) error { + // No-op: no external messages to subscribe to + return nil +} + +// Close does nothing in the no-op broker +func (n *NoopBroker) Close() error { + return nil +} \ No newline at end of file diff --git a/broker/redis.go b/broker/redis.go new file mode 100644 index 00000000..05c8daff --- /dev/null +++ b/broker/redis.go @@ -0,0 +1,155 @@ +package broker + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/gotify/server/v2/model" + "github.com/redis/go-redis/v9" +) + +const ( + defaultChannel = "gotify:messages" + defaultReconnectDelay = 5 * time.Second + defaultPingInterval = 30 * time.Second +) + +// RedisMessage represents the message format sent through Redis +type RedisMessage struct { + UserID uint `json:"user_id"` + Message *model.MessageExternal `json:"message"` +} + +// RedisBroker implements MessageBroker using Redis pub/sub +type RedisBroker struct { + client *redis.Client + pubsub *redis.PubSub + channel string + ctx context.Context + cancel context.CancelFunc + closed bool +} + +// NewRedisBroker creates a new Redis broker instance +func NewRedisBroker(redisURL, channelPrefix string) (*RedisBroker, error) { + opts, err := redis.ParseURL(redisURL) + if err != nil { + return nil, fmt.Errorf("invalid Redis URL: %v", err) + } + + client := redis.NewClient(opts) + + // Test connection + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("failed to connect to Redis: %v", err) + } + + ctx, cancel = context.WithCancel(context.Background()) + + channel := defaultChannel + if channelPrefix != "" { + channel = channelPrefix + ":messages" + } + + return &RedisBroker{ + client: client, + channel: channel, + ctx: ctx, + cancel: cancel, + closed: false, + }, nil +} + +// PublishMessage publishes a message to Redis for distribution to all subscribers +func (r *RedisBroker) PublishMessage(userID uint, message *model.MessageExternal) error { + if r.closed { + return fmt.Errorf("broker is closed") + } + + redisMsg := RedisMessage{ + UserID: userID, + Message: message, + } + + data, err := json.Marshal(redisMsg) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + ctx, cancel := context.WithTimeout(r.ctx, 5*time.Second) + defer cancel() + + return r.client.Publish(ctx, r.channel, data).Err() +} + +// Subscribe starts listening for messages on the Redis channel +func (r *RedisBroker) Subscribe(callback func(userID uint, message *model.MessageExternal)) error { + if r.closed { + return fmt.Errorf("broker is closed") + } + + r.pubsub = r.client.Subscribe(r.ctx, r.channel) + + // Wait for confirmation that subscription is created + _, err := r.pubsub.Receive(r.ctx) + if err != nil { + return fmt.Errorf("failed to subscribe to Redis channel: %v", err) + } + + // Start processing messages in a goroutine + go r.processMessages(callback) + + return nil +} + +// processMessages handles incoming Redis messages +func (r *RedisBroker) processMessages(callback func(userID uint, message *model.MessageExternal)) { + ch := r.pubsub.Channel() + + for { + select { + case <-r.ctx.Done(): + return + case msg := <-ch: + if msg == nil { + continue + } + + var redisMsg RedisMessage + if err := json.Unmarshal([]byte(msg.Payload), &redisMsg); err != nil { + log.Printf("Failed to unmarshal Redis message: %v", err) + continue + } + + // Call the callback with the parsed message + callback(redisMsg.UserID, redisMsg.Message) + } + } +} + +// Close closes the Redis connection and stops the subscriber +func (r *RedisBroker) Close() error { + if r.closed { + return nil + } + + r.closed = true + r.cancel() + + var err error + if r.pubsub != nil { + err = r.pubsub.Close() + } + + if closeErr := r.client.Close(); closeErr != nil && err == nil { + err = closeErr + } + + return err +} \ No newline at end of file diff --git a/broker/redis_test.go b/broker/redis_test.go new file mode 100644 index 00000000..cfdd9d91 --- /dev/null +++ b/broker/redis_test.go @@ -0,0 +1,43 @@ +package broker + +import ( + "testing" + "time" + + "github.com/gotify/server/v2/model" + "github.com/stretchr/testify/assert" +) + +func TestNoopBroker(t *testing.T) { + broker := NewNoopBroker() + defer broker.Close() + + priority := 1 + // Test that operations don't fail + err := broker.PublishMessage(1, &model.MessageExternal{ + ID: 1, + ApplicationID: 1, + Message: "test", + Title: "Test", + Priority: &priority, + Date: time.Now(), + }) + assert.NoError(t, err) + + err = broker.Subscribe(func(userID uint, message *model.MessageExternal) {}) + assert.NoError(t, err) + + err = broker.Close() + assert.NoError(t, err) +} + +func TestRedisBroker_InvalidURL(t *testing.T) { + _, err := NewRedisBroker("invalid-url", "test") + assert.Error(t, err) +} + +func TestRedisBroker_ConnectionFailure(t *testing.T) { + // This will fail to connect since there's no Redis server + _, err := NewRedisBroker("redis://localhost:9999/0", "test") + assert.Error(t, err) +} \ No newline at end of file diff --git a/config.example.redis.yml b/config.example.redis.yml new file mode 100644 index 00000000..43e3d584 --- /dev/null +++ b/config.example.redis.yml @@ -0,0 +1,48 @@ +# Example configuration for Redis multi-pod setup +# Copy this to config.yml and modify as needed + +# Enable Redis for multi-pod WebSocket support +redis: + enabled: true + url: "redis://localhost:6379/0" # Change to your Redis server URL + channelPrefix: "gotify" # Prefix for Redis channels + +# Standard Gotify configuration +server: + keepalivePeriodSeconds: 0 + listenAddr: "" + port: 80 + + ssl: + enabled: false + redirectToHTTPS: true + port: 443 + certFile: "" + certKey: "" + letsEncrypt: + enabled: false + acceptTOS: false + cache: "data/certs" + hosts: [] + + stream: + pingPeriodSeconds: 45 + allowedOrigins: [] + + cors: + allowOrigins: [] + allowMethods: [] + allowHeaders: [] + +database: + dialect: "sqlite3" + connection: "data/gotify.db" + +defaultUser: + name: "admin" + pass: "admin" + +passStrength: 10 +uploadedImagesDir: "data/images" +pluginsDir: "data/plugins" +registration: false \ No newline at end of file diff --git a/config/config.go b/config/config.go index 52489305..3a247404 100644 --- a/config/config.go +++ b/config/config.go @@ -54,6 +54,11 @@ type Configuration struct { UploadedImagesDir string `default:"data/images"` PluginsDir string `default:"data/plugins"` Registration bool `default:"false"` + Redis struct { + Enabled bool `default:"false"` + URL string `default:"redis://localhost:6379/0"` + ChannelPrefix string `default:"gotify"` + } } func configFiles() []string { diff --git a/go.mod b/go.mod index 4f9b5a7a..b819eb58 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,10 @@ require ( github.com/BurntSushi/toml v1.2.0 // indirect github.com/bytedance/sonic v1.13.3 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect github.com/gin-contrib/sse v1.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -42,6 +44,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/redis/go-redis/v9 v9.12.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect golang.org/x/arch v0.18.0 // indirect diff --git a/go.sum b/go.sum index 8ffaa03f..9844a1ce 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/bytedance/sonic v1.13.3/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1 github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= @@ -16,6 +18,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM= github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -98,6 +102,8 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg= +github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= diff --git a/router/router.go b/router/router.go index b7ebfe0c..26883da3 100644 --- a/router/router.go +++ b/router/router.go @@ -2,6 +2,7 @@ package router import ( "fmt" + "log" "net/http" "path/filepath" "regexp" @@ -14,6 +15,7 @@ import ( "github.com/gotify/server/v2/api" "github.com/gotify/server/v2/api/stream" "github.com/gotify/server/v2/auth" + "github.com/gotify/server/v2/broker" "github.com/gotify/server/v2/config" "github.com/gotify/server/v2/database" "github.com/gotify/server/v2/docs" @@ -63,8 +65,29 @@ func Create(db *database.GormDatabase, vInfo *model.VersionInfo, conf *config.Co ctx.Abort() }) } - streamHandler := stream.New( - time.Duration(conf.Server.Stream.PingPeriodSeconds)*time.Second, 15*time.Second, conf.Server.Stream.AllowedOrigins) + var streamHandler *stream.API + if conf.Redis.Enabled { + // Create Redis broker + redisBroker, err := broker.NewRedisBroker(conf.Redis.URL, conf.Redis.ChannelPrefix) + if err != nil { + log.Printf("Failed to create Redis broker: %v, falling back to local-only mode", err) + streamHandler = stream.New( + time.Duration(conf.Server.Stream.PingPeriodSeconds)*time.Second, 15*time.Second, conf.Server.Stream.AllowedOrigins) + } else { + streamHandler = stream.NewWithBroker( + time.Duration(conf.Server.Stream.PingPeriodSeconds)*time.Second, 15*time.Second, conf.Server.Stream.AllowedOrigins, redisBroker) + + // Start Redis subscriber + if err := streamHandler.StartRedisSubscriber(); err != nil { + log.Printf("Failed to start Redis subscriber: %v", err) + } else { + log.Println("Redis pub/sub enabled for multi-pod WebSocket support") + } + } + } else { + streamHandler = stream.New( + time.Duration(conf.Server.Stream.PingPeriodSeconds)*time.Second, 15*time.Second, conf.Server.Stream.AllowedOrigins) + } go func() { ticker := time.NewTicker(5 * time.Minute) for range ticker.C { diff --git a/runner/runner.go b/runner/runner.go index 4ddf1996..7a0b640a 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -88,8 +88,8 @@ func startListening(connectionType, listenAddr string, port, keepAlive int) (net } func getNetworkAndAddr(listenAddr string, port int) (string, string) { - if strings.HasPrefix(listenAddr, "unix:") { - return "unix", strings.TrimPrefix(listenAddr, "unix:") + if after, ok := strings.CutPrefix(listenAddr, "unix:"); ok { + return "unix", after } return "tcp", fmt.Sprintf("%s:%d", listenAddr, port) } diff --git a/ui/package.json b/ui/package.json index fd1b8e36..2e7af6cb 100644 --- a/ui/package.json +++ b/ui/package.json @@ -70,5 +70,6 @@ "last 1 firefox version", "last 1 safari version" ] - } + }, + "packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" }