diff --git a/pkg/mcp/pftools/errors.go b/pkg/mcp/pftools/errors.go index 62e23e6..149c83f 100644 --- a/pkg/mcp/pftools/errors.go +++ b/pkg/mcp/pftools/errors.go @@ -16,6 +16,8 @@ package pftools import ( "errors" + "net" + "net/url" "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" @@ -30,16 +32,99 @@ var ( // IsClusterUnhealthy checks if an error indicates cluster health issues func IsClusterUnhealthy(err error) bool { + if err == nil { + return false + } + if restErr, ok := err.(rest.Error); ok { - return restErr.Code == 503 && strings.Contains(restErr.Reason, "no healthy upstream") + if restErr.Code == 503 && strings.Contains(strings.ToLower(restErr.Reason), "no healthy upstream") { + return true + } } - return false + + return IsNetworkError(err) } // IsAuthError reports whether the error is an authorization error. func IsAuthError(err error) bool { + if err == nil { + return false + } + if restErr, ok := err.(rest.Error); ok { + return restErr.Code == 401 || restErr.Code == 403 + } + return isAuthErrorText(err.Error()) +} + +// IsNotFoundError reports whether the error is a not found error. +func IsNotFoundError(err error) bool { + if err == nil { + return false + } if restErr, ok := err.(rest.Error); ok { - return restErr.Code == 403 + return restErr.Code == 404 + } + return isNotFoundText(err.Error()) +} + +// IsNetworkError reports whether the error indicates a network failure. +func IsNetworkError(err error) bool { + if err == nil { + return false + } + var netErr net.Error + if errors.As(err, &netErr) { + return true + } + var urlErr *url.Error + if errors.As(err, &urlErr) { + return true + } + + errStr := strings.ToLower(err.Error()) + networkErrorPatterns := []string{ + "connection reset", + "connection refused", + "broken pipe", + "tls handshake timeout", + "i/o timeout", + "context deadline exceeded", + "timeout", + "eof", + "network is unreachable", + "no route to host", + } + for _, pattern := range networkErrorPatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + return false +} + +func isAuthErrorText(text string) bool { + lowered := strings.ToLower(text) + authErrorPatterns := []string{ + "unauthorized", + "forbidden", + "token expired", + "expired token", + "invalid token", + "401", + "403", + } + for _, pattern := range authErrorPatterns { + if strings.Contains(lowered, pattern) { + return true + } + } + return false +} + +func isNotFoundText(text string) bool { + lowered := strings.ToLower(text) + if strings.Contains(lowered, "404") && strings.Contains(lowered, "not found") { + return true } return false } diff --git a/pkg/mcp/pftools/errors_test.go b/pkg/mcp/pftools/errors_test.go new file mode 100644 index 0000000..cc5cee5 --- /dev/null +++ b/pkg/mcp/pftools/errors_test.go @@ -0,0 +1,63 @@ +package pftools + +import ( + "errors" + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" +) + +func TestIsClusterUnhealthy(t *testing.T) { + t.Run("no healthy upstream", func(t *testing.T) { + err := rest.Error{Code: 503, Reason: "no healthy upstream"} + if !IsClusterUnhealthy(err) { + t.Fatalf("expected cluster unhealthy for 503 no healthy upstream") + } + }) + + t.Run("network error text", func(t *testing.T) { + err := errors.New("read tcp 127.0.0.1:1234->127.0.0.1:443: read: connection reset by peer") + if !IsClusterUnhealthy(err) { + t.Fatalf("expected cluster unhealthy for network error text") + } + }) + + t.Run("non cluster error", func(t *testing.T) { + err := rest.Error{Code: 500, Reason: "internal error"} + if IsClusterUnhealthy(err) { + t.Fatalf("did not expect cluster unhealthy for 500 internal error") + } + }) +} + +func TestIsAuthError(t *testing.T) { + t.Run("rest auth error", func(t *testing.T) { + err := rest.Error{Code: 401, Reason: "unauthorized"} + if !IsAuthError(err) { + t.Fatalf("expected auth error for 401") + } + }) + + t.Run("text auth error", func(t *testing.T) { + err := errors.New("token expired") + if !IsAuthError(err) { + t.Fatalf("expected auth error for token expired") + } + }) +} + +func TestIsNotFoundError(t *testing.T) { + t.Run("rest not found", func(t *testing.T) { + err := rest.Error{Code: 404, Reason: "Not Found"} + if !IsNotFoundError(err) { + t.Fatalf("expected not found for 404") + } + }) + + t.Run("text not found", func(t *testing.T) { + err := errors.New("code: 404 reason: 404 Not Found") + if !IsNotFoundError(err) { + t.Fatalf("expected not found for 404 text") + } + }) +} diff --git a/pkg/mcp/pftools/manager.go b/pkg/mcp/pftools/manager.go index a509c75..e150319 100644 --- a/pkg/mcp/pftools/manager.go +++ b/pkg/mcp/pftools/manager.go @@ -157,7 +157,7 @@ func (m *PulsarFunctionManager) updateFunctions() { log.Printf("Failed to get functions list: %v", err) // Check if this is a cluster health error and invoke callback if configured - if (IsClusterUnhealthy(err) || IsAuthError(err)) && m.clusterErrorHandler != nil { + if (IsClusterUnhealthy(err) || IsAuthError(err) || IsNotFoundError(err)) && m.clusterErrorHandler != nil { go m.clusterErrorHandler(m, err) } return