Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 88 additions & 3 deletions pkg/mcp/pftools/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package pftools

import (
"errors"
"net"
"net/url"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
Expand All @@ -30,16 +32,99 @@ var (

// IsClusterUnhealthy checks if an error indicates cluster health issues
func IsClusterUnhealthy(err error) bool {
if err == nil {
return false
}

if restErr, ok := err.(rest.Error); ok {
return restErr.Code == 503 && strings.Contains(restErr.Reason, "no healthy upstream")
if restErr.Code == 503 && strings.Contains(strings.ToLower(restErr.Reason), "no healthy upstream") {
return true
}
}
return false

return IsNetworkError(err)
}

// IsAuthError reports whether the error is an authorization error.
func IsAuthError(err error) bool {
if err == nil {
return false
}
if restErr, ok := err.(rest.Error); ok {
return restErr.Code == 401 || restErr.Code == 403
}
return isAuthErrorText(err.Error())
}

// IsNotFoundError reports whether the error is a not found error.
func IsNotFoundError(err error) bool {
if err == nil {
return false
}
if restErr, ok := err.(rest.Error); ok {
return restErr.Code == 403
return restErr.Code == 404
}
return isNotFoundText(err.Error())
}

// IsNetworkError reports whether the error indicates a network failure.
func IsNetworkError(err error) bool {
if err == nil {
return false
}
var netErr net.Error
if errors.As(err, &netErr) {
return true
}
var urlErr *url.Error
if errors.As(err, &urlErr) {
return true
}

errStr := strings.ToLower(err.Error())
networkErrorPatterns := []string{
"connection reset",
"connection refused",
"broken pipe",
"tls handshake timeout",
"i/o timeout",
"context deadline exceeded",
"timeout",
"eof",
"network is unreachable",
"no route to host",
}
for _, pattern := range networkErrorPatterns {
if strings.Contains(errStr, pattern) {
return true
}
}
return false
}

func isAuthErrorText(text string) bool {
lowered := strings.ToLower(text)
authErrorPatterns := []string{
"unauthorized",
"forbidden",
"token expired",
"expired token",
"invalid token",
"401",
"403",
}
for _, pattern := range authErrorPatterns {
if strings.Contains(lowered, pattern) {
return true
}
}
return false
}

func isNotFoundText(text string) bool {
lowered := strings.ToLower(text)
if strings.Contains(lowered, "404") && strings.Contains(lowered, "not found") {
return true
}
return false
}
63 changes: 63 additions & 0 deletions pkg/mcp/pftools/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package pftools

import (
"errors"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
)

func TestIsClusterUnhealthy(t *testing.T) {
t.Run("no healthy upstream", func(t *testing.T) {
err := rest.Error{Code: 503, Reason: "no healthy upstream"}
if !IsClusterUnhealthy(err) {
t.Fatalf("expected cluster unhealthy for 503 no healthy upstream")
}
})

t.Run("network error text", func(t *testing.T) {
err := errors.New("read tcp 127.0.0.1:1234->127.0.0.1:443: read: connection reset by peer")
if !IsClusterUnhealthy(err) {
t.Fatalf("expected cluster unhealthy for network error text")
}
})

t.Run("non cluster error", func(t *testing.T) {
err := rest.Error{Code: 500, Reason: "internal error"}
if IsClusterUnhealthy(err) {
t.Fatalf("did not expect cluster unhealthy for 500 internal error")
}
})
}

func TestIsAuthError(t *testing.T) {
t.Run("rest auth error", func(t *testing.T) {
err := rest.Error{Code: 401, Reason: "unauthorized"}
if !IsAuthError(err) {
t.Fatalf("expected auth error for 401")
}
})

t.Run("text auth error", func(t *testing.T) {
err := errors.New("token expired")
if !IsAuthError(err) {
t.Fatalf("expected auth error for token expired")
}
})
}

func TestIsNotFoundError(t *testing.T) {
t.Run("rest not found", func(t *testing.T) {
err := rest.Error{Code: 404, Reason: "Not Found"}
if !IsNotFoundError(err) {
t.Fatalf("expected not found for 404")
}
})

t.Run("text not found", func(t *testing.T) {
err := errors.New("code: 404 reason: 404 Not Found")
if !IsNotFoundError(err) {
t.Fatalf("expected not found for 404 text")
}
})
}
2 changes: 1 addition & 1 deletion pkg/mcp/pftools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (m *PulsarFunctionManager) updateFunctions() {
log.Printf("Failed to get functions list: %v", err)

// Check if this is a cluster health error and invoke callback if configured
if (IsClusterUnhealthy(err) || IsAuthError(err)) && m.clusterErrorHandler != nil {
if (IsClusterUnhealthy(err) || IsAuthError(err) || IsNotFoundError(err)) && m.clusterErrorHandler != nil {
go m.clusterErrorHandler(m, err)
}
return
Expand Down