Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func NewClient(o *ClientOptions) Client {
}
c.fastReconnectCheckStartTime.Store(time.Now())
c.persist = c.options.Store
c.status = connectionStatus{logger: c.logger}
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor), logger: c.logger}
c.msgRouter = newRouter(c.logger)
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
Expand Down Expand Up @@ -778,7 +779,7 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
c.logger.Error("Connect comms goroutine - error triggered", err.Error(), componentAttr(CLI))
c.logger.Error("Connect comms goroutine - error triggered", slog.String("error", err.Error()), componentAttr(CLI))
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
Expand Down
30 changes: 30 additions & 0 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package mqtt

import (
"errors"
"log/slog"
"sync"
)

Expand Down Expand Up @@ -121,6 +122,8 @@ type connectionStatus struct {
// `connecting`). `actionCompleted` will be set whenever we move into one of the above statues and the channel
// returned to anything else requesting a status change. The channel will be closed when the operation is complete.
actionCompleted chan struct{} // Only valid whilst status is Connecting or Reconnecting; will be closed when connection completed (success or failure)

logger *slog.Logger
}

// ConnectionStatus returns the connection status.
Expand Down Expand Up @@ -153,6 +156,9 @@ func (c *connectionStatus) Connecting() (connCompletedFn, error) {
return nil, errStatusMustBeDisconnected
}
c.status = connecting
if c.logger != nil {
c.logger.Error("Connecting() connection started", slog.String("to", connecting.String()), componentAttr(STA))
}
c.actionCompleted = make(chan struct{})
return c.connected, nil
}
Expand All @@ -173,8 +179,14 @@ func (c *connectionStatus) connected(success bool) error {
}
if success {
c.status = connected
if c.logger != nil {
c.logger.Error("connected() connection successful", slog.String("to", connected.String()), componentAttr(STA))
}
} else {
c.status = disconnected
if c.logger != nil {
c.logger.Error("connected() connection failed", slog.String("to", disconnected.String()), componentAttr(STA))
}
}
return nil
}
Expand All @@ -200,6 +212,9 @@ func (c *connectionStatus) Disconnecting() (disconnectCompletedFn, error) {

prevStatus := c.status
c.status = disconnecting
if c.logger != nil {
c.logger.Error("Disconnecting() disconnection started", slog.String("from", prevStatus.String()), slog.String("to", disconnecting.String()), componentAttr(STA))
}

// We may need to wait for connection/reconnection process to complete (they should regularly check the status)
if prevStatus == connecting || prevStatus == reconnecting {
Expand All @@ -222,6 +237,9 @@ func (c *connectionStatus) disconnectionCompleted() {
c.Lock()
defer c.Unlock()
c.status = disconnected
if c.logger != nil {
c.logger.Error("disconnectionCompleted() disconnection completed", slog.String("to", disconnected.String()), componentAttr(STA))
}
close(c.actionCompleted) // Alert anything waiting on the connection process to complete
c.actionCompleted = nil
}
Expand All @@ -245,6 +263,9 @@ func (c *connectionStatus) ConnectionLost(willReconnect bool) (connectionLostHan
c.willReconnect = willReconnect
prevStatus := c.status
c.status = disconnecting
if c.logger != nil {
c.logger.Error("ConnectionLost() connection lost", slog.String("from", prevStatus.String()), slog.String("to", disconnecting.String()), slog.Bool("willReconnect", willReconnect), componentAttr(STA))
}

// There is a slight possibility that a connection attempt is in progress (connection up and goroutines started but
// status not yet changed). By changing the status we ensure that process will exit cleanly
Expand Down Expand Up @@ -273,6 +294,9 @@ func (c *connectionStatus) getConnectionLostHandler(reconnectRequested bool) con
// `Disconnecting()` may have been called while the disconnection was being processed (this makes it permanent!)
if !c.willReconnect || !proceed {
c.status = disconnected
if c.logger != nil {
c.logger.Error("getConnectionLostHandler() disconnection completed", slog.String("to", disconnected.String()), componentAttr(STA))
}
close(c.actionCompleted) // Alert anything waiting on the connection process to complete
c.actionCompleted = nil
if !reconnectRequested || !proceed {
Expand All @@ -282,6 +306,9 @@ func (c *connectionStatus) getConnectionLostHandler(reconnectRequested bool) con
}

c.status = reconnecting
if c.logger != nil {
c.logger.Error("getConnectionLostHandler() reconnection started", slog.String("to", reconnecting.String()), componentAttr(STA))
}
return c.connected, nil // Note that c.actionCompleted is still live and will be closed in connected
}
}
Expand All @@ -293,4 +320,7 @@ func (c *connectionStatus) forceConnectionStatus(s status) {
c.Lock()
defer c.Unlock()
c.status = s
if c.logger != nil {
c.logger.Error("forceConnectionStatus() status forced", slog.String("to", s.String()), componentAttr(STA))
}
}
10 changes: 5 additions & 5 deletions unit_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Test_AdvancedStatusOperations(t *testing.T) {
t.Parallel()

// Aborted connection (i.e. user triggered)
s := connectionStatus{}
s := connectionStatus{logger: noopSLogger}
if s.ConnectionStatus() != disconnected {
t.Fatalf("Expected disconnected; got: %v", s.ConnectionStatus())
}
Expand All @@ -105,7 +105,7 @@ func Test_AdvancedStatusOperations(t *testing.T) {
}

// Connection lost - no reconnection requested
s = connectionStatus{status: connected}
s = connectionStatus{status: connected, logger: noopSLogger}
rf, err := s.ConnectionLost(false)
if err != nil {
t.Fatalf("Error connecting: %v", err)
Expand All @@ -125,7 +125,7 @@ func Test_AdvancedStatusOperations(t *testing.T) {
}

// Aborted reconnection - stage 1 (i.e. user triggered whist disconnect in progress)
s = connectionStatus{status: connected}
s = connectionStatus{status: connected, logger: noopSLogger}
rf, err = s.ConnectionLost(true)
if err != nil {
t.Fatalf("Error connecting: %v", err)
Expand All @@ -145,7 +145,7 @@ func Test_AdvancedStatusOperations(t *testing.T) {
}

// Aborted reconnection - stage 2 (i.e. user triggered whist disconnect in progress)
s = connectionStatus{status: connected}
s = connectionStatus{status: connected, logger: noopSLogger}
rf, err = s.ConnectionLost(true)
if err != nil {
t.Fatalf("Error connecting: %v", err)
Expand Down Expand Up @@ -173,7 +173,7 @@ func Test_AdvancedStatusOperations(t *testing.T) {

func Test_AbortedConnection(t *testing.T) {
t.Parallel()
s := connectionStatus{}
s := connectionStatus{logger: noopSLogger}
if s.ConnectionStatus() != disconnected {
t.Fatalf("Expected disconnected; got: %v", s.ConnectionStatus())
}
Expand Down