Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Disk struct {
Name string
Shareable bool
CapacityGiB int64
State string
}

// DiskOptions represents parameters to create an PowerVS volume.
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/powervs.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (p *powerVSCloud) GetDiskByName(name string) (disk *Disk, err error) {
WWN: strings.ToLower(*v.Wwn),
Shareable: *v.Shareable,
CapacityGiB: int64(*v.Size),
State: *v.State,
}, nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/driver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package driver

// constants of keys in PublishContext.
const (
WWNKey = "wwn"
WWNKey = "wwn"
DiskType = "diskType"
DiskName = "name"
IsShareable = "shareable"
)

// constants of keys in volume parameters.
Expand Down
134 changes: 70 additions & 64 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package driver

import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
gcfg "gopkg.in/gcfg.v1"
"gopkg.in/gcfg.v1"

"k8s.io/klog/v2"

Expand Down Expand Up @@ -129,6 +132,7 @@ func newControllerService(driverOptions *Options) controllerService {

func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.V(4).Infof("CreateVolume: called with args %+v", req)
start := time.Now()
volName := req.GetName()
if volName == "" {
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
Expand All @@ -152,7 +156,7 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
if !isValidVolumeCapabilities(volCaps) {
modes := util.GetAccessModes(volCaps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes [ReadWriteOnce], [ReadWriteMany], [ReadOnlyMany] supported."
errString := fmt.Sprintf("Volume capabilities %s not supported. Only AccessModes [ReadWriteOnce], [ReadWriteMany], [ReadOnlyMany] supported.", stringModes)
return nil, status.Error(codes.InvalidArgument, errString)
}

Expand All @@ -167,9 +171,8 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}

shareable := isShareableVolume(volCaps)
opts := &cloud.DiskOptions{
Shareable: shareable,
Shareable: isShareableVolume(volCaps),
CapacityBytes: volSizeBytes,
VolumeType: volumeType,
}
Expand All @@ -178,26 +181,29 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
return handleClone(d.cloud, req, volName, volSizeBytes, opts)
}

// check if disk exists
// disk exists only if previous createVolume request fails due to any network/tcp error
diskDetails, _ := d.cloud.GetDiskByName(volName)
if diskDetails != nil {
// Check if the disk already exists
// Disk exists only if previous createVolume request fails due to any network/tcp error
disk, _ := d.cloud.GetDiskByName(volName)
if disk != nil {
// wait for volume to be available as the volume already exists
err := verifyVolumeDetails(opts, diskDetails)
klog.V(3).Infof("CreateVolume: Found an existing volume %s in %q state.", volName, disk.State)
err := verifyVolumeDetails(opts, disk)
if err != nil {
return nil, err
}
err = d.cloud.WaitForVolumeState(diskDetails.VolumeID, cloud.VolumeAvailableState)
if disk.State != cloud.VolumeAvailableState {
err = d.cloud.WaitForVolumeState(disk.VolumeID, cloud.VolumeAvailableState)
if err != nil {
return nil, status.Errorf(codes.Internal, "Disk exists, but not in required state. Current:%s Required:%s", disk.State, cloud.VolumeAvailableState)
}
}
} else {
disk, err = d.cloud.CreateDisk(volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Disk already exists and not in expected state")
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(diskDetails, req.VolumeContentSource), nil
}

disk, err := d.cloud.CreateDisk(volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
klog.V(3).Infof("CreateVolume: created volume %s, took %s", volName, time.Since(start))
return newCreateVolumeResponse(disk, req.VolumeContentSource), nil
}

Expand Down Expand Up @@ -229,6 +235,7 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol

func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
klog.V(4).Infof("ControllerPublishVolume: called with args %+v", req)
start := time.Now()
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
Expand All @@ -243,7 +250,6 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs
if nodeID == "" {
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
}

volCap := req.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
Expand All @@ -253,44 +259,42 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs
if !isValidVolumeCapabilities(caps) {
modes := util.GetAccessModes(caps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes [ReadWriteOnce], [ReadWriteMany], [ReadOnlyMany] supported."
errString := fmt.Sprintf("Volume capabilities %s not supported. Only AccessModes [ReadWriteOnce], [ReadWriteMany], [ReadOnlyMany] supported.", stringModes)
return nil, status.Error(codes.InvalidArgument, errString)
}

if _, err := d.cloud.GetPVMInstanceByID(nodeID); err != nil {
// Retrieve the details of the VM, which contains the set of disks attached too.
pvInst, err := d.cloud.GetPVMInstanceDetails(nodeID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "Instance %q not found, err: %v", nodeID, err)
}

disk, err := d.cloud.GetDiskByID(volumeID)
pvInfo := map[string]string{WWNKey: req.VolumeContext[WWNKey]}

if err != nil {
if err == cloud.ErrNotFound {
return nil, status.Error(codes.NotFound, "Volume not found")
for _, vId := range pvInst.VolumeIDs {
// If the volumeID of the request matches with the ID of the disk that is attached to the VM, return success.
if vId == volumeID {
klog.V(4).Infof("ControllerPublishVolume: volume %s already attached to node %s, took %s", volumeID, nodeID, time.Since(start))
return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil
}
return nil, status.Errorf(codes.Internal, "Could not get volume with ID %q: %v", volumeID, err)
}

pvInfo := map[string]string{WWNKey: disk.WWN}

if err = d.cloud.IsAttached(volumeID, nodeID); err == nil {
klog.V(5).Infof("ControllerPublishVolume: volume %s already attached to node %s, returning success", volumeID, nodeID)
return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil
}

// When there are no associated disks, attach the created disk to the VM.
err = d.cloud.AttachDisk(volumeID, nodeID)
if err != nil {
if err == cloud.ErrAlreadyExists {
return nil, status.Error(codes.AlreadyExists, err.Error())
}
if err == cloud.ErrNotFound {
return nil, status.Errorf(codes.NotFound, "Could not attach volume %q to node %q: %v", volumeID, nodeID, err)
}
return nil, status.Errorf(codes.Internal, "Could not attach volume %q to node %q: %v", volumeID, nodeID, err)
}
klog.V(5).Infof("ControllerPublishVolume: volume %s attached to node %s", volumeID, nodeID)

klog.V(4).Infof("ControllerPublishVolume: volume %s attached to node %s, took %s", volumeID, nodeID, time.Since(start))
return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil
}

func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
klog.V(4).Infof("ControllerUnpublishVolume: called with args %+v", req)
start := time.Now()
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
Expand All @@ -306,23 +310,21 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
}

if _, err := d.cloud.GetDiskByID(volumeID); err != nil {
if err == cloud.ErrNotFound {
klog.V(4).Info("ControllerUnpublishVolume: volume not found, returning with success")
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
}

if err := d.cloud.IsAttached(volumeID, nodeID); err != nil {
klog.V(4).Infof("ControllerUnpublishVolume: volume %s is not attached to %s, err: %v, returning with success", volumeID, nodeID, err)
return &csi.ControllerUnpublishVolumeResponse{}, nil
pvInst, err := d.cloud.GetPVMInstanceDetails(nodeID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "Instance %q not found, err: %v", nodeID, err)
}

if err := d.cloud.DetachDisk(volumeID, nodeID); err != nil {
return nil, status.Errorf(codes.Internal, "Could not detach volume %q from node %q: %v", volumeID, nodeID, err)
for _, vId := range pvInst.VolumeIDs {
if vId == volumeID {
klog.V(4).Infof("ControllerUnpublishVolume: Detaching volume %s from node %s", volumeID, nodeID)
if err := d.cloud.DetachDisk(volumeID, nodeID); err != nil {
return nil, status.Errorf(codes.Internal, "Could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}
}
}
klog.V(5).Infof("ControllerUnpublishVolume: volume %s detached from node %s", volumeID, nodeID)

// The volume in not associated, return success.
klog.V(4).Infof("ControllerUnpublishVolume: volume %s is detached from node %s, took %s", volumeID, nodeID, time.Since(start))
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

Expand Down Expand Up @@ -461,11 +463,17 @@ func (d *controllerService) ListSnapshots(ctx context.Context, req *csi.ListSnap
}

func newCreateVolumeResponse(disk *cloud.Disk, src *csi.VolumeContentSource) *csi.CreateVolumeResponse {
volumeContext := map[string]string{
DiskType: disk.DiskType,
WWNKey: disk.WWN,
DiskName: disk.Name,
IsShareable: strconv.FormatBool(disk.Shareable),
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
VolumeContext: map[string]string{},
VolumeContext: volumeContext,
ContentSource: src,
},
}
Expand All @@ -488,14 +496,13 @@ func getVolSizeBytes(req *csi.CreateVolumeRequest) (int64, error) {

func verifyVolumeDetails(payload *cloud.DiskOptions, diskDetails *cloud.Disk) error {
if payload.Shareable != diskDetails.Shareable {
return status.Errorf(codes.Internal, "shareable in payload and shareable in disk details don't match")
return status.Errorf(codes.Internal, "Field mismatch for 'Shareable'. Payload:%+v, Available: %+v", payload.Shareable, diskDetails.Shareable)
}
if payload.VolumeType != diskDetails.DiskType {
return status.Errorf(codes.Internal, "TYPE in payload and disktype in disk details don't match")
return status.Errorf(codes.Internal, "Field mismatch for 'VolumeType'. Payload:%+v, Available: %+v", payload.VolumeType, diskDetails.DiskType)
}
capacityGIB := util.BytesToGiB(payload.CapacityBytes)
if capacityGIB != diskDetails.CapacityGiB {
return status.Errorf(codes.Internal, "capacityBytes in payload and capacityGIB in disk details don't match")
if util.BytesToGiB(payload.CapacityBytes) != diskDetails.CapacityGiB {
return status.Errorf(codes.Internal, "Field mismatch for 'CapacityGiB'. Payload:%+v, Available: %+v", util.BytesToGiB(payload.CapacityBytes), diskDetails.CapacityGiB)
}
return nil
}
Expand All @@ -506,35 +513,34 @@ func handleClone(cloud cloud.Cloud, req *csi.CreateVolumeRequest, volName string
case *csi.VolumeContentSource_Volume:
diskDetails, _ := cloud.GetDiskByNamePrefix("clone-" + req.GetName())
if diskDetails != nil {
err := verifyVolumeDetails(opts, diskDetails)
if err != nil {
if err := verifyVolumeDetails(opts, diskDetails); err != nil {
return nil, err
}
return newCreateVolumeResponse(diskDetails, req.VolumeContentSource), nil
}
if srcVolume := volumeSource.GetVolume(); srcVolume != nil {
var err error
srcVolumeID := srcVolume.GetVolumeId()
diskDetails, err := cloud.GetDiskByID(srcVolumeID)
diskDetails, err = cloud.GetDiskByID(srcVolumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get the source volume %q: %v", srcVolumeID, err)
}
if util.GiBToBytes(diskDetails.CapacityGiB) != volSizeBytes {
return nil, status.Errorf(codes.Internal, "Cannot clone volume %v, source volume size is not equal to the clone volume", srcVolumeID)
}
err = verifyVolumeDetails(opts, diskDetails)
if err != nil {
if err = verifyVolumeDetails(opts, diskDetails); err != nil {
return nil, err
}
diskFromSourceVolume, err := cloud.CloneDisk(srcVolumeID, volName)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not clone volume %q: %v", volName, err)
}
cloneDiskDetails, err := cloud.GetDiskByID(diskFromSourceVolume.VolumeID)
diskDetails, err = cloud.GetDiskByID(diskFromSourceVolume.VolumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get volume %q after clone: %v", volName, err)
}
return newCreateVolumeResponse(cloneDiskDetails, req.VolumeContentSource), nil
}
return newCreateVolumeResponse(diskDetails, req.VolumeContentSource), nil
}
return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
}
Loading