Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/aws/aws-sdk-go v1.55.8
github.com/aws/aws-sdk-go-v2/service/sts v1.41.5
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251105130956-2cc225b176fc
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251228182201-41573a292917
github.com/ceph/go-ceph v0.37.0
github.com/container-storage-interface/spec v1.12.0
github.com/csi-addons/kubernetes-csi-addons v0.13.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251105130956-2cc225b176fc h1:cKgQo2eMp9dFavepSIJ9Fl5iUp/7OUuRKgme4gm1nJM=
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251105130956-2cc225b176fc/go.mod h1:P+rCBJEUy2Yt6vOcTh7pyh9/g+o9pOZc7kyY6k/PS5U=
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251228182201-41573a292917 h1:lzt5OFV9pm0soex1mzq5ZY7IquZkOFT61uni3fxhZZo=
github.com/ceph/ceph-nvmeof/lib/go/nvmeof v0.0.0-20251228182201-41573a292917/go.mod h1:P+rCBJEUy2Yt6vOcTh7pyh9/g+o9pOZc7kyY6k/PS5U=
github.com/ceph/go-ceph v0.37.0 h1:KXliBe3ZDr3/AtfY7n9d1MG7ippYNCVhMPcAgm05CFI=
github.com/ceph/go-ceph v0.37.0/go.mod h1:3y2tOlITlyuVFhy8v6PpCEfjMwKPfXJiH0/2hKZZQRE=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand Down
68 changes: 55 additions & 13 deletions internal/nvmeof/controller/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -357,18 +358,27 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
params := req.GetParameters()
requiredParams := []string{
"subsystemNQN", "nvmeofGatewayAddress", "nvmeofGatewayPort",
"listeners",
}
for _, param := range requiredParams {
if params[param] == "" {
return fmt.Errorf("missing required parameter: %s", param)
}
}
// Validate listeners JSON
_, err := parseListeners(params["listeners"])
// Validate listeners JSON if provided
listeners, err := parseListeners(params["listeners"])
if err != nil {
return fmt.Errorf("invalid listeners parameter: %w", err)
}
// Validate network mask if provided
networkMask, err := parseNetworkMask(params["networkMask"])
if err != nil {
return fmt.Errorf("invalid network mask parameter: %w", err)
}
// Must have EITHER listeners OR networkMask
if len(listeners) == 0 && networkMask == "" {
return errors.New("must specify either 'listeners' or 'networkMask' in StorageClass")
}

// Validate QoS parameters - cannot mix RBD and NVMe-oF QoS
mutableParams := req.GetMutableParameters()

Expand Down Expand Up @@ -405,12 +415,15 @@ func validatePublishVolumeRequest(req *csi.ControllerPublishVolumeRequest) error
}

func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) {
if listenersJSON == "" { // No "listeners" entry was provided
return []nvmeof.ListenerDetails{}, nil
}
var listeners []nvmeof.ListenerDetails
if err := json.Unmarshal([]byte(listenersJSON), &listeners); err != nil {
return nil, fmt.Errorf("failed to parse listeners JSON: %w", err)
}

if len(listeners) == 0 { // TODO: when auto listener will be implemented , make listeners optional
if len(listeners) == 0 { // At least one listener is required
return nil, errors.New("at least one listener must be specified")
}

Expand All @@ -424,6 +437,20 @@ func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) {
return listeners, nil
}

// Validate network mask CIDR format.
func parseNetworkMask(networkMask string) (string, error) {
if networkMask == "" {
return "", nil
}

_, _, err := net.ParseCIDR(networkMask)
if err != nil {
return "", fmt.Errorf("invalid network mask CIDR format: %w", err)
}

return networkMask, nil
}

// parseQoSParameters extracts and parses QoS parameters from the given map.
func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) {
qos := &nvmeof.NVMeoFQosVolume{}
Expand Down Expand Up @@ -546,7 +573,8 @@ func (cs *Server) modifyNVMeoFQoS(
func ensureSubsystem(
ctx context.Context,
gateway *nvmeof.GatewayRpcClient,
subsystemNQN string,
subsystemNQN,
networkMask string,
listeners []nvmeof.ListenerDetails,
) error {
exists, err := gateway.SubsystemExists(ctx, subsystemNQN)
Expand All @@ -557,16 +585,20 @@ func ensureSubsystem(
return nil
}
// Create if doesn't exist (controller decision)
err = gateway.CreateSubsystem(ctx, subsystemNQN)
err = gateway.CreateSubsystem(ctx, subsystemNQN, networkMask)
if err != nil {
return err
}

// Create all listeners
for i, listener := range listeners {
log.DebugLog(ctx, "Creating listener %d: %s", i, listener.String())
if err := gateway.CreateListener(ctx, subsystemNQN, listener); err != nil {
return fmt.Errorf("failed to create listener %d (%s): %w", i, listener.String(), err)
// if networkMask is not provided, listeners are not created automatically by gateway,
// should create them manually one by one.
if networkMask == "" {
// Create all listeners
for i, listener := range listeners {
log.DebugLog(ctx, "Creating listener %d: %s", i, listener.String())
if err := gateway.CreateListener(ctx, subsystemNQN, listener); err != nil {
return fmt.Errorf("failed to create listener %d (%s): %w", i, listener.String(), err)
}
}
}

Expand Down Expand Up @@ -638,7 +670,7 @@ func (cs *Server) createNVMeoFResources(
if err != nil {
return nil, fmt.Errorf("failed to parse listeners: %w", err)
}

networkMask := params["networkMask"]
nvmeofGatewayPortStr := params["nvmeofGatewayPort"]
nvmeofGatewayPort, err := strconv.ParseUint(nvmeofGatewayPortStr, 10, 32)
if err != nil {
Expand Down Expand Up @@ -689,7 +721,7 @@ func (cs *Server) createNVMeoFResources(
defer cs.subsystemLocks.Release(nvmeofData.SubsystemNQN)

// Step 3: Ensure subsystem exists (and listener)
if err := ensureSubsystem(ctx, gateway, nvmeofData.SubsystemNQN, nvmeofData.ListenerInfo); err != nil {
if err := ensureSubsystem(ctx, gateway, nvmeofData.SubsystemNQN, networkMask, nvmeofData.ListenerInfo); err != nil {
return nil, fmt.Errorf("subsystem setup failed: %w", err)
}

Expand All @@ -713,6 +745,16 @@ func (cs *Server) createNVMeoFResources(
}
}

// Step 6: If using auto-listeners, query them back for storing in metadata
if networkMask != "" {
autoListeners, err := gateway.ListListeners(ctx, nvmeofData.SubsystemNQN)
if err != nil {
return nil, fmt.Errorf("failed to list auto-created listeners: %w", err)
}
nvmeofData.ListenerInfo = nvmeof.ConvertListenersFromProto(autoListeners.GetListeners())
log.DebugLog(ctx, "Retrieved %d auto-created listeners", len(nvmeofData.ListenerInfo))
}

uuid, err := gateway.GetUUIDBySubsystemAndNameSpaceID(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)
if err != nil {
return nil, fmt.Errorf("get namespace uuid failed: %w", err)
Expand Down
43 changes: 41 additions & 2 deletions internal/nvmeof/nvmeof.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (gw *GatewayRpcClient) GetUUIDBySubsystemAndNameSpaceID(
}

// CreateSubsystem creates an NVMe-oF subsystem on the gateway.
func (gw *GatewayRpcClient) CreateSubsystem(ctx context.Context, subsystemNQN string) error {
func (gw *GatewayRpcClient) CreateSubsystem(ctx context.Context, subsystemNQN, networkMask string) error {
log.DebugLog(ctx, "Creating NVMe subsystem: %s on gateway %s",
subsystemNQN, gw.config)

Expand All @@ -257,7 +257,9 @@ func (gw *GatewayRpcClient) CreateSubsystem(ctx context.Context, subsystemNQN st
// DhchapKey: nil, // No authentication
// KeyEncrypted: nil, // No encryption
}

if networkMask != "" {
req.NetworkMask = &networkMask
}
status, err := gw.client.CreateSubsystem(ctx, req)
switch {
case err != nil:
Expand Down Expand Up @@ -478,6 +480,43 @@ func (gw *GatewayRpcClient) ListNamespaces(ctx context.Context, subsystemNQN str
return resp, nil
}

// List listeners in a subsystem.
func (gw *GatewayRpcClient) ListListeners(ctx context.Context, subsystemNQN string) (*pb.ListenersInfo, error) {
log.DebugLog(ctx, "Listing listeners in subsystem %s", subsystemNQN)

req := &pb.ListListenersReq{
Subsystem: subsystemNQN,
}

resp, err := gw.client.ListListeners(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to list listeners in subsystem %s: %w", subsystemNQN, err)
}
if resp.GetStatus() != 0 {
return nil, fmt.Errorf("gateway ListListeners returned error: %s", resp.GetErrorMessage())
}

log.DebugLog(ctx, "Listed listeners in subsystem %s successfully", subsystemNQN)

return resp, nil
}

// ConvertListenersFromProto converts protobuf ListenerInfo to internal ListenerDetails format.
func ConvertListenersFromProto(protoListeners []*pb.ListenerInfo) []ListenerDetails {
listeners := make([]ListenerDetails, 0, len(protoListeners))
for _, l := range protoListeners {
listeners = append(listeners, ListenerDetails{
GatewayAddress: GatewayAddress{
Address: l.GetTraddr(),
Port: l.GetTrsvcid(),
},
Hostname: l.GetHostName(),
})
}

return listeners
}

// Connect to Gateway gRPC server.
func (c *GatewayRpcClient) connect() error {
// Create connection using new gRPC API
Expand Down
3 changes: 2 additions & 1 deletion internal/nvmeof/tests/nvmeof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func TestRealGateway(t *testing.T) {
})

// Test create subsystem
err = client.CreateSubsystem(ctx, nvmeofData.SubsystemNQN)
networkMask := "" // No auto-listeners
err = client.CreateSubsystem(ctx, nvmeofData.SubsystemNQN, networkMask)
require.NoError(t, err)
t.Logf("✓ Subsystem created: %s", nvmeofData.SubsystemNQN)

Expand Down
Loading
Loading