Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions api/stream/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"log"
"net/http"
"net/url"
"regexp"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/gotify/server/v2/auth"
"github.com/gotify/server/v2/broker"
"github.com/gotify/server/v2/mode"
"github.com/gotify/server/v2/model"
)
Expand All @@ -22,6 +24,7 @@ type API struct {
pingPeriod time.Duration
pongTimeout time.Duration
upgrader *websocket.Upgrader
broker broker.MessageBroker
}

// New creates a new instance of API.
Expand All @@ -34,6 +37,18 @@ func New(pingPeriod, pongTimeout time.Duration, allowedWebSocketOrigins []string
pingPeriod: pingPeriod,
pongTimeout: pingPeriod + pongTimeout,
upgrader: newUpgrader(allowedWebSocketOrigins),
broker: broker.NewNoopBroker(),
}
}

// NewWithBroker creates a new instance of API with a message broker.
func NewWithBroker(pingPeriod, pongTimeout time.Duration, allowedWebSocketOrigins []string, msgBroker broker.MessageBroker) *API {
return &API{
clients: make(map[uint][]*client),
pingPeriod: pingPeriod,
pongTimeout: pingPeriod + pongTimeout,
upgrader: newUpgrader(allowedWebSocketOrigins),
broker: msgBroker,
}
}

Expand Down Expand Up @@ -81,6 +96,17 @@ func (a *API) NotifyDeletedClient(userID uint, token string) {

// Notify notifies the clients with the given userID that a new messages was created.
func (a *API) Notify(userID uint, msg *model.MessageExternal) {
// Always notify local clients first for immediate delivery
a.notifyLocal(userID, msg)

// Also publish to broker for distribution to other pods (if Redis is enabled)
if err := a.broker.PublishMessage(userID, msg); err != nil {
log.Printf("Failed to publish message to broker: %v", err)
}
}

// notifyLocal notifies only the local WebSocket clients
func (a *API) notifyLocal(userID uint, msg *model.MessageExternal) {
a.lock.RLock()
defer a.lock.RUnlock()
if clients, ok := a.clients[userID]; ok {
Expand Down Expand Up @@ -153,11 +179,25 @@ func (a *API) Handle(ctx *gin.Context) {
go client.startWriteHandler(a.pingPeriod)
}

// StartRedisSubscriber starts the Redis message subscriber
func (a *API) StartRedisSubscriber() error {
return a.broker.Subscribe(func(userID uint, msg *model.MessageExternal) {
// This callback is called when a message is received from Redis
// Forward it to local WebSocket clients
a.notifyLocal(userID, msg)
})
}

// Close closes all client connections and stops answering new connections.
func (a *API) Close() {
a.lock.Lock()
defer a.lock.Unlock()

// Close the broker first
if err := a.broker.Close(); err != nil {
log.Printf("Error closing message broker: %v", err)
}

for _, clients := range a.clients {
for _, client := range clients {
client.Close()
Expand Down
15 changes: 15 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package broker

import (
"github.com/gotify/server/v2/model"
)

// MessageBroker defines the interface for distributing messages across multiple instances
type MessageBroker interface {
// PublishMessage publishes a message to all subscribers
PublishMessage(userID uint, message *model.MessageExternal) error
// Subscribe starts listening for messages and calls the callback for each received message
Subscribe(callback func(userID uint, message *model.MessageExternal)) error
// Close closes the broker connection
Close() error
}
31 changes: 31 additions & 0 deletions broker/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package broker

import (
"github.com/gotify/server/v2/model"
)

// NoopBroker is a no-operation broker that does nothing
// Used when Redis is disabled and we fall back to local-only notifications
type NoopBroker struct{}

// NewNoopBroker creates a new no-op broker
func NewNoopBroker() *NoopBroker {
return &NoopBroker{}
}

// PublishMessage does nothing in the no-op broker
func (n *NoopBroker) PublishMessage(userID uint, message *model.MessageExternal) error {
// No-op: messages will be handled locally only
return nil
}

// Subscribe does nothing in the no-op broker
func (n *NoopBroker) Subscribe(callback func(userID uint, message *model.MessageExternal)) error {
// No-op: no external messages to subscribe to
return nil
}

// Close does nothing in the no-op broker
func (n *NoopBroker) Close() error {
return nil
}
155 changes: 155 additions & 0 deletions broker/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package broker

import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/gotify/server/v2/model"
"github.com/redis/go-redis/v9"
)

const (
defaultChannel = "gotify:messages"
defaultReconnectDelay = 5 * time.Second
defaultPingInterval = 30 * time.Second
)

// RedisMessage represents the message format sent through Redis
type RedisMessage struct {
UserID uint `json:"user_id"`
Message *model.MessageExternal `json:"message"`
}

// RedisBroker implements MessageBroker using Redis pub/sub
type RedisBroker struct {
client *redis.Client
pubsub *redis.PubSub
channel string
ctx context.Context
cancel context.CancelFunc
closed bool
}

// NewRedisBroker creates a new Redis broker instance
func NewRedisBroker(redisURL, channelPrefix string) (*RedisBroker, error) {
opts, err := redis.ParseURL(redisURL)
if err != nil {
return nil, fmt.Errorf("invalid Redis URL: %v", err)
}

client := redis.NewClient(opts)

// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %v", err)
}

ctx, cancel = context.WithCancel(context.Background())

channel := defaultChannel
if channelPrefix != "" {
channel = channelPrefix + ":messages"
}

return &RedisBroker{
client: client,
channel: channel,
ctx: ctx,
cancel: cancel,
closed: false,
}, nil
}

// PublishMessage publishes a message to Redis for distribution to all subscribers
func (r *RedisBroker) PublishMessage(userID uint, message *model.MessageExternal) error {
if r.closed {
return fmt.Errorf("broker is closed")
}

redisMsg := RedisMessage{
UserID: userID,
Message: message,
}

data, err := json.Marshal(redisMsg)
if err != nil {
return fmt.Errorf("failed to marshal message: %v", err)
}

ctx, cancel := context.WithTimeout(r.ctx, 5*time.Second)
defer cancel()

return r.client.Publish(ctx, r.channel, data).Err()
}

// Subscribe starts listening for messages on the Redis channel
func (r *RedisBroker) Subscribe(callback func(userID uint, message *model.MessageExternal)) error {
if r.closed {
return fmt.Errorf("broker is closed")
}

r.pubsub = r.client.Subscribe(r.ctx, r.channel)

// Wait for confirmation that subscription is created
_, err := r.pubsub.Receive(r.ctx)
if err != nil {
return fmt.Errorf("failed to subscribe to Redis channel: %v", err)
}

// Start processing messages in a goroutine
go r.processMessages(callback)

return nil
}

// processMessages handles incoming Redis messages
func (r *RedisBroker) processMessages(callback func(userID uint, message *model.MessageExternal)) {
ch := r.pubsub.Channel()

for {
select {
case <-r.ctx.Done():
return
case msg := <-ch:
if msg == nil {
continue
}

var redisMsg RedisMessage
if err := json.Unmarshal([]byte(msg.Payload), &redisMsg); err != nil {
log.Printf("Failed to unmarshal Redis message: %v", err)
continue
}

// Call the callback with the parsed message
callback(redisMsg.UserID, redisMsg.Message)
}
}
}

// Close closes the Redis connection and stops the subscriber
func (r *RedisBroker) Close() error {
if r.closed {
return nil
}

r.closed = true
r.cancel()

var err error
if r.pubsub != nil {
err = r.pubsub.Close()
}

if closeErr := r.client.Close(); closeErr != nil && err == nil {
err = closeErr
}

return err
}
43 changes: 43 additions & 0 deletions broker/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package broker

import (
"testing"
"time"

"github.com/gotify/server/v2/model"
"github.com/stretchr/testify/assert"
)

func TestNoopBroker(t *testing.T) {
broker := NewNoopBroker()
defer broker.Close()

priority := 1
// Test that operations don't fail
err := broker.PublishMessage(1, &model.MessageExternal{
ID: 1,
ApplicationID: 1,
Message: "test",
Title: "Test",
Priority: &priority,
Date: time.Now(),
})
assert.NoError(t, err)

err = broker.Subscribe(func(userID uint, message *model.MessageExternal) {})
assert.NoError(t, err)

err = broker.Close()
assert.NoError(t, err)
}

func TestRedisBroker_InvalidURL(t *testing.T) {
_, err := NewRedisBroker("invalid-url", "test")
assert.Error(t, err)
}

func TestRedisBroker_ConnectionFailure(t *testing.T) {
// This will fail to connect since there's no Redis server
_, err := NewRedisBroker("redis://localhost:9999/0", "test")
assert.Error(t, err)
}
48 changes: 48 additions & 0 deletions config.example.redis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Example configuration for Redis multi-pod setup
# Copy this to config.yml and modify as needed

# Enable Redis for multi-pod WebSocket support
redis:
enabled: true
url: "redis://localhost:6379/0" # Change to your Redis server URL
channelPrefix: "gotify" # Prefix for Redis channels

# Standard Gotify configuration
server:
keepalivePeriodSeconds: 0
listenAddr: ""
port: 80

ssl:
enabled: false
redirectToHTTPS: true
port: 443
certFile: ""
certKey: ""
letsEncrypt:
enabled: false
acceptTOS: false
cache: "data/certs"
hosts: []

stream:
pingPeriodSeconds: 45
allowedOrigins: []

cors:
allowOrigins: []
allowMethods: []
allowHeaders: []

database:
dialect: "sqlite3"
connection: "data/gotify.db"

defaultUser:
name: "admin"
pass: "admin"

passStrength: 10
uploadedImagesDir: "data/images"
pluginsDir: "data/plugins"
registration: false
Loading