From 1844ec7d2cf05f0bacf2f633d15dd24a4b7fe629 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 12 Jan 2026 12:42:56 +0800 Subject: [PATCH] chore: upgrade golangci-lint to v2 and modernize codebase --- .github/workflows/lint.yaml | 4 +- .golangci.yml | 19 ++-- cmd/streamnative-mcp-server/main.go | 1 + go.mod | 2 +- go.sum | 3 +- pkg/auth/auth.go | 11 ++- pkg/auth/authorization_tokenretriever.go | 7 +- pkg/auth/cache/cache.go | 2 + pkg/auth/client_credentials_flow.go | 5 ++ pkg/auth/client_credentials_provider.go | 17 +++- pkg/auth/oidc_endpoint_provider.go | 2 +- pkg/auth/store/keyring.go | 7 ++ pkg/cmd/mcp/mcp.go | 8 +- pkg/cmd/mcp/server.go | 4 +- pkg/cmd/mcp/sse.go | 26 +++--- pkg/cmd/mcp/stdio.go | 4 +- pkg/common/utils.go | 18 +++- pkg/config/apiclient.go | 1 + pkg/config/auth.go | 8 +- pkg/config/config.go | 12 ++- pkg/config/external_kafka.go | 1 + pkg/config/external_pulsar.go | 1 + pkg/config/options.go | 99 ++++++++++++--------- pkg/kafka/connection.go | 10 ++- pkg/kafka/kafkaconnect.go | 14 +-- pkg/log/io.go | 1 + pkg/mcp/auth_utils.go | 2 + pkg/mcp/builders/base.go | 1 + pkg/mcp/builders/kafka/connect.go | 1 + pkg/mcp/builders/pulsar/brokers.go | 1 + pkg/mcp/builders/pulsar/consume.go | 7 +- pkg/mcp/builders/pulsar/schema.go | 3 +- pkg/mcp/builders/registry_test.go | 54 ++++------- pkg/mcp/features.go | 2 + pkg/mcp/instructions.go | 3 + pkg/mcp/internal/context/ctx.go | 4 +- pkg/mcp/kafka_admin_connect_tools.go | 1 + pkg/mcp/kafka_admin_groups_tools.go | 1 + pkg/mcp/kafka_admin_partitions_tools.go | 1 + pkg/mcp/kafka_admin_sr_tools.go | 1 + pkg/mcp/kafka_admin_topics_tools.go | 1 + pkg/mcp/pftools/circuit_breaker.go | 1 + pkg/mcp/pftools/errors.go | 5 +- pkg/mcp/pftools/manager.go | 7 +- pkg/mcp/pftools/schema.go | 2 + pkg/mcp/pftools/types.go | 8 ++ pkg/mcp/prompts.go | 11 ++- pkg/mcp/pulsar_admin_packages_tools.go | 19 ++-- pkg/mcp/pulsar_admin_tenant_tools.go | 1 + pkg/mcp/pulsar_functions_as_tools.go | 2 + pkg/mcp/server.go | 4 + pkg/mcp/session/context.go | 1 + pkg/mcp/sncontext_tools.go | 1 + pkg/mcp/sncontext_utils.go | 20 +++-- pkg/mcp/streamnative_resources_log_tools.go | 6 +- pkg/mcp/streamnative_resources_tools.go | 15 ++-- pkg/pulsar/connection.go | 10 ++- pkg/schema/avro.go | 6 ++ pkg/schema/avro_core.go | 2 +- pkg/schema/avro_test.go | 5 +- pkg/schema/boolean.go | 3 + pkg/schema/common.go | 2 +- pkg/schema/converter.go | 4 + pkg/schema/number.go | 3 + pkg/schema/string.go | 3 + 65 files changed, 335 insertions(+), 176 deletions(-) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index a37813e..7b2451f 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -24,7 +24,7 @@ jobs: go mod verify go mod download - LINT_VERSION=1.64.8 + LINT_VERSION=2.7.2 curl -fsSL https://github.com/golangci/golangci-lint/releases/download/v${LINT_VERSION}/golangci-lint-${LINT_VERSION}-linux-amd64.tar.gz | \ tar xz --strip-components 1 --wildcards \*/golangci-lint mkdir -p bin && mv golangci-lint bin/ @@ -45,6 +45,6 @@ jobs: assert-nothing-changed go fmt ./... assert-nothing-changed go mod tidy - bin/golangci-lint run --out-format=colored-line-number --timeout=3m || STATUS=$? + bin/golangci-lint run --timeout=3m || STATUS=$? exit $STATUS diff --git a/.golangci.yml b/.golangci.yml index 43e3d62..54324e5 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,3 +1,5 @@ +version: "2" + run: timeout: 5m tests: true @@ -8,13 +10,9 @@ linters: - govet - errcheck - staticcheck - - gofmt - - goimports - revive - ineffassign - - typecheck - unused - - gosimple - misspell - nakedret - bodyclose @@ -22,7 +20,14 @@ linters: - makezero - gosec +formatters: + enable: + - gofmt + - goimports + output: - formats: colored-line-number - print-issued-lines: true - print-linter-name: true + formats: + text: + path: stdout + print-issued-lines: true + print-linter-name: true diff --git a/cmd/streamnative-mcp-server/main.go b/cmd/streamnative-mcp-server/main.go index 42253af..4686310 100644 --- a/cmd/streamnative-mcp-server/main.go +++ b/cmd/streamnative-mcp-server/main.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package main is the entry point for the StreamNative MCP server. package main import ( diff --git a/go.mod b/go.mod index 606d990..3d63a7b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.4 require ( github.com/99designs/keyring v1.2.2 github.com/apache/pulsar-client-go v0.13.1 - github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/golang-jwt/jwt v3.2.1+incompatible github.com/google/go-cmp v0.7.0 github.com/hamba/avro/v2 v2.28.0 github.com/mark3labs/mcp-go v0.43.2 diff --git a/go.sum b/go.sum index 8426de5..1156150 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -85,6 +83,7 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go index 0938d5a..1bc1b14 100644 --- a/pkg/auth/auth.go +++ b/pkg/auth/auth.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package auth provides authentication and authorization functionality for StreamNative MCP Server. +// It implements OAuth 2.0 flows including client credentials and device authorization grants. package auth import ( @@ -20,12 +22,13 @@ import ( "io" "time" - "github.com/dgrijalva/jwt-go" + "github.com/golang-jwt/jwt" "golang.org/x/oauth2" "k8s.io/utils/clock" ) const ( + // ClaimNameUserName is the JWT claim name for the username. ClaimNameUserName = "https://streamnative.io/username" ) @@ -42,6 +45,7 @@ type AuthorizationGrantRefresher interface { Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) } +// AuthorizationGrantType defines the supported OAuth2 grant types. type AuthorizationGrantType string const ( @@ -139,17 +143,18 @@ func ExtractUserName(token oauth2.Token) (string, error) { return "", fmt.Errorf("access token doesn't contain a recognizable user claim") } +// DumpToken outputs token information to the provided writer for debugging. func DumpToken(out io.Writer, token oauth2.Token) { p := jwt.Parser{} claims := jwt.MapClaims{} if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil { - fmt.Fprintf(out, "Unable to parse token. Err: %v\n", err) + _, _ = fmt.Fprintf(out, "Unable to parse token. Err: %v\n", err) return } text, err := json.MarshalIndent(claims, "", " ") if err != nil { - fmt.Fprintf(out, "Unable to print token. Err: %v\n", err) + _, _ = fmt.Fprintf(out, "Unable to print token. Err: %v\n", err) } _, _ = out.Write(text) _, _ = fmt.Fprintln(out, "") diff --git a/pkg/auth/authorization_tokenretriever.go b/pkg/auth/authorization_tokenretriever.go index 99c2353..730e9ff 100644 --- a/pkg/auth/authorization_tokenretriever.go +++ b/pkg/auth/authorization_tokenretriever.go @@ -82,6 +82,7 @@ type TokenErrorResponse struct { ErrorDescription string `json:"error_description"` } +// TokenError represents an error response from the token endpoint. type TokenError struct { ErrorCode string ErrorDescription string @@ -222,6 +223,7 @@ func (ce *TokenRetriever) ExchangeCode(req AuthorizationCodeExchangeRequest) (*T if err != nil { return nil, err } + defer func() { _ = response.Body.Close() }() return ce.handleAuthTokensResponse(response) } @@ -230,7 +232,7 @@ func (ce *TokenRetriever) ExchangeCode(req AuthorizationCodeExchangeRequest) (*T // auth tokens for errors and parsing the raw body to a TokenResult struct func (ce *TokenRetriever) handleAuthTokensResponse(resp *http.Response) (*TokenResult, error) { if resp.Body != nil { - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() } if resp.StatusCode < 200 || resp.StatusCode > 299 { @@ -272,6 +274,7 @@ func (ce *TokenRetriever) ExchangeDeviceCode(ctx context.Context, req DeviceCode if err != nil { return nil, err } + defer func() { _ = response.Body.Close() }() token, err := ce.handleAuthTokensResponse(response) if err == nil { return token, nil @@ -314,6 +317,7 @@ func (ce *TokenRetriever) ExchangeRefreshToken(req RefreshTokenExchangeRequest) if err != nil { return nil, err } + defer func() { _ = response.Body.Close() }() return ce.handleAuthTokensResponse(response) } @@ -330,6 +334,7 @@ func (ce *TokenRetriever) ExchangeClientCredentials(req ClientCredentialsExchang if err != nil { return nil, err } + defer func() { _ = response.Body.Close() }() return ce.handleAuthTokensResponse(response) } diff --git a/pkg/auth/cache/cache.go b/pkg/auth/cache/cache.go index 70bc133..6e56dda 100644 --- a/pkg/auth/cache/cache.go +++ b/pkg/auth/cache/cache.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package cache provides cached token sources for authentication flows. package cache import ( @@ -51,6 +52,7 @@ type tokenCache struct { token *oauth2.Token } +// NewDefaultTokenCache creates a default token cache with the given store and refresher. func NewDefaultTokenCache(store store.Store, audience string, refresher auth.AuthorizationGrantRefresher) (CachingTokenSource, error) { cache := &tokenCache{ diff --git a/pkg/auth/client_credentials_flow.go b/pkg/auth/client_credentials_flow.go index 7de8bd4..39704e8 100644 --- a/pkg/auth/client_credentials_flow.go +++ b/pkg/auth/client_credentials_flow.go @@ -41,6 +41,7 @@ type ClientCredentialsExchanger interface { ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error) } +// NewClientCredentialsFlow creates a new client credentials flow with the given components. func NewClientCredentialsFlow( issuerData Issuer, provider ClientCredentialsProvider, @@ -98,6 +99,7 @@ func NewDefaultClientCredentialsFlowWithKeyFileStruct(issuerData Issuer, keyFile var _ Flow = &ClientCredentialsFlow{} +// Authorize requests an authorization grant using the client credentials flow. func (c *ClientCredentialsFlow) Authorize() (*AuthorizationGrant, error) { keyFile, err := c.provider.GetClientCredentials() if err != nil { @@ -121,12 +123,14 @@ func (c *ClientCredentialsFlow) Authorize() (*AuthorizationGrant, error) { return grant, nil } +// ClientCredentialsGrantRefresher refreshes client-credentials grants using the token endpoint. type ClientCredentialsGrantRefresher struct { issuerData Issuer exchanger ClientCredentialsExchanger clock clock.Clock } +// NewDefaultClientCredentialsGrantRefresher creates a default client credentials grant refresher. func NewDefaultClientCredentialsGrantRefresher(issuerData Issuer, clock clock.Clock) (*ClientCredentialsGrantRefresher, error) { wellKnownEndpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(issuerData.IssuerEndpoint) @@ -147,6 +151,7 @@ func NewDefaultClientCredentialsGrantRefresher(issuerData Issuer, var _ AuthorizationGrantRefresher = &ClientCredentialsGrantRefresher{} +// Refresh exchanges the client credentials for a fresh authorization grant. func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) { if grant.Type != GrantTypeClientCredentials { return nil, errors.New("unsupported grant type") diff --git a/pkg/auth/client_credentials_provider.go b/pkg/auth/client_credentials_provider.go index 2ab652d..f562e48 100644 --- a/pkg/auth/client_credentials_provider.go +++ b/pkg/auth/client_credentials_provider.go @@ -18,19 +18,25 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strings" ) const ( + // KeyFileTypeServiceAccount identifies service account key files. KeyFileTypeServiceAccount = "sn_service_account" - FILE = "file://" - DATA = "data://" + // FILE indicates a file:// key file reference. + FILE = "file://" + // DATA indicates a data:// inline key file reference. + DATA = "data://" ) +// KeyFileProvider provides client credentials from a key file path. type KeyFileProvider struct { KeyFile string } +// KeyFile holds service account credentials from a JSON key file. type KeyFile struct { Type string `json:"type"` ClientID string `json:"client_id"` @@ -38,6 +44,7 @@ type KeyFile struct { ClientEmail string `json:"client_email"` } +// NewClientCredentialsProviderFromKeyFile creates a provider from a key file path. func NewClientCredentialsProviderFromKeyFile(keyFile string) *KeyFileProvider { return &KeyFileProvider{ KeyFile: keyFile, @@ -46,13 +53,14 @@ func NewClientCredentialsProviderFromKeyFile(keyFile string) *KeyFileProvider { var _ ClientCredentialsProvider = &KeyFileProvider{} +// GetClientCredentials loads client credentials from the configured key file source. func (k *KeyFileProvider) GetClientCredentials() (*KeyFile, error) { var keyFile []byte var err error switch { case strings.HasPrefix(k.KeyFile, FILE): filename := strings.TrimPrefix(k.KeyFile, FILE) - keyFile, err = os.ReadFile(filename) + keyFile, err = os.ReadFile(filepath.Clean(filename)) case strings.HasPrefix(k.KeyFile, DATA): keyFile = []byte(strings.TrimPrefix(k.KeyFile, DATA)) case strings.HasPrefix(k.KeyFile, "data:"): @@ -80,10 +88,12 @@ func (k *KeyFileProvider) GetClientCredentials() (*KeyFile, error) { return &v, nil } +// KeyFileStructProvider provides client credentials from an in-memory KeyFile struct. type KeyFileStructProvider struct { KeyFile *KeyFile } +// GetClientCredentials returns the client credentials from the in-memory KeyFile. func (k *KeyFileStructProvider) GetClientCredentials() (*KeyFile, error) { if k.KeyFile == nil { return nil, fmt.Errorf("key file is nil") @@ -91,6 +101,7 @@ func (k *KeyFileStructProvider) GetClientCredentials() (*KeyFile, error) { return k.KeyFile, nil } +// NewClientCredentialsProviderFromKeyFileStruct creates a provider from an in-memory KeyFile. func NewClientCredentialsProviderFromKeyFileStruct(keyFile *KeyFile) *KeyFileStructProvider { return &KeyFileStructProvider{ KeyFile: keyFile, diff --git a/pkg/auth/oidc_endpoint_provider.go b/pkg/auth/oidc_endpoint_provider.go index 69c9ef1..62ecf1d 100644 --- a/pkg/auth/oidc_endpoint_provider.go +++ b/pkg/auth/oidc_endpoint_provider.go @@ -43,7 +43,7 @@ func GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL string) (*OIDCWellKnownEnd if err != nil { return nil, errors.Wrapf(err, "could not get well known endpoints from url %s", u.String()) } - defer r.Body.Close() + defer func() { _ = r.Body.Close() }() var wkEndpoints OIDCWellKnownEndpoints err = json.NewDecoder(r.Body).Decode(&wkEndpoints) diff --git a/pkg/auth/store/keyring.go b/pkg/auth/store/keyring.go index ae09e2d..f2c47f2 100644 --- a/pkg/auth/store/keyring.go +++ b/pkg/auth/store/keyring.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package store provides token storage implementations for authentication credentials. +// It includes a KeyringStore implementation that uses the system keyring for secure storage. package store import ( @@ -25,6 +27,7 @@ import ( "k8s.io/utils/clock" ) +// KeyringStore provides secure token storage using the system keyring. type KeyringStore struct { kr keyring.Keyring clock clock.Clock @@ -48,6 +51,7 @@ func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) { var _ Store = &KeyringStore{} +// SaveGrant saves an authorization grant to the keyring. func (f *KeyringStore) SaveGrant(audience string, grant auth.AuthorizationGrant) error { f.lock.Lock() defer f.lock.Unlock() @@ -75,6 +79,7 @@ func (f *KeyringStore) SaveGrant(audience string, grant auth.AuthorizationGrant) return nil } +// LoadGrant loads an authorization grant from the keyring. func (f *KeyringStore) LoadGrant(audience string) (*auth.AuthorizationGrant, error) { f.lock.Lock() defer f.lock.Unlock() @@ -97,6 +102,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*auth.AuthorizationGrant, err return &item.Grant, nil } +// WhoAmI returns the username associated with the grant for the given audience. func (f *KeyringStore) WhoAmI(audience string) (string, error) { f.lock.Lock() defer f.lock.Unlock() @@ -132,6 +138,7 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) { return label, err } +// Logout removes all stored grants from the keyring. func (f *KeyringStore) Logout() error { f.lock.Lock() defer f.lock.Unlock() diff --git a/pkg/cmd/mcp/mcp.go b/pkg/cmd/mcp/mcp.go index 0080f7c..f457cb0 100644 --- a/pkg/cmd/mcp/mcp.go +++ b/pkg/cmd/mcp/mcp.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package mcp provides CLI commands for the MCP server. package mcp import ( @@ -38,18 +39,20 @@ type ServerOptions struct { *config.Options } +// NewMcpServerOptions creates ServerOptions with the provided config. func NewMcpServerOptions(configOpts *config.Options) *ServerOptions { return &ServerOptions{ Options: configOpts, } } +// Complete finalizes server options after loading configuration. func (o *ServerOptions) Complete() error { if err := o.Options.Complete(); err != nil { return err } - snConfig := o.Options.LoadConfigOrDie() + snConfig := o.LoadConfigOrDie() // If the key file is provided, use it to authenticate to StreamNative Cloud switch { @@ -68,7 +71,7 @@ func (o *ServerOptions) Complete() error { } // persist the authorization data - if err = o.Options.SaveGrant(issuer.Audience, *grant); err != nil { + if err = o.SaveGrant(issuer.Audience, *grant); err != nil { return errors.Wrap(err, "Unable to store the authorization data") } @@ -107,6 +110,7 @@ func (o *ServerOptions) Complete() error { return nil } +// AddFlags registers command flags for the server options. func (o *ServerOptions) AddFlags(cmd *cobra.Command) { cmd.PersistentFlags().BoolVarP(&o.ReadOnly, "read-only", "r", false, "Read-only mode") cmd.PersistentFlags().StringVar(&o.LogFile, "log-file", "", "Path to log file") diff --git a/pkg/cmd/mcp/server.go b/pkg/cmd/mcp/server.go index 5de74c8..1e37132 100644 --- a/pkg/cmd/mcp/server.go +++ b/pkg/cmd/mcp/server.go @@ -29,14 +29,14 @@ import ( ) func newMcpServer(_ context.Context, configOpts *ServerOptions, logrusLogger *logrus.Logger) (*mcp.Server, error) { - snConfig := configOpts.Options.LoadConfigOrDie() + snConfig := configOpts.LoadConfigOrDie() var s *server.MCPServer var mcpServer *mcp.Server switch { case snConfig.KeyFile != "": { issuer := snConfig.Auth.Issuer() - userName, err := configOpts.Options.WhoAmI(issuer.Audience) + userName, err := configOpts.WhoAmI(issuer.Audience) if err != nil { stdlog.Fatalf("failed to get user name: %v", err) os.Exit(1) diff --git a/pkg/cmd/mcp/sse.go b/pkg/cmd/mcp/sse.go index 2620f3a..769f27b 100644 --- a/pkg/cmd/mcp/sse.go +++ b/pkg/cmd/mcp/sse.go @@ -29,12 +29,12 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/streamnative/streamnative-mcp-server/pkg/common" - "github.com/streamnative/streamnative-mcp-server/pkg/mcp" - context2 "github.com/streamnative/streamnative-mcp-server/pkg/mcp" + mcpctx "github.com/streamnative/streamnative-mcp-server/pkg/mcp" "github.com/streamnative/streamnative-mcp-server/pkg/mcp/session" "github.com/streamnative/streamnative-mcp-server/pkg/pulsar" ) +// NewCmdMcpSseServer builds the SSE server command. func NewCmdMcpSseServer(configOpts *ServerOptions) *cobra.Command { sseCmd := &cobra.Command{ Use: "sse", @@ -76,12 +76,12 @@ func runSseServer(configOpts *ServerOptions) error { } // 4. Set the context - ctx = context2.WithSNCloudSession(ctx, mcpServer.SNCloudSession) - ctx = context2.WithPulsarSession(ctx, mcpServer.PulsarSession) - ctx = context2.WithKafkaSession(ctx, mcpServer.KafkaSession) - if configOpts.Options.KeyFile != "" { - if configOpts.Options.PulsarInstance != "" && configOpts.Options.PulsarCluster != "" { - err = mcp.SetContext(ctx, configOpts.Options, configOpts.Options.PulsarInstance, configOpts.Options.PulsarCluster) + ctx = mcpctx.WithSNCloudSession(ctx, mcpServer.SNCloudSession) + ctx = mcpctx.WithPulsarSession(ctx, mcpServer.PulsarSession) + ctx = mcpctx.WithKafkaSession(ctx, mcpServer.KafkaSession) + if configOpts.KeyFile != "" { + if configOpts.PulsarInstance != "" && configOpts.PulsarCluster != "" { + err = mcpctx.SetContext(ctx, configOpts.Options, configOpts.PulsarInstance, configOpts.PulsarCluster) if err != nil { return errors.Wrap(err, "failed to set StreamNative Cloud context") } @@ -94,7 +94,7 @@ func runSseServer(configOpts *ServerOptions) error { // Create Pulsar session manager for multi-session support (only for external Pulsar mode) var pulsarSessionManager *session.PulsarSessionManager - snConfig := configOpts.Options.LoadConfigOrDie() + snConfig := configOpts.LoadConfigOrDie() if snConfig.ExternalPulsar != nil && configOpts.MultiSessionPulsar { managerConfig := &session.PulsarSessionManagerConfig{ MaxSessions: configOpts.SessionCacheSize, @@ -120,15 +120,15 @@ func runSseServer(configOpts *ServerOptions) error { server.WithStaticBasePath(configOpts.HTTPPath), server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context { c := context.WithValue(ctx, common.OptionsKey, configOpts.Options) - c = context2.WithKafkaSession(c, mcpServer.KafkaSession) - c = context2.WithSNCloudSession(c, mcpServer.SNCloudSession) + c = mcpctx.WithKafkaSession(c, mcpServer.KafkaSession) + c = mcpctx.WithSNCloudSession(c, mcpServer.SNCloudSession) // Handle per-user Pulsar sessions if pulsarSessionManager != nil { token := session.ExtractBearerToken(r) // Token is already validated in auth middleware, this should always succeed if pulsarSession, err := pulsarSessionManager.GetOrCreateSession(ctx, token); err == nil { - c = context2.WithPulsarSession(c, pulsarSession) + c = mcpctx.WithPulsarSession(c, pulsarSession) if token != "" { c = session.WithUserTokenHash(c, pulsarSessionManager.HashTokenForLog(token)) } @@ -138,7 +138,7 @@ func runSseServer(configOpts *ServerOptions) error { // Don't set PulsarSession - tool handlers will fail gracefully with "session not found" } } else { - c = context2.WithPulsarSession(c, mcpServer.PulsarSession) + c = mcpctx.WithPulsarSession(c, mcpServer.PulsarSession) } return c diff --git a/pkg/cmd/mcp/stdio.go b/pkg/cmd/mcp/stdio.go index b06ecb6..e13232e 100644 --- a/pkg/cmd/mcp/stdio.go +++ b/pkg/cmd/mcp/stdio.go @@ -20,6 +20,7 @@ import ( "io" "os" "os/signal" + "path/filepath" "syscall" stdlog "log" @@ -31,6 +32,7 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/log" ) +// NewCmdMcpStdioServer builds the stdio server command. func NewCmdMcpStdioServer(configOpts *ServerOptions) *cobra.Command { stdioCmd := &cobra.Command{ Use: "stdio", @@ -110,7 +112,7 @@ func initLogger(filePath string) (*logrus.Logger, error) { return logrus.New(), nil } - fd, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + fd, err := os.OpenFile(filepath.Clean(filePath), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) if err != nil { return nil, fmt.Errorf("failed to open log file: %w", err) } diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 3d9d080..a119f75 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +// Package common provides shared helpers for StreamNative MCP Server. +package common //nolint:revive import ( "context" @@ -25,8 +26,10 @@ import ( sncloud "github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver" ) +// ContextKey defines typed keys for context values. type ContextKey string +// Common constants used for context keys and token handling. const ( OptionsKey ContextKey = "snmcp-options" AnnotationStreamNativeCloudEngine = "cloud.streamnative.io/engine" @@ -73,7 +76,7 @@ func RequiredParam[T comparable](arguments map[string]interface{}, p string) (T, return arguments[p].(T), nil } -// Helper function to get an optional parameter from the request +// OptionalParam returns an optional parameter from the request. func OptionalParam[T any](arguments map[string]interface{}, paramName string) (T, bool) { var empty T param, ok := arguments[paramName] @@ -89,7 +92,7 @@ func OptionalParam[T any](arguments map[string]interface{}, paramName string) (T return value, true } -// Helper function to get a required array parameter from the request +// RequiredParamArray returns a required array parameter from the request. func RequiredParamArray[T any](arguments map[string]interface{}, paramName string) ([]T, error) { var empty []T param, ok := arguments[paramName] @@ -114,6 +117,7 @@ func RequiredParamArray[T any](arguments map[string]interface{}, paramName strin return result, nil } +// OptionalParamArray returns an optional array parameter from the request. func OptionalParamArray[T any](arguments map[string]interface{}, paramName string) ([]T, bool) { var empty []T param, ok := arguments[paramName] @@ -161,7 +165,7 @@ func OptionalParamConfigs(arguments map[string]interface{}, paramName string) ([ return result, true } -// RequiredParamObject gets a required object parameter from the request +// RequiredParamObject returns a required object parameter from the request. func RequiredParamObject(arguments map[string]interface{}, name string) (map[string]interface{}, error) { // Get the parameter value paramValue, found := arguments[name] @@ -177,6 +181,7 @@ func RequiredParamObject(arguments map[string]interface{}, name string) (map[str return nil, fmt.Errorf("%s parameter must be an object", name) } +// OptionalParamObject returns an optional object parameter from the request. func OptionalParamObject(arguments map[string]interface{}, name string) (map[string]interface{}, bool) { paramValue, found := arguments[name] if !found || paramValue == nil { @@ -190,6 +195,7 @@ func OptionalParamObject(arguments map[string]interface{}, name string) (map[str return nil, false } +// GetOptions extracts the config options from the context. func GetOptions(ctx context.Context) *config.Options { return ctx.Value(OptionsKey).(*config.Options) } @@ -220,6 +226,7 @@ func GetEngineType(cluster sncloud.ComGithubStreamnativeCloudApiServerPkgApisClo return "classic" } +// ConvertToMapInterface converts a map of strings to a map of interface values. func ConvertToMapInterface(m map[string]string) map[string]interface{} { result := make(map[string]interface{}) for k, v := range m { @@ -228,6 +235,7 @@ func ConvertToMapInterface(m map[string]string) map[string]interface{} { return result } +// ConvertToMapString converts a map of interface values to a map of strings. func ConvertToMapString(m map[string]interface{}) map[string]string { result := make(map[string]string) for k, v := range m { @@ -244,6 +252,7 @@ func IsInstanceValid(instance sncloud.ComGithubStreamnativeCloudApiServerPkgApis instance.Status.Auth.Oauth2.Audience != "" } +// HasCachedValidToken returns whether the cached grant has a valid token. func HasCachedValidToken(cachedGrant *auth.AuthorizationGrant) (bool, error) { if cachedGrant == nil || cachedGrant.Token == nil { return false, nil @@ -253,6 +262,7 @@ func HasCachedValidToken(cachedGrant *auth.AuthorizationGrant) (bool, error) { return cachedGrant.Token.Valid(), nil } +// IsTokenAboutToExpire reports whether the token expires within the window. func IsTokenAboutToExpire(cachedGrant *auth.AuthorizationGrant, window time.Duration) (bool, error) { if cachedGrant == nil || cachedGrant.Token == nil { return true, nil diff --git a/pkg/config/apiclient.go b/pkg/config/apiclient.go index 7f122d2..8393fc8 100644 --- a/pkg/config/apiclient.go +++ b/pkg/config/apiclient.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package config provides configuration loading and client setup helpers. package config import ( diff --git a/pkg/config/auth.go b/pkg/config/auth.go index e605a50..c7656b9 100644 --- a/pkg/config/auth.go +++ b/pkg/config/auth.go @@ -24,10 +24,13 @@ import ( ) const ( - ServiceName = "StreamNativeMCP" + // ServiceName is the name used for keyring service. + ServiceName = "StreamNativeMCP" + // KeychainName is the name of the macOS keychain. KeychainName = "snmcp" ) +// AuthOptions provides configuration options for authentication. type AuthOptions struct { BackendOverride string storage Storage @@ -37,15 +40,18 @@ type AuthOptions struct { store.Store } +// NewDefaultAuthOptions creates a new AuthOptions with default values. func NewDefaultAuthOptions() AuthOptions { return AuthOptions{} } +// AddFlags registers authentication flags on the command. func (o *AuthOptions) AddFlags(cmd *cobra.Command) { cmd.PersistentFlags().StringVar(&o.BackendOverride, "keyring-backend", "", "If present, the backend to use") } +// Complete initializes the auth backend using the provided storage. func (o *AuthOptions) Complete(storage Storage) error { o.storage = storage kr, err := o.makeKeyring() diff --git a/pkg/config/config.go b/pkg/config/config.go index 2084e0f..e21de1d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -21,6 +21,7 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/auth" ) +// SnConfig holds the StreamNative MCP Server configuration. type SnConfig struct { // the API server endpoint Server string `yaml:"server"` @@ -40,6 +41,7 @@ type SnConfig struct { ExternalPulsar *ExternalPulsar `yaml:"external-pulsar"` } +// Auth holds authentication configuration for the StreamNative API. type Auth struct { // the OAuth 2.0 issuer endpoint IssuerEndpoint string `yaml:"issuer"` @@ -49,11 +51,12 @@ type Auth struct { ClientID string `yaml:"client-id"` } +// Validate validates the auth configuration fields. func (a *Auth) Validate() error { - if !(isValidIssuer(a.IssuerEndpoint) && isValidClientID(a.ClientID) && isValidAudience(a.Audience)) { - return errors.New("configuration error: auth section is incomplete or invalid") + if isValidIssuer(a.IssuerEndpoint) && isValidClientID(a.ClientID) && isValidAudience(a.Audience) { + return nil } - return nil + return errors.New("configuration error: auth section is incomplete or invalid") } func isValidIssuer(iss string) bool { @@ -69,6 +72,7 @@ func isValidAudience(aud string) bool { return aud != "" } +// Issuer builds an auth.Issuer from the configuration. func (a *Auth) Issuer() auth.Issuer { return auth.Issuer{ IssuerEndpoint: a.IssuerEndpoint, @@ -77,12 +81,14 @@ func (a *Auth) Issuer() auth.Issuer { } } +// Context holds the default context for cluster connections. type Context struct { Organization string `yaml:"organization,omitempty"` PulsarInstance string `yaml:"pulsar-instance,omitempty"` PulsarCluster string `yaml:"pulsar-cluster,omitempty"` } +// Storage defines the interface for persisting configuration and credentials. type Storage interface { // Gets the config directory for configuration files, credentials and caches GetConfigDirectory() string diff --git a/pkg/config/external_kafka.go b/pkg/config/external_kafka.go index 19d73ec..a87c9bb 100644 --- a/pkg/config/external_kafka.go +++ b/pkg/config/external_kafka.go @@ -14,6 +14,7 @@ package config +// ExternalKafka holds connection settings for an external Kafka cluster. type ExternalKafka struct { BootstrapServers string AuthType string diff --git a/pkg/config/external_pulsar.go b/pkg/config/external_pulsar.go index 19487dd..0e9ff32 100644 --- a/pkg/config/external_pulsar.go +++ b/pkg/config/external_pulsar.go @@ -14,6 +14,7 @@ package config +// ExternalPulsar holds connection settings for an external Pulsar cluster. type ExternalPulsar struct { ServiceURL string WebServiceURL string diff --git a/pkg/config/options.go b/pkg/config/options.go index bd5cbcc..c018512 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -27,15 +27,22 @@ import ( ) const ( - EnvConfigDir = "SNMCP_CONFIG_DIR" - GlobalDefaultIssuer = "https://auth.streamnative.cloud/" - GlobalDefaultClientID = "AJYEdHWi9EFekEaUXkPWA2MqQ3lq1NrI" - GlobalDefaultAudience = "https://api.streamnative.cloud" - GlobalDefaultAPIServer = "https://api.streamnative.cloud" + // EnvConfigDir overrides the default config directory. + EnvConfigDir = "SNMCP_CONFIG_DIR" + // GlobalDefaultIssuer is the default OAuth2 issuer. + GlobalDefaultIssuer = "https://auth.streamnative.cloud/" + // GlobalDefaultClientID is the default OAuth2 client ID. + GlobalDefaultClientID = "AJYEdHWi9EFekEaUXkPWA2MqQ3lq1NrI" + // GlobalDefaultAudience is the default OAuth2 audience. + GlobalDefaultAudience = "https://api.streamnative.cloud" + // GlobalDefaultAPIServer is the default API server URL. + GlobalDefaultAPIServer = "https://api.streamnative.cloud" + // GlobalDefaultProxyLocation is the default proxy URL. GlobalDefaultProxyLocation = "https://proxy.streamnative.cloud" - GlobalDefaultLogLocation = "https://log.streamnative.cloud" + // GlobalDefaultLogLocation is the default log API URL. + GlobalDefaultLogLocation = "https://log.streamnative.cloud" - // Environment variable prefix + // EnvPrefix is the environment variable prefix. EnvPrefix = "SNMCP" ) @@ -162,42 +169,42 @@ func (o *Options) AddFlags(cmd *cobra.Command) { o.AuthOptions.AddFlags(cmd) // Bind command line flags to viper - viper.BindPFlag("config-dir", cmd.PersistentFlags().Lookup("config-dir")) - viper.BindPFlag("key-file", cmd.PersistentFlags().Lookup("key-file")) - viper.BindPFlag("server", cmd.PersistentFlags().Lookup("server")) - viper.BindPFlag("issuer", cmd.PersistentFlags().Lookup("issuer")) - viper.BindPFlag("audience", cmd.PersistentFlags().Lookup("audience")) - viper.BindPFlag("client-id", cmd.PersistentFlags().Lookup("client-id")) - viper.BindPFlag("organization", cmd.PersistentFlags().Lookup("organization")) - viper.BindPFlag("proxy-location", cmd.PersistentFlags().Lookup("proxy-location")) - viper.BindPFlag("log-location", cmd.PersistentFlags().Lookup("log-location")) - viper.BindPFlag("pulsar-instance", cmd.PersistentFlags().Lookup("pulsar-instance")) - viper.BindPFlag("pulsar-cluster", cmd.PersistentFlags().Lookup("pulsar-cluster")) - viper.BindPFlag("use-external-kafka", cmd.PersistentFlags().Lookup("use-external-kafka")) - viper.BindPFlag("use-external-pulsar", cmd.PersistentFlags().Lookup("use-external-pulsar")) - viper.BindPFlag("kafka-bootstrap-servers", cmd.PersistentFlags().Lookup("kafka-bootstrap-servers")) - viper.BindPFlag("kafka-schema-registry-url", cmd.PersistentFlags().Lookup("kafka-schema-registry-url")) - viper.BindPFlag("kafka-auth-type", cmd.PersistentFlags().Lookup("kafka-auth-type")) - viper.BindPFlag("kafka-auth-mechanism", cmd.PersistentFlags().Lookup("kafka-auth-mechanism")) - viper.BindPFlag("kafka-auth-user", cmd.PersistentFlags().Lookup("kafka-auth-user")) - viper.BindPFlag("kafka-auth-pass", cmd.PersistentFlags().Lookup("kafka-auth-pass")) - viper.BindPFlag("kafka-use-tls", cmd.PersistentFlags().Lookup("kafka-use-tls")) - viper.BindPFlag("kafka-client-key-file", cmd.PersistentFlags().Lookup("kafka-client-key-file")) - viper.BindPFlag("kafka-client-cert-file", cmd.PersistentFlags().Lookup("kafka-client-cert-file")) - viper.BindPFlag("kafka-ca-file", cmd.PersistentFlags().Lookup("kafka-ca-file")) - viper.BindPFlag("kafka-schema-registry-auth-user", cmd.PersistentFlags().Lookup("kafka-schema-registry-auth-user")) - viper.BindPFlag("kafka-schema-registry-auth-pass", cmd.PersistentFlags().Lookup("kafka-schema-registry-auth-pass")) - viper.BindPFlag("kafka-schema-registry-bearer-token", cmd.PersistentFlags().Lookup("kafka-schema-registry-bearer-token")) - viper.BindPFlag("pulsar-web-service-url", cmd.PersistentFlags().Lookup("pulsar-web-service-url")) - viper.BindPFlag("pulsar-service-url", cmd.PersistentFlags().Lookup("pulsar-service-url")) - viper.BindPFlag("pulsar-auth-plugin", cmd.PersistentFlags().Lookup("pulsar-auth-plugin")) - viper.BindPFlag("pulsar-auth-params", cmd.PersistentFlags().Lookup("pulsar-auth-params")) - viper.BindPFlag("pulsar-tls-allow-insecure-connection", cmd.PersistentFlags().Lookup("pulsar-tls-allow-insecure-connection")) - viper.BindPFlag("pulsar-tls-enable-hostname-verification", cmd.PersistentFlags().Lookup("pulsar-tls-enable-hostname-verification")) - viper.BindPFlag("pulsar-tls-trust-certs-file-path", cmd.PersistentFlags().Lookup("pulsar-tls-trust-certs-file-path")) - viper.BindPFlag("pulsar-tls-cert-file", cmd.PersistentFlags().Lookup("pulsar-tls-cert-file")) - viper.BindPFlag("pulsar-tls-key-file", cmd.PersistentFlags().Lookup("pulsar-tls-key-file")) - viper.BindPFlag("pulsar-token", cmd.PersistentFlags().Lookup("pulsar-token")) + _ = viper.BindPFlag("config-dir", cmd.PersistentFlags().Lookup("config-dir")) + _ = viper.BindPFlag("key-file", cmd.PersistentFlags().Lookup("key-file")) + _ = viper.BindPFlag("server", cmd.PersistentFlags().Lookup("server")) + _ = viper.BindPFlag("issuer", cmd.PersistentFlags().Lookup("issuer")) + _ = viper.BindPFlag("audience", cmd.PersistentFlags().Lookup("audience")) + _ = viper.BindPFlag("client-id", cmd.PersistentFlags().Lookup("client-id")) + _ = viper.BindPFlag("organization", cmd.PersistentFlags().Lookup("organization")) + _ = viper.BindPFlag("proxy-location", cmd.PersistentFlags().Lookup("proxy-location")) + _ = viper.BindPFlag("log-location", cmd.PersistentFlags().Lookup("log-location")) + _ = viper.BindPFlag("pulsar-instance", cmd.PersistentFlags().Lookup("pulsar-instance")) + _ = viper.BindPFlag("pulsar-cluster", cmd.PersistentFlags().Lookup("pulsar-cluster")) + _ = viper.BindPFlag("use-external-kafka", cmd.PersistentFlags().Lookup("use-external-kafka")) + _ = viper.BindPFlag("use-external-pulsar", cmd.PersistentFlags().Lookup("use-external-pulsar")) + _ = viper.BindPFlag("kafka-bootstrap-servers", cmd.PersistentFlags().Lookup("kafka-bootstrap-servers")) + _ = viper.BindPFlag("kafka-schema-registry-url", cmd.PersistentFlags().Lookup("kafka-schema-registry-url")) + _ = viper.BindPFlag("kafka-auth-type", cmd.PersistentFlags().Lookup("kafka-auth-type")) + _ = viper.BindPFlag("kafka-auth-mechanism", cmd.PersistentFlags().Lookup("kafka-auth-mechanism")) + _ = viper.BindPFlag("kafka-auth-user", cmd.PersistentFlags().Lookup("kafka-auth-user")) + _ = viper.BindPFlag("kafka-auth-pass", cmd.PersistentFlags().Lookup("kafka-auth-pass")) + _ = viper.BindPFlag("kafka-use-tls", cmd.PersistentFlags().Lookup("kafka-use-tls")) + _ = viper.BindPFlag("kafka-client-key-file", cmd.PersistentFlags().Lookup("kafka-client-key-file")) + _ = viper.BindPFlag("kafka-client-cert-file", cmd.PersistentFlags().Lookup("kafka-client-cert-file")) + _ = viper.BindPFlag("kafka-ca-file", cmd.PersistentFlags().Lookup("kafka-ca-file")) + _ = viper.BindPFlag("kafka-schema-registry-auth-user", cmd.PersistentFlags().Lookup("kafka-schema-registry-auth-user")) + _ = viper.BindPFlag("kafka-schema-registry-auth-pass", cmd.PersistentFlags().Lookup("kafka-schema-registry-auth-pass")) + _ = viper.BindPFlag("kafka-schema-registry-bearer-token", cmd.PersistentFlags().Lookup("kafka-schema-registry-bearer-token")) + _ = viper.BindPFlag("pulsar-web-service-url", cmd.PersistentFlags().Lookup("pulsar-web-service-url")) + _ = viper.BindPFlag("pulsar-service-url", cmd.PersistentFlags().Lookup("pulsar-service-url")) + _ = viper.BindPFlag("pulsar-auth-plugin", cmd.PersistentFlags().Lookup("pulsar-auth-plugin")) + _ = viper.BindPFlag("pulsar-auth-params", cmd.PersistentFlags().Lookup("pulsar-auth-params")) + _ = viper.BindPFlag("pulsar-tls-allow-insecure-connection", cmd.PersistentFlags().Lookup("pulsar-tls-allow-insecure-connection")) + _ = viper.BindPFlag("pulsar-tls-enable-hostname-verification", cmd.PersistentFlags().Lookup("pulsar-tls-enable-hostname-verification")) + _ = viper.BindPFlag("pulsar-tls-trust-certs-file-path", cmd.PersistentFlags().Lookup("pulsar-tls-trust-certs-file-path")) + _ = viper.BindPFlag("pulsar-tls-cert-file", cmd.PersistentFlags().Lookup("pulsar-tls-cert-file")) + _ = viper.BindPFlag("pulsar-tls-key-file", cmd.PersistentFlags().Lookup("pulsar-tls-key-file")) + _ = viper.BindPFlag("pulsar-token", cmd.PersistentFlags().Lookup("pulsar-token")) } // Complete completes options from the provided values @@ -212,7 +219,7 @@ func (o *Options) Complete() error { o.ConfigDir = filepath.Join(home, ".snmcp") } if _, err := os.Stat(o.ConfigDir); os.IsNotExist(err) { - err := os.MkdirAll(o.ConfigDir, 0755) + err := os.MkdirAll(o.ConfigDir, 0750) if err != nil { return fmt.Errorf("failed to create config directory: %w", err) } @@ -405,10 +412,12 @@ func (o *Options) Complete() error { return nil } +// GetConfigDirectory returns the directory used for configuration data. func (o *Options) GetConfigDirectory() string { return o.ConfigDir } +// LoadConfig loads configuration from the current in-memory options. func (o *Options) LoadConfig() (*SnConfig, error) { config := &SnConfig{ Server: o.Server, @@ -435,11 +444,13 @@ func (o *Options) LoadConfig() (*SnConfig, error) { return config, nil } +// LoadConfigOrDie loads configuration and ignores errors. func (o *Options) LoadConfigOrDie() *SnConfig { cfg, _ := o.LoadConfig() return cfg } +// SaveConfig persists configuration to disk. func (o *Options) SaveConfig(config *SnConfig) error { data, err := yaml.Marshal(config) if err != nil { diff --git a/pkg/kafka/connection.go b/pkg/kafka/connection.go index 6db0d48..380a4d2 100644 --- a/pkg/kafka/connection.go +++ b/pkg/kafka/connection.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package kafka provides Kafka connection and client helpers. package kafka import ( @@ -80,12 +81,14 @@ func NewSession(ctx KafkaContext) (*Session, error) { return session, nil } +// SASLConfig holds SASL authentication configuration. type SASLConfig struct { Mechanism string Username string Password string } +// TLSConfig holds TLS configuration for Kafka connections. type TLSConfig struct { Enabled bool ClientKeyFile string @@ -116,7 +119,7 @@ func tlsOpt(config *TLSConfig, opts []kgo.Opt) ([]kgo.Opt, error) { func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) { if config.Mechanism != "" || config.Username != "" || config.Password != "" { if config.Mechanism == "" || config.Username == "" || config.Password == "" { - return nil, fmt.Errorf("All of Mechanism, Username, and Password must be specified if any are") + return nil, fmt.Errorf("all of Mechanism, Username, and Password must be specified if any are") } method := strings.ToLower(config.Mechanism) method = strings.ReplaceAll(method, "-", "") @@ -144,6 +147,7 @@ func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) { return opts, nil } +// SetKafkaContext initializes Kafka clients using the provided context. func (s *Session) SetKafkaContext(ctx KafkaContext) error { s.Ctx = ctx kc := &s.Ctx @@ -205,6 +209,7 @@ func (s *Session) SetKafkaContext(ctx KafkaContext) error { return nil } +// GetClient returns a Kafka client with optional overrides. func (s *Session) GetClient(opts ...kgo.Opt) (*kgo.Client, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -230,6 +235,7 @@ func (s *Session) GetClient(opts ...kgo.Opt) (*kgo.Client, error) { return s.Client, nil } +// GetAdminClient returns the Kafka admin client. func (s *Session) GetAdminClient() (*kadm.Client, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -248,6 +254,7 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) { return s.AdminClient, nil } +// GetSchemaRegistryClient returns the schema registry client. func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) { if s.Ctx.SchemaRegistryURL == "" { return nil, fmt.Errorf("schema registry not enabled on the current context") @@ -276,6 +283,7 @@ func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) { return s.SchemaRegistryClient, nil } +// GetConnectClient returns the Kafka Connect client. func (s *Session) GetConnectClient() (Connect, error) { if s.Ctx.ConnectURL == "" { return nil, fmt.Errorf("kafka connect not enabled on the current context") diff --git a/pkg/kafka/kafkaconnect.go b/pkg/kafka/kafkaconnect.go index 8819aa2..bd4c467 100644 --- a/pkg/kafka/kafkaconnect.go +++ b/pkg/kafka/kafkaconnect.go @@ -192,7 +192,7 @@ func (c *connectImpl) ListConnectors(_ context.Context) ([]string, error) { if err != nil { return nil, fmt.Errorf("failed to list connectors: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -247,7 +247,7 @@ func (c *connectImpl) CreateConnector(_ context.Context, name string, config map if err != nil { return nil, fmt.Errorf("failed to create connector: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -298,7 +298,7 @@ func (c *connectImpl) UpdateConnector(_ context.Context, name string, config map if err != nil { return nil, fmt.Errorf("failed to update connector: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -393,7 +393,7 @@ func (c *connectImpl) GetConnectorStatus(_ context.Context, name string) (*Conne if err != nil { return nil, fmt.Errorf("failed to get connector status: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -448,7 +448,7 @@ func (c *connectImpl) GetConnectorTasks(_ context.Context, name string) ([]TaskI if err != nil { return nil, fmt.Errorf("failed to get connector tasks: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -489,7 +489,7 @@ func (c *connectImpl) ListPlugins(_ context.Context) ([]PluginInfo, error) { if err != nil { return nil, fmt.Errorf("failed to list connector plugins: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) @@ -536,7 +536,7 @@ func (c *connectImpl) ValidateConfig(_ context.Context, pluginClass string, conf if err != nil { return nil, fmt.Errorf("failed to validate connector config: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() // Parse response body body, err := io.ReadAll(resp.Body) diff --git a/pkg/log/io.go b/pkg/log/io.go index 4c831fe..8ebcb3a 100644 --- a/pkg/log/io.go +++ b/pkg/log/io.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package log provides logging utilities for StreamNative MCP Server. package log import ( diff --git a/pkg/mcp/auth_utils.go b/pkg/mcp/auth_utils.go index a5204f1..319c48d 100644 --- a/pkg/mcp/auth_utils.go +++ b/pkg/mcp/auth_utils.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package mcp contains core MCP server integrations and tools. package mcp import ( @@ -22,6 +23,7 @@ import ( ) const ( + // DefaultPulsarPort is the default Pulsar protocol port. DefaultPulsarPort = 6651 ) diff --git a/pkg/mcp/builders/base.go b/pkg/mcp/builders/base.go index bab722a..61411a8 100644 --- a/pkg/mcp/builders/base.go +++ b/pkg/mcp/builders/base.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package builders provides common interfaces and helpers for MCP tool builders. package builders import ( diff --git a/pkg/mcp/builders/kafka/connect.go b/pkg/mcp/builders/kafka/connect.go index 43d1218..8ba16b8 100644 --- a/pkg/mcp/builders/kafka/connect.go +++ b/pkg/mcp/builders/kafka/connect.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package kafka provides Kafka MCP tool builders. package kafka import ( diff --git a/pkg/mcp/builders/pulsar/brokers.go b/pkg/mcp/builders/pulsar/brokers.go index 233b1fb..6a81fd3 100644 --- a/pkg/mcp/builders/pulsar/brokers.go +++ b/pkg/mcp/builders/pulsar/brokers.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package pulsar provides MCP tool builders for Pulsar admin operations. package pulsar import ( diff --git a/pkg/mcp/builders/pulsar/consume.go b/pkg/mcp/builders/pulsar/consume.go index 52d7763..e625532 100644 --- a/pkg/mcp/builders/pulsar/consume.go +++ b/pkg/mcp/builders/pulsar/consume.go @@ -245,12 +245,7 @@ func (b *PulsarClientConsumeToolBuilder) buildConsumeHandler(_ bool) func(contex messageCount := 0 // Consume messages - for { - // Check if we've consumed the requested number of messages - if numMessages > 0 && messageCount >= numMessages { - break - } - + for numMessages <= 0 || messageCount < numMessages { // Receive message with timeout msg, err := consumer.Receive(consumeCtx) if err != nil { diff --git a/pkg/mcp/builders/pulsar/schema.go b/pkg/mcp/builders/pulsar/schema.go index 8bf4de3..13fc41a 100644 --- a/pkg/mcp/builders/pulsar/schema.go +++ b/pkg/mcp/builders/pulsar/schema.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" @@ -264,7 +265,7 @@ func (b *PulsarAdminSchemaToolBuilder) handleSchemaUpload(admin cmdutils.Client, // Read and parse the schema file var payload utils.PostSchemaPayload - file, err := os.ReadFile(filename) + file, err := os.ReadFile(filepath.Clean(filename)) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to read schema file '%s': %v", filename, err)), nil } diff --git a/pkg/mcp/builders/registry_test.go b/pkg/mcp/builders/registry_test.go index a4de66b..766c266 100644 --- a/pkg/mcp/builders/registry_test.go +++ b/pkg/mcp/builders/registry_test.go @@ -148,8 +148,7 @@ func TestToolRegistry(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("panic_tool", []string{"feature"}) - ///nolint:errcheck - registry.Register(builder) // First registration + _ = registry.Register(builder) // First registration assert.Panics(t, func() { registry.MustRegister(builder) // Duplicate registration should panic }) @@ -158,8 +157,7 @@ func TestToolRegistry(t *testing.T) { t.Run("GetBuilder_Success", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("get_test_tool", []string{"feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) retrieved, exists := registry.GetBuilder("get_test_tool") assert.True(t, exists) @@ -178,10 +176,8 @@ func TestToolRegistry(t *testing.T) { builder1 := NewMockToolBuilder("tool1", []string{"feature1"}) builder2 := NewMockToolBuilder("tool2", []string{"feature2"}) - ///nolint:errcheck - registry.Register(builder1) - ///nolint:errcheck - registry.Register(builder2) + _ = registry.Register(builder1) + _ = registry.Register(builder2) names := registry.ListBuilders() assert.Len(t, names, 2) @@ -194,8 +190,7 @@ func TestToolRegistry(t *testing.T) { t.Run("GetMetadata", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("metadata_tool", []string{"feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) metadata, exists := registry.GetMetadata("metadata_tool") assert.True(t, exists) @@ -208,10 +203,8 @@ func TestToolRegistry(t *testing.T) { builder1 := NewMockToolBuilder("tool1", []string{"feature1"}) builder2 := NewMockToolBuilder("tool2", []string{"feature2"}) - ///nolint:errcheck - registry.Register(builder1) - ///nolint:errcheck - registry.Register(builder2) + _ = registry.Register(builder1) + _ = registry.Register(builder2) metadata := registry.ListMetadata() assert.Len(t, metadata, 2) @@ -222,8 +215,7 @@ func TestToolRegistry(t *testing.T) { t.Run("BuildSingle_Success", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("single_tool", []string{"test_feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) config := ToolBuildConfig{ Features: []string{"test_feature"}, @@ -250,8 +242,7 @@ func TestToolRegistry(t *testing.T) { t.Run("BuildSingle_ValidationFailed", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("validation_tool", []string{"required_feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) config := ToolBuildConfig{ Features: []string{"wrong_feature"}, @@ -267,10 +258,8 @@ func TestToolRegistry(t *testing.T) { builder1 := NewMockToolBuilder("tool1", []string{"feature1"}) builder2 := NewMockToolBuilder("tool2", []string{"feature2"}) - ///nolint:errcheck - registry.Register(builder1) - ///nolint:errcheck - registry.Register(builder2) + _ = registry.Register(builder1) + _ = registry.Register(builder2) configs := map[string]ToolBuildConfig{ "tool1": {Features: []string{"feature1"}}, @@ -289,10 +278,8 @@ func TestToolRegistry(t *testing.T) { builder2.SetError(fmt.Errorf("build error")) - ///nolint:errcheck - registry.Register(builder1) - ///nolint:errcheck - registry.Register(builder2) + _ = registry.Register(builder1) + _ = registry.Register(builder2) configs := map[string]ToolBuildConfig{ "tool1": {Features: []string{"feature1"}}, @@ -310,12 +297,9 @@ func TestToolRegistry(t *testing.T) { builder2 := NewMockToolBuilder("tool2", []string{"feature2"}) builder3 := NewMockToolBuilder("tool3", []string{"feature3"}) - ///nolint:errcheck - registry.Register(builder1) - ///nolint:errcheck - registry.Register(builder2) - ///nolint:errcheck - registry.Register(builder3) + _ = registry.Register(builder1) + _ = registry.Register(builder2) + _ = registry.Register(builder3) // Only provide feature1 and feature2 tools, err := registry.BuildAllWithFeatures(false, []string{"feature1", "feature2"}) @@ -326,8 +310,7 @@ func TestToolRegistry(t *testing.T) { t.Run("Clear", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("clear_tool", []string{"feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) assert.Equal(t, 1, registry.Count()) @@ -338,8 +321,7 @@ func TestToolRegistry(t *testing.T) { t.Run("Unregister", func(t *testing.T) { registry := NewToolRegistry() builder := NewMockToolBuilder("unregister_tool", []string{"feature"}) - ///nolint:errcheck - registry.Register(builder) + _ = registry.Register(builder) assert.Equal(t, 1, registry.Count()) diff --git a/pkg/mcp/features.go b/pkg/mcp/features.go index cfb0686..ddc2ea1 100644 --- a/pkg/mcp/features.go +++ b/pkg/mcp/features.go @@ -14,8 +14,10 @@ package mcp +// Feature represents a named capability flag for MCP tools. type Feature string +// Feature flags used to register MCP tools. const ( FeatureAll Feature = "all" FeatureAllKafka Feature = "all-kafka" diff --git a/pkg/mcp/instructions.go b/pkg/mcp/instructions.go index 8c670d1..4cc4825 100644 --- a/pkg/mcp/instructions.go +++ b/pkg/mcp/instructions.go @@ -20,6 +20,7 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/config" ) +// GetStreamNativeCloudServerInstructions renders instructions for StreamNative Cloud. func GetStreamNativeCloudServerInstructions(userName string, snConfig *config.SnConfig) string { contextInformation := "" if snConfig.Context.PulsarCluster != "" && snConfig.Context.PulsarInstance != "" { @@ -85,12 +86,14 @@ func GetStreamNativeCloudServerInstructions(userName string, snConfig *config.Sn Logged in as %s. %s`, userName, contextInformation) } +// GetExternalKafkaServerInstructions renders instructions for external Kafka. func GetExternalKafkaServerInstructions(bootstrapServers string) string { return fmt.Sprintf(`StreamNative Cloud MCP Server provides resources and tools for AI agents to interact with Kafka resources and services. Bootstrap servers: %s`, bootstrapServers) } +// GetExternalPulsarServerInstructions renders instructions for external Pulsar. func GetExternalPulsarServerInstructions(webServiceURL string) string { return fmt.Sprintf(`StreamNative Cloud MCP Server provides resources and tools for AI agents to interact with Pulsar resources and services. diff --git a/pkg/mcp/internal/context/ctx.go b/pkg/mcp/internal/context/ctx.go index 27d6cc2..3008d4f 100644 --- a/pkg/mcp/internal/context/ctx.go +++ b/pkg/mcp/internal/context/ctx.go @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -package context +// Package context provides internal context helpers for MCP sessions. +package context //nolint:revive import ( "context" @@ -24,6 +25,7 @@ import ( type contextKey string +// Context keys for StreamNative sessions and identifiers. const ( SNCloudOrganizationContextKey contextKey = "sncloud_organization" SNCloudInstanceContextKey contextKey = "sncloud_instance" diff --git a/pkg/mcp/kafka_admin_connect_tools.go b/pkg/mcp/kafka_admin_connect_tools.go index e85fa10..d5a31ca 100644 --- a/pkg/mcp/kafka_admin_connect_tools.go +++ b/pkg/mcp/kafka_admin_connect_tools.go @@ -22,6 +22,7 @@ import ( kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka" ) +// KafkaAdminAddKafkaConnectTools registers Kafka Connect admin tools. func KafkaAdminAddKafkaConnectTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := kafkabuilders.NewKafkaConnectToolBuilder() diff --git a/pkg/mcp/kafka_admin_groups_tools.go b/pkg/mcp/kafka_admin_groups_tools.go index 007622a..143fd9c 100644 --- a/pkg/mcp/kafka_admin_groups_tools.go +++ b/pkg/mcp/kafka_admin_groups_tools.go @@ -22,6 +22,7 @@ import ( kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka" ) +// KafkaAdminAddGroupsTools registers Kafka admin group tools. func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := kafkabuilders.NewKafkaGroupsToolBuilder() diff --git a/pkg/mcp/kafka_admin_partitions_tools.go b/pkg/mcp/kafka_admin_partitions_tools.go index 3010e81..9d510c8 100644 --- a/pkg/mcp/kafka_admin_partitions_tools.go +++ b/pkg/mcp/kafka_admin_partitions_tools.go @@ -22,6 +22,7 @@ import ( kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka" ) +// KafkaAdminAddPartitionsTools registers Kafka admin partition tools. func KafkaAdminAddPartitionsTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := kafkabuilders.NewKafkaPartitionsToolBuilder() diff --git a/pkg/mcp/kafka_admin_sr_tools.go b/pkg/mcp/kafka_admin_sr_tools.go index d6590cf..c6e01d5 100644 --- a/pkg/mcp/kafka_admin_sr_tools.go +++ b/pkg/mcp/kafka_admin_sr_tools.go @@ -22,6 +22,7 @@ import ( kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka" ) +// KafkaAdminAddSchemaRegistryTools registers Kafka Schema Registry tools. func KafkaAdminAddSchemaRegistryTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := kafkabuilders.NewKafkaSchemaRegistryToolBuilder() diff --git a/pkg/mcp/kafka_admin_topics_tools.go b/pkg/mcp/kafka_admin_topics_tools.go index 9075245..0d5be44 100644 --- a/pkg/mcp/kafka_admin_topics_tools.go +++ b/pkg/mcp/kafka_admin_topics_tools.go @@ -22,6 +22,7 @@ import ( kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka" ) +// KafkaAdminAddTopicTools registers Kafka admin topic tools. func KafkaAdminAddTopicTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := kafkabuilders.NewKafkaTopicsToolBuilder() diff --git a/pkg/mcp/pftools/circuit_breaker.go b/pkg/mcp/pftools/circuit_breaker.go index a2f0e38..d64ff4c 100644 --- a/pkg/mcp/pftools/circuit_breaker.go +++ b/pkg/mcp/pftools/circuit_breaker.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package pftools provides Pulsar Functions tooling for MCP. package pftools import ( diff --git a/pkg/mcp/pftools/errors.go b/pkg/mcp/pftools/errors.go index 9c56805..62e23e6 100644 --- a/pkg/mcp/pftools/errors.go +++ b/pkg/mcp/pftools/errors.go @@ -22,8 +22,10 @@ import ( ) var ( + // ErrFunctionNotFound indicates the function was not found. ErrFunctionNotFound = errors.New("function not found") - ErrNotOurMessage = errors.New("not our message") + // ErrNotOurMessage indicates a message that should be ignored. + ErrNotOurMessage = errors.New("not our message") ) // IsClusterUnhealthy checks if an error indicates cluster health issues @@ -34,6 +36,7 @@ func IsClusterUnhealthy(err error) bool { return false } +// IsAuthError reports whether the error is an authorization error. func IsAuthError(err error) bool { if restErr, ok := err.(rest.Error); ok { return restErr.Code == 403 diff --git a/pkg/mcp/pftools/manager.go b/pkg/mcp/pftools/manager.go index 0fef260..a509c75 100644 --- a/pkg/mcp/pftools/manager.go +++ b/pkg/mcp/pftools/manager.go @@ -35,10 +35,13 @@ import ( ) const ( - CustomRuntimeOptionsEnvMcpToolNameKey = "MCP_TOOL_NAME" + // CustomRuntimeOptionsEnvMcpToolNameKey is the env var name for tool names. + CustomRuntimeOptionsEnvMcpToolNameKey = "MCP_TOOL_NAME" + // CustomRuntimeOptionsEnvMcpToolDescriptionKey is the env var name for tool descriptions. CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION" ) +// DefaultStringSchemaInfo defines the default schema info for string payloads. var DefaultStringSchemaInfo = &SchemaInfo{ Type: "STRING", Definition: map[string]interface{}{ @@ -61,7 +64,7 @@ type Server struct { func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error) { // Get Pulsar client and admin client if snServer.PulsarSession == nil { - return nil, fmt.Errorf("Pulsar session not found in context") + return nil, fmt.Errorf("pulsar session not found in context") } // Get Pulsar client from session using type-safe interface diff --git a/pkg/mcp/pftools/schema.go b/pkg/mcp/pftools/schema.go index 6fe20f2..62ae155 100644 --- a/pkg/mcp/pftools/schema.go +++ b/pkg/mcp/pftools/schema.go @@ -24,6 +24,7 @@ import ( "github.com/streamnative/pulsarctl/pkg/cmdutils" ) +// DefaultStringSchema defines the default MCP input schema for string payloads. var DefaultStringSchema = &mcp.ToolInputSchema{ Type: "object", Properties: map[string]interface{}{ @@ -123,6 +124,7 @@ func convertComplexSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSche }, nil } +// GetPulsarTypeSchema converts SchemaInfo into a Pulsar schema. func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error) { if schemaInfo == nil || schemaInfo.Definition == nil { return pulsar.NewStringSchema(nil), nil diff --git a/pkg/mcp/pftools/types.go b/pkg/mcp/pftools/types.go index 93c5b4f..77ad7fe 100644 --- a/pkg/mcp/pftools/types.go +++ b/pkg/mcp/pftools/types.go @@ -48,6 +48,7 @@ type PulsarFunctionManager struct { clusterErrorHandler ClusterErrorHandler } +// FunctionTool represents a Pulsar function exposed as an MCP tool. type FunctionTool struct { Name string Function *utils.FunctionConfig @@ -59,12 +60,14 @@ type FunctionTool struct { SchemaFetchSuccess bool } +// SchemaInfo represents schema metadata for Pulsar functions. type SchemaInfo struct { Type string Definition map[string]interface{} PulsarSchemaInfo *utils.SchemaInfo } +// CircuitBreaker guards function invocations to prevent repeated failures. type CircuitBreaker struct { failureCount int failureThreshold int @@ -74,16 +77,20 @@ type CircuitBreaker struct { mutex sync.RWMutex } +// CircuitState represents the circuit breaker state. type CircuitState int +// Circuit breaker states. const ( StateOpen CircuitState = iota StateHalfOpen StateClosed ) +// ClusterErrorHandler handles cluster errors for Pulsar function managers. type ClusterErrorHandler func(*PulsarFunctionManager, error) +// ManagerOptions configures PulsarFunctionManager behavior. type ManagerOptions struct { PollInterval time.Duration DefaultTimeout time.Duration @@ -94,6 +101,7 @@ type ManagerOptions struct { ClusterErrorHandler ClusterErrorHandler } +// DefaultManagerOptions returns default manager options. func DefaultManagerOptions() *ManagerOptions { return &ManagerOptions{ PollInterval: 30 * time.Second, diff --git a/pkg/mcp/prompts.go b/pkg/mcp/prompts.go index edec048..cb0b530 100644 --- a/pkg/mcp/prompts.go +++ b/pkg/mcp/prompts.go @@ -28,6 +28,7 @@ import ( "k8s.io/utils/ptr" ) +// ServerlessPoolMember describes a serverless pool option. type ServerlessPoolMember struct { Provider string Namespace string @@ -36,6 +37,7 @@ type ServerlessPoolMember struct { } var ( + // ServerlessPoolMemberList defines the supported serverless pools. ServerlessPoolMemberList = []ServerlessPoolMember{ { Provider: "azure", @@ -56,9 +58,11 @@ var ( // Location: "us-central1", // }, } + // AvailableProviders lists supported cloud providers. AvailableProviders = []string{"azure", "aws", "gcloud"} ) +// RegisterPrompts registers prompt handlers on the server. func RegisterPrompts(s *server.MCPServer) { s.AddPrompt(mcp.NewPrompt("list-sncloud-clusters", mcp.WithPromptDescription("List all clusters from the StreamNative Cloud"), @@ -78,6 +82,7 @@ func RegisterPrompts(s *server.MCPServer) { ) } +// HandleListPulsarClusters handles listing StreamNative Cloud clusters. func HandleListPulsarClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { // Get API client from session session := context2.GetSNCloudSession(ctx) @@ -94,7 +99,7 @@ func HandleListPulsarClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp if err != nil { return nil, fmt.Errorf("failed to list pulsar clusters: %v", err) } - defer clustersBody.Body.Close() + defer func() { _ = clustersBody.Body.Close() }() var messages = make( []mcp.PromptMessage, @@ -170,7 +175,7 @@ func handleReadPulsarCluster(ctx context.Context, request mcp.GetPromptRequest) if err != nil { return nil, fmt.Errorf("failed to list pulsar clusters: %v", err) } - defer clustersBody.Body.Close() + defer func() { _ = clustersBody.Body.Close() }() var cluster sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarCluster for _, c := range clusters.Items { if *c.Metadata.Name == name { @@ -248,7 +253,7 @@ func handleBuildServerlessPulsarCluster(ctx context.Context, request mcp.GetProm if err != nil { return nil, fmt.Errorf("failed to list pool options: %v", err) } - defer poolOptionsBody.Body.Close() + defer func() { _ = poolOptionsBody.Body.Close() }() if poolOptions == nil { return nil, fmt.Errorf("no pool options found") } diff --git a/pkg/mcp/pulsar_admin_packages_tools.go b/pkg/mcp/pulsar_admin_packages_tools.go index bfa929d..afc44cd 100644 --- a/pkg/mcp/pulsar_admin_packages_tools.go +++ b/pkg/mcp/pulsar_admin_packages_tools.go @@ -24,18 +24,27 @@ import ( ) const ( - HTTP = "http" - FILE = "file" + // HTTP represents the HTTP package URL scheme. + HTTP = "http" + // FILE represents the file package URL scheme. + FILE = "file" + // BUILTIN represents the builtin package scheme. BUILTIN = "builtin" + // FUNCTION represents a function package type. FUNCTION = "function" - SINK = "sink" - SOURCE = "source" + // SINK represents a sink package type. + SINK = "sink" + // SOURCE represents a source package type. + SOURCE = "source" - PublicTenant = "public" + // PublicTenant is the default public tenant name. + PublicTenant = "public" + // DefaultNamespace is the default namespace name. DefaultNamespace = "default" ) +// IsPackageURLSupported reports whether the package URL scheme is supported. func IsPackageURLSupported(functionPkgURL string) bool { return functionPkgURL != "" && (strings.HasPrefix(functionPkgURL, HTTP) || strings.HasPrefix(functionPkgURL, FILE) || diff --git a/pkg/mcp/pulsar_admin_tenant_tools.go b/pkg/mcp/pulsar_admin_tenant_tools.go index 480ca24..3ab45a4 100644 --- a/pkg/mcp/pulsar_admin_tenant_tools.go +++ b/pkg/mcp/pulsar_admin_tenant_tools.go @@ -22,6 +22,7 @@ import ( pulsarbuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/pulsar" ) +// PulsarAdminAddTenantTools registers Pulsar admin tenant tools. func PulsarAdminAddTenantTools(s *server.MCPServer, readOnly bool, features []string) { // Use the new builder pattern builder := pulsarbuilders.NewPulsarAdminTenantToolBuilder() diff --git a/pkg/mcp/pulsar_functions_as_tools.go b/pkg/mcp/pulsar_functions_as_tools.go index 4442fe7..1c76afb 100644 --- a/pkg/mcp/pulsar_functions_as_tools.go +++ b/pkg/mcp/pulsar_functions_as_tools.go @@ -31,6 +31,7 @@ var ( functionManagersLock sync.RWMutex ) +// StopAllPulsarFunctionManagers stops and removes all function managers. func StopAllPulsarFunctionManagers() { functionManagersLock.Lock() defer functionManagersLock.Unlock() @@ -48,6 +49,7 @@ func StopAllPulsarFunctionManagers() { log.Println("All Pulsar Function managers stopped") } +// PulsarFunctionManagedMcpTools registers Pulsar Functions-as-tools handlers. func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string, sessionID string) { if !slices.Contains(features, string(FeatureAll)) && !slices.Contains(features, string(FeatureFunctionsAsTools)) && diff --git a/pkg/mcp/server.go b/pkg/mcp/server.go index 21674ca..c922883 100644 --- a/pkg/mcp/server.go +++ b/pkg/mcp/server.go @@ -22,6 +22,7 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/pulsar" ) +// Server wraps MCP server state and StreamNative sessions. type Server struct { MCPServer *server.MCPServer KafkaSession *kafka.Session @@ -30,6 +31,7 @@ type Server struct { logger *logrus.Logger } +// NewServer creates a new MCP server with StreamNative integrations. func NewServer(name, version string, logger *logrus.Logger, opts ...server.ServerOption) *Server { // Create a new MCP server opts = AddOpts(opts...) @@ -38,6 +40,7 @@ func NewServer(name, version string, logger *logrus.Logger, opts ...server.Serve return mcpserver } +// AddOpts merges default server options with custom options. func AddOpts(opts ...server.ServerOption) []server.ServerOption { defaultOpts := []server.ServerOption{ server.WithResourceCapabilities(true, true), @@ -48,6 +51,7 @@ func AddOpts(opts ...server.ServerOption) []server.ServerOption { return opts } +// CreateSNCloudMCPServer constructs a Server wrapper for StreamNative Cloud. func CreateSNCloudMCPServer(s *server.MCPServer, logger *logrus.Logger) *Server { mcpserver := &Server{ MCPServer: s, diff --git a/pkg/mcp/session/context.go b/pkg/mcp/session/context.go index 5cf895f..30659d4 100644 --- a/pkg/mcp/session/context.go +++ b/pkg/mcp/session/context.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package session provides context helpers for MCP session management. package session import ( diff --git a/pkg/mcp/sncontext_tools.go b/pkg/mcp/sncontext_tools.go index 8658cb7..e1a853a 100644 --- a/pkg/mcp/sncontext_tools.go +++ b/pkg/mcp/sncontext_tools.go @@ -26,6 +26,7 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/common" ) +// RegisterContextTools registers context-related tools on the server. func RegisterContextTools(s *server.MCPServer, features []string, skipContextTools bool) { if !slices.Contains(features, string(FeatureStreamNativeCloud)) && !slices.Contains(features, string(FeatureAll)) { return diff --git a/pkg/mcp/sncontext_utils.go b/pkg/mcp/sncontext_utils.go index 562c0cd..6865a70 100644 --- a/pkg/mcp/sncontext_utils.go +++ b/pkg/mcp/sncontext_utils.go @@ -26,11 +26,13 @@ import ( sncloud "github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver" ) +// DefaultKafkaPort is the default Kafka port for StreamNative Cloud. const DefaultKafkaPort = 9093 +// SetContext resolves and stores StreamNative Cloud context in memory. func SetContext(ctx context.Context, options *config.Options, instanceName, clusterName string) error { snConfig := options.LoadConfigOrDie() - myselfGrant, err := options.AuthOptions.LoadGrant(snConfig.Auth.Audience) + myselfGrant, err := options.LoadGrant(snConfig.Auth.Audience) if err != nil || myselfGrant == nil { return fmt.Errorf("failed to auth to StreamNative Cloud: %v", err) } @@ -50,7 +52,7 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus if err != nil { return fmt.Errorf("failed to list pulsar instances: %v", err) } - defer instancesBody.Body.Close() + defer func() { _ = instancesBody.Body.Close() }() var instance sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarInstance foundInstance := false @@ -61,18 +63,18 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus foundInstance = true break } - return fmt.Errorf("Pulsar instance %s is not valid", instanceName) + return fmt.Errorf("pulsar instance %s is not valid", instanceName) } } if !foundInstance { - return fmt.Errorf("Pulsar instance %s not found in organization %s", instanceName, options.Organization) + return fmt.Errorf("pulsar instance %s not found in organization %s", instanceName, options.Organization) } clusters, clustersBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, options.Organization).Execute() if err != nil { return fmt.Errorf("failed to list pulsar clusters: %v", err) } - defer clustersBody.Body.Close() + defer func() { _ = clustersBody.Body.Close() }() var cluster sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarCluster foundCluster := false for _, c := range clusters.Items { @@ -82,11 +84,11 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus foundCluster = true break } - return fmt.Errorf("Pulsar cluster %s is not available", clusterName) + return fmt.Errorf("pulsar cluster %s is not available", clusterName) } } if !foundCluster { - return fmt.Errorf("Pulsar cluster %s not found", clusterName) + return fmt.Errorf("pulsar cluster %s not found", clusterName) } clusterUID := string(*cluster.Metadata.Uid) @@ -111,7 +113,7 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus accessToken := "" refreshToken := true - cachedGrant, err := options.AuthOptions.LoadGrant(tokenKey) + cachedGrant, err := options.LoadGrant(tokenKey) if err == nil && cachedGrant != nil { cacheHasValidToken, err := common.HasCachedValidToken(cachedGrant) @@ -144,7 +146,7 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus } if newGrant.Token != nil { - _ = options.AuthOptions.SaveGrant(tokenKey, *newGrant) + _ = options.SaveGrant(tokenKey, *newGrant) accessToken = newGrant.Token.AccessToken } } diff --git a/pkg/mcp/streamnative_resources_log_tools.go b/pkg/mcp/streamnative_resources_log_tools.go index e3d2a0f..8e35b01 100644 --- a/pkg/mcp/streamnative_resources_log_tools.go +++ b/pkg/mcp/streamnative_resources_log_tools.go @@ -31,6 +31,7 @@ import ( context2 "github.com/streamnative/streamnative-mcp-server/pkg/mcp/internal/context" ) +// FunctionConnectorList lists supported log components. var FunctionConnectorList = []string{"sink", "source", "function", "kafka-connect"} // StreamNativeAddLogTools adds log tools @@ -79,6 +80,7 @@ func StreamNativeAddLogTools(s *server.MCPServer, _ bool, features []string) { s.AddTool(logTool, handleStreamNativeResourcesLog) } +// LogOptions captures parameters for StreamNative log queries. type LogOptions struct { ServiceURL string Organization string @@ -97,11 +99,13 @@ type LogOptions struct { InsecureSkipTLSVerifyBackend bool } +// LogResult represents a log query response. type LogResult struct { Total int `json:"total"` Data []LogContent `json:"data"` } +// LogContent represents a single log entry. type LogContent struct { Message string `json:"message"` Position int64 `json:"position"` @@ -241,7 +245,7 @@ func (o *LogOptions) getLogs(client *http.Client, position int64, if err != nil { return results, fmt.Errorf("failed to request logs (%s): %v", url, err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() var logResult LogResult var body []byte diff --git a/pkg/mcp/streamnative_resources_tools.go b/pkg/mcp/streamnative_resources_tools.go index 8798def..6ee9b88 100644 --- a/pkg/mcp/streamnative_resources_tools.go +++ b/pkg/mcp/streamnative_resources_tools.go @@ -71,7 +71,7 @@ func StreamNativeAddResourceTools(s *server.MCPServer, readOnly bool, features [ } } -// Define simple resource structure for parsing YAML documents +// Resource represents a StreamNative resource manifest. type Resource struct { APIVersion string `json:"apiVersion" yaml:"apiVersion"` Kind string `json:"kind" yaml:"kind"` @@ -79,6 +79,7 @@ type Resource struct { Spec map[string]interface{} `json:"spec" yaml:"spec"` } +// Metadata holds standard resource metadata. type Metadata struct { Name string `json:"name" yaml:"name"` Namespace string `json:"namespace" yaml:"namespace"` @@ -190,7 +191,7 @@ func applyPulsarInstance(ctx context.Context, apiClient *sncloud.APIClient, json if name != "" { // Try to get existing resource existingInstance, bdy, err := apiClient.CloudStreamnativeIoV1alpha1Api.ReadCloudStreamnativeIoV1alpha1NamespacedPulsarInstance(ctx, name, organization).Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() if err == nil { exists = true if existingInstance.Metadata != nil && existingInstance.Metadata.ResourceVersion != nil { @@ -223,7 +224,7 @@ func applyPulsarInstance(ctx context.Context, apiClient *sncloud.APIClient, json request = request.DryRun(dryRunStr) } _, bdy, err = request.Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() } else { verb = "created" // Create new resource @@ -233,7 +234,7 @@ func applyPulsarInstance(ctx context.Context, apiClient *sncloud.APIClient, json request = request.DryRun(dryRunStr) } _, bdy, err = request.Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() } if err != nil { @@ -277,7 +278,7 @@ func applyPulsarCluster(ctx context.Context, apiClient *sncloud.APIClient, jsonC if name != "" { // Try to get existing resource existingCluster, bdy, err := apiClient.CloudStreamnativeIoV1alpha1Api.ReadCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, name, organization).Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() if err == nil { exists = true if existingCluster.Metadata != nil && existingCluster.Metadata.ResourceVersion != nil { @@ -311,7 +312,7 @@ func applyPulsarCluster(ctx context.Context, apiClient *sncloud.APIClient, jsonC } _, bdy, err = request.Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() } else { verb = "created" // Create new resource @@ -321,7 +322,7 @@ func applyPulsarCluster(ctx context.Context, apiClient *sncloud.APIClient, jsonC request = request.DryRun(dryRunStr) } _, bdy, err = request.Execute() - defer bdy.Body.Close() + defer func() { _ = bdy.Body.Close() }() } if err != nil { diff --git a/pkg/pulsar/connection.go b/pkg/pulsar/connection.go index 578dff7..61f96f0 100644 --- a/pkg/pulsar/connection.go +++ b/pkg/pulsar/connection.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package pulsar provides Pulsar connection helpers. package pulsar import ( @@ -25,11 +26,12 @@ import ( ) const ( + // DefaultClientTimeout is the default timeout for Pulsar client operations. DefaultClientTimeout = 30 * time.Second ) -//nolint:revive -type PulsarContext struct { +// PulsarContext holds configuration for connecting to a Pulsar cluster. +type PulsarContext struct { //nolint:revive ServiceURL string WebServiceURL string Token string @@ -67,6 +69,7 @@ func NewSession(ctx PulsarContext) (*Session, error) { return session, nil } +// SetPulsarContext initializes Pulsar clients using the provided context. func (s *Session) SetPulsarContext(ctx PulsarContext) error { s.mutex.Lock() defer s.mutex.Unlock() @@ -161,6 +164,7 @@ func (s *Session) SetPulsarContext(ctx PulsarContext) error { return nil } +// GetAdminClient returns the Pulsar admin v2 client. func (s *Session) GetAdminClient() (cmdutils.Client, error) { s.mutex.RLock() defer s.mutex.RUnlock() @@ -171,6 +175,7 @@ func (s *Session) GetAdminClient() (cmdutils.Client, error) { return s.AdminClient, nil } +// GetAdminV3Client returns the Pulsar admin v3 client. func (s *Session) GetAdminV3Client() (cmdutils.Client, error) { s.mutex.RLock() defer s.mutex.RUnlock() @@ -181,6 +186,7 @@ func (s *Session) GetAdminV3Client() (cmdutils.Client, error) { return s.AdminV3Client, nil } +// GetPulsarClient returns the Pulsar data client. func (s *Session) GetPulsarClient() (pulsar.Client, error) { s.mutex.RLock() defer s.mutex.RUnlock() diff --git a/pkg/schema/avro.go b/pkg/schema/avro.go index 5ab6530..d212607 100644 --- a/pkg/schema/avro.go +++ b/pkg/schema/avro.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package schema provides schema converters for MCP tool payloads. package schema import ( @@ -22,14 +23,17 @@ import ( "github.com/mark3labs/mcp-go/mcp" ) +// AvroConverter converts AVRO schemas to MCP tool definitions and payloads. type AvroConverter struct { BaseConverter } +// NewAvroConverter creates a new AvroConverter. func NewAvroConverter() *AvroConverter { return &AvroConverter{} } +// ToMCPToolInputSchemaProperties converts AVRO schema info into MCP tool options. func (c *AvroConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.SchemaInfo) ([]mcp.ToolOption, error) { if schemaInfo.Type != "AVRO" { return nil, fmt.Errorf("expected AVRO schema, got %s", schemaInfo.Type) @@ -37,6 +41,7 @@ func (c *AvroConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.Sche return processAvroSchemaStringToMCPToolInput(string(schemaInfo.Schema)) } +// SerializeMCPRequestToPulsarPayload serializes MCP arguments into an AVRO payload. func (c *AvroConverter) SerializeMCPRequestToPulsarPayload(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) ([]byte, error) { if err := c.ValidateArguments(arguments, targetPulsarSchemaInfo); err != nil { return nil, fmt.Errorf("arguments validation failed: %w", err) @@ -44,6 +49,7 @@ func (c *AvroConverter) SerializeMCPRequestToPulsarPayload(arguments map[string] return serializeArgumentsToAvroBinary(arguments, string(targetPulsarSchemaInfo.Schema)) } +// ValidateArguments validates arguments against the AVRO schema. func (c *AvroConverter) ValidateArguments(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) error { if targetPulsarSchemaInfo.Type != "AVRO" { return fmt.Errorf("expected AVRO schema for validation, got %s", targetPulsarSchemaInfo.Type) diff --git a/pkg/schema/avro_core.go b/pkg/schema/avro_core.go index be84a53..6d8da4d 100644 --- a/pkg/schema/avro_core.go +++ b/pkg/schema/avro_core.go @@ -63,7 +63,7 @@ func avroFieldToMcpOption(field *avro.Field) (mcp.ToolOption, error) { } isRequired := true - var underlyingTypeForDefault avro.Schema = fieldType // Used to check default value against non-union type + underlyingTypeForDefault := fieldType // Used to check default value against non-union type if unionSchema, ok := fieldType.(*avro.UnionSchema); ok { isNullAble := false diff --git a/pkg/schema/avro_test.go b/pkg/schema/avro_test.go index 58e181c..5a125a8 100644 --- a/pkg/schema/avro_test.go +++ b/pkg/schema/avro_test.go @@ -349,10 +349,11 @@ func TestAvroConverter_SerializeMCPRequestToPulsarPayload(t *testing.T) { var schemaToUse string var argsToMarshal map[string]interface{} - if tt.schemaInfo.Name == "SimpleAvroSerialize" { + switch tt.schemaInfo.Name { + case "SimpleAvroSerialize": schemaToUse = simpleRecordSchema argsToMarshal = tt.args - } else if tt.schemaInfo.Name == "ComplexAvroSerialize" { + case "ComplexAvroSerialize": schemaToUse = complexRecordSchemaString complexArgsCopy := make(map[string]interface{}) for k, v := range tt.args { diff --git a/pkg/schema/boolean.go b/pkg/schema/boolean.go index b357cab..76c15e3 100644 --- a/pkg/schema/boolean.go +++ b/pkg/schema/boolean.go @@ -36,6 +36,7 @@ func NewBooleanConverter() *BooleanConverter { } } +// ToMCPToolInputSchemaProperties converts BOOLEAN schema info into MCP tool options. func (c *BooleanConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.SchemaInfo) ([]mcp.ToolOption, error) { if schemaInfo.Type != "BOOLEAN" { return nil, fmt.Errorf("expected BOOLEAN schema, got %s", schemaInfo.Type) @@ -46,6 +47,7 @@ func (c *BooleanConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.S }, nil } +// SerializeMCPRequestToPulsarPayload serializes MCP arguments into a BOOLEAN payload. func (c *BooleanConverter) SerializeMCPRequestToPulsarPayload(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) ([]byte, error) { if err := c.ValidateArguments(arguments, targetPulsarSchemaInfo); err != nil { return nil, fmt.Errorf("arguments validation failed: %w", err) @@ -59,6 +61,7 @@ func (c *BooleanConverter) SerializeMCPRequestToPulsarPayload(arguments map[stri return []byte(fmt.Sprintf("%t", payload)), nil } +// ValidateArguments validates arguments against the BOOLEAN schema. func (c *BooleanConverter) ValidateArguments(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) error { if targetPulsarSchemaInfo.Type != "BOOLEAN" { return fmt.Errorf("expected BOOLEAN schema, got %s", targetPulsarSchemaInfo.Type) diff --git a/pkg/schema/common.go b/pkg/schema/common.go index 5bf712e..569eae5 100644 --- a/pkg/schema/common.go +++ b/pkg/schema/common.go @@ -20,7 +20,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" ) -// GetSchemaType 返回Schema类型的字符串表示 +// GetSchemaType returns the string representation of a schema type. func GetSchemaType(schemaType pulsar.SchemaType) string { switch schemaType { case pulsar.AVRO: diff --git a/pkg/schema/converter.go b/pkg/schema/converter.go index 2c3d6b5..534aa08 100644 --- a/pkg/schema/converter.go +++ b/pkg/schema/converter.go @@ -22,9 +22,11 @@ import ( ) const ( + // ParamName is the default parameter name for payload arguments. ParamName = "payload" ) +// Converter defines schema conversion behaviors for MCP tools. type Converter interface { ToMCPToolInputSchemaProperties(pulsarSchemaInfo *cliutils.SchemaInfo) ([]mcp.ToolOption, error) @@ -33,6 +35,7 @@ type Converter interface { ValidateArguments(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) error } +// ConverterFactory returns a converter for the given schema type. func ConverterFactory(schemaType string) (Converter, error) { switch schemaType { case "AVRO": @@ -50,6 +53,7 @@ func ConverterFactory(schemaType string) (Converter, error) { } } +// BaseConverter provides shared fields for schema converters. type BaseConverter struct { ParamName string } diff --git a/pkg/schema/number.go b/pkg/schema/number.go index 622e474..f946d71 100644 --- a/pkg/schema/number.go +++ b/pkg/schema/number.go @@ -37,6 +37,7 @@ func NewNumberConverter() *NumberConverter { } } +// ToMCPToolInputSchemaProperties converts numeric schema info into MCP tool options. func (c *NumberConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.SchemaInfo) ([]mcp.ToolOption, error) { if schemaInfo.Type != "INT8" && schemaInfo.Type != "INT16" && schemaInfo.Type != "INT32" && schemaInfo.Type != "INT64" && schemaInfo.Type != "FLOAT" && schemaInfo.Type != "DOUBLE" { return nil, fmt.Errorf("expected INT8, INT16, INT32, INT64, FLOAT, or DOUBLE schema, got %s", schemaInfo.Type) @@ -47,6 +48,7 @@ func (c *NumberConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.Sc }, nil } +// SerializeMCPRequestToPulsarPayload serializes MCP arguments into a numeric payload. func (c *NumberConverter) SerializeMCPRequestToPulsarPayload(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) ([]byte, error) { if err := c.ValidateArguments(arguments, targetPulsarSchemaInfo); err != nil { return nil, fmt.Errorf("arguments validation failed: %w", err) @@ -75,6 +77,7 @@ func (c *NumberConverter) SerializeMCPRequestToPulsarPayload(arguments map[strin } } +// ValidateArguments validates arguments against the numeric schema. func (c *NumberConverter) ValidateArguments(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) error { if targetPulsarSchemaInfo.Type != "INT8" && targetPulsarSchemaInfo.Type != "INT16" && targetPulsarSchemaInfo.Type != "INT32" && targetPulsarSchemaInfo.Type != "INT64" && targetPulsarSchemaInfo.Type != "FLOAT" && targetPulsarSchemaInfo.Type != "DOUBLE" { return fmt.Errorf("expected INT8, INT16, INT32, INT64, FLOAT, or DOUBLE schema, got %s", targetPulsarSchemaInfo.Type) diff --git a/pkg/schema/string.go b/pkg/schema/string.go index 9a4b50a..2d45361 100644 --- a/pkg/schema/string.go +++ b/pkg/schema/string.go @@ -36,6 +36,7 @@ func NewStringConverter() *StringConverter { } } +// ToMCPToolInputSchemaProperties converts string schema info into MCP tool options. func (c *StringConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.SchemaInfo) ([]mcp.ToolOption, error) { if schemaInfo.Type != "STRING" && schemaInfo.Type != "BYTES" { return nil, fmt.Errorf("expected STRING or BYTES schema, got %s", schemaInfo.Type) @@ -46,6 +47,7 @@ func (c *StringConverter) ToMCPToolInputSchemaProperties(schemaInfo *cliutils.Sc }, nil } +// SerializeMCPRequestToPulsarPayload serializes MCP arguments into a string payload. func (c *StringConverter) SerializeMCPRequestToPulsarPayload(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) ([]byte, error) { if err := c.ValidateArguments(arguments, targetPulsarSchemaInfo); err != nil { return nil, fmt.Errorf("arguments validation failed: %w", err) @@ -59,6 +61,7 @@ func (c *StringConverter) SerializeMCPRequestToPulsarPayload(arguments map[strin return []byte(payload), nil } +// ValidateArguments validates arguments against the string schema. func (c *StringConverter) ValidateArguments(arguments map[string]any, targetPulsarSchemaInfo *cliutils.SchemaInfo) error { if targetPulsarSchemaInfo.Type != "STRING" && targetPulsarSchemaInfo.Type != "BYTES" { return fmt.Errorf("expected STRING or BYTES schema, got %s", targetPulsarSchemaInfo.Type)