create subDir in CreateVolume and delete in DeleteVolume

This commit is contained in:
Manohar Reddy 2021-05-07 16:57:12 +00:00
parent 7104ad2c9b
commit d020b78056
15 changed files with 446 additions and 77 deletions

View File

@ -15,6 +15,7 @@ spec:
{{ include "smb.labels" . | indent 6 }}
app: csi-smb-controller
spec:
dnsPolicy: ClusterFirstWithHostNet
serviceAccountName: csi-smb-controller-sa
nodeSelector:
kubernetes.io/os: linux
@ -96,6 +97,8 @@ spec:
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
securityContext:
privileged: true
volumeMounts:
- mountPath: /csi
name: socket-dir

View File

@ -37,6 +37,9 @@ rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
---
kind: ClusterRoleBinding

View File

@ -14,6 +14,7 @@ spec:
labels:
app: csi-smb-controller
spec:
dnsPolicy: ClusterFirstWithHostNet
serviceAccountName: csi-smb-controller-sa
nodeSelector:
kubernetes.io/os: linux
@ -89,6 +90,8 @@ spec:
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
securityContext:
privileged: true
volumeMounts:
- mountPath: /csi
name: socket-dir

View File

@ -12,7 +12,9 @@ parameters:
source: "//smb-server.default.svc.cluster.local/share"
csi.storage.k8s.io/node-stage-secret-name: "smbcreds"
csi.storage.k8s.io/node-stage-secret-namespace: "default"
createSubDir: "false" # optional: create a sub dir for new volume
csi.storage.k8s.io/provisioner-secret-name: "smbcreds"
csi.storage.k8s.io/provisioner-secret-namespace: "default"
createSubDir: "true" # optional: create a sub dir for new volume
reclaimPolicy: Retain # only retain is supported
volumeBindingMode: Immediate
mountOptions:

View File

@ -33,6 +33,9 @@ rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
---
kind: ClusterRoleBinding

2
go.mod
View File

@ -25,7 +25,7 @@ require (
k8s.io/client-go v0.21.0
k8s.io/klog/v2 v2.8.0
k8s.io/kubernetes v1.21.0
k8s.io/mount-utils v0.0.0
k8s.io/mount-utils v0.21.1
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
sigs.k8s.io/yaml v1.2.0
)

View File

@ -18,32 +18,132 @@ package smb
import (
"context"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)
// smbVolume is an internal representation of a volume
// created by the provisioner.
type smbVolume struct {
	// Volume id of the form {server}/{subDir}; built by getVolumeIDFromSmbVol.
	id string
	// Address of the SMB server.
	sourceField string
	// Subdirectory of the SMB server to create volumes under
	subDir string
	// size of volume in bytes (the CapacityRange.RequiredBytes from CreateVolume)
	size int64
}
// Ordering of elements in the CSI volume id.
// ID is of the form {server}/{subDir}.
const (
	idsourceField   = iota // index of the server/source element
	idSubDir               // index of the subdirectory element
	totalIDElements        // Always last: total number of id elements
)
// CreateVolume provisions a volume; when the createSubDir parameter is
// "true" (and secrets are provided) it mounts the base share and creates
// a per-volume subdirectory under it.
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
name := req.GetName()
if len(name) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
}
var volCap *csi.VolumeCapability
volumeCapabilities := req.GetVolumeCapabilities()
if len(volumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: req.GetName(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: req.GetParameters(),
},
}, nil
if len(volumeCapabilities) > 0 {
volCap = req.GetVolumeCapabilities()[0]
}
reqCapacity := req.GetCapacityRange().GetRequiredBytes()
smbVol, err := d.newSMBVolume(name, reqCapacity, req.GetParameters())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// check if createSubDir is enabled in the storage class parameters
parameters := req.GetParameters()
var createSubDir string
for k, v := range parameters {
switch strings.ToLower(k) {
case createSubDirField:
createSubDir = v
}
}
secrets := req.GetSecrets()
if strings.EqualFold(createSubDir, "true") {
if len(secrets) > 0 {
// Mount smb base share so we can create a subdirectory
if err := d.internalMount(ctx, smbVol, volCap, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount smb server: %v", err.Error())
}
defer func() {
if err = d.internalUnmount(ctx, smbVol); err != nil {
klog.Warningf("failed to unmount smb server: %v", err.Error())
}
}()
// Create subdirectory under base-dir
// TODO: revisit permissions
internalVolumePath := d.getInternalVolumePath(smbVol)
if err = os.Mkdir(internalVolumePath, 0777); err != nil && !os.IsExist(err) {
return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
}
parameters[sourceField] = parameters[sourceField] + "/" + smbVol.subDir
} else {
klog.Warningf("CreateVolume: Volume secrets should be provided when createSubDir is true")
}
}
return &csi.CreateVolumeResponse{Volume: d.smbVolToCSI(smbVol, parameters)}, nil
}
// DeleteVolume removes the volume; when secrets are provided it mounts the
// base share and deletes the per-volume subdirectory created by CreateVolume.
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "volume id is empty")
}
smbVol, err := d.getSmbVolFromID(volumeID)
if err != nil {
// An invalid ID should be treated as doesn't exist
klog.Warningf("failed to get smb volume for volume id %v deletion: %v", volumeID, err)
return &csi.DeleteVolumeResponse{}, nil
}
secrets := req.GetSecrets()
if len(secrets) > 0 {
// Mount smb base share so we can delete the subdirectory
if err = d.internalMount(ctx, smbVol, nil, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount smb server: %v", err.Error())
}
defer func() {
if err = d.internalUnmount(ctx, smbVol); err != nil {
klog.Warningf("failed to unmount smb server: %v", err.Error())
}
}()
// Delete subdirectory under base-dir
internalVolumePath := d.getInternalVolumePath(smbVol)
klog.V(2).Infof("Removing subdirectory at %v", internalVolumePath)
if err = os.RemoveAll(internalVolumePath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error())
}
} else {
klog.Warningf("DeleteVolume: Volume secrets should be provided")
}
return &csi.DeleteVolumeResponse{}, nil
}
@ -105,3 +205,119 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// getVolumeIDFromSmbVol builds the CSI volume id for vol.
// The id has the form {server}/{subDir}, with surrounding slashes
// stripped from each element before joining.
func (d *Driver) getVolumeIDFromSmbVol(vol *smbVolume) string {
	parts := make([]string, totalIDElements)
	parts[idsourceField] = strings.Trim(vol.sourceField, "/")
	parts[idSubDir] = strings.Trim(vol.subDir, "/")
	return strings.Join(parts, "/")
}
// getInternalMountPath returns the per-volume working directory used by
// CreateVolume and DeleteVolume to temporarily mount the base share.
//
// The original implementation lazily wrote the "/tmp" default back into
// d.workingMountDir, mutating shared driver state from a read path (a
// potential data race under concurrent CSI calls). This version applies
// the fallback locally without modifying the driver.
func (d *Driver) getInternalMountPath(vol *smbVolume) string {
	mountDir := d.workingMountDir
	if mountDir == "" {
		// use default if empty
		mountDir = "/tmp"
	}
	return filepath.Join(mountDir, vol.subDir)
}
// internalMount stages the SMB base share (vol.sourceField) at the
// driver's internal working directory so that a subdirectory can be
// created or removed underneath it.
// If volCap is nil (e.g. from DeleteVolume), a default mount capability
// is substituted.
func (d *Driver) internalMount(ctx context.Context, vol *smbVolume, volCap *csi.VolumeCapability, secrets map[string]string) error {
	stagingPath := d.getInternalMountPath(vol)

	if volCap == nil {
		volCap = &csi.VolumeCapability{
			AccessType: &csi.VolumeCapability_Mount{
				Mount: &csi.VolumeCapability_MountVolume{},
			},
		}
	}

	// Bug fix: log the share actually being mounted (vol.sourceField),
	// not the parameter-key constant sourceField ("source").
	klog.V(4).Infof("internally mounting %v at %v", vol.sourceField, stagingPath)
	_, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
		StagingTargetPath: stagingPath,
		VolumeContext: map[string]string{
			sourceField: vol.sourceField,
		},
		VolumeCapability: volCap,
		VolumeId:         vol.id,
		Secrets:          secrets,
	})
	return err
}
// internalUnmount unstages the SMB base share previously mounted by
// internalMount.
func (d *Driver) internalUnmount(ctx context.Context, vol *smbVolume) error {
	targetPath := d.getInternalMountPath(vol)

	// Unmount smb server at base-dir
	klog.V(4).Infof("internally unmounting %v", targetPath)
	_, err := d.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
		VolumeId: vol.id,
		// Reuse the already-computed path instead of calling
		// getInternalMountPath a second time.
		StagingTargetPath: targetPath,
	})
	return err
}
// newSMBVolume converts CreateVolume parameters into an smbVolume.
// Parameter keys are matched case-insensitively; the "source" parameter
// is required and names the SMB server share.
func (d *Driver) newSMBVolume(name string, size int64, params map[string]string) (*smbVolume, error) {
	source := ""
	for key, value := range params {
		if strings.ToLower(key) == paramSource {
			source = value
		}
	}

	// Validate required parameters
	if source == "" {
		return nil, fmt.Errorf("%v is a required parameter", paramSource)
	}

	vol := &smbVolume{
		sourceField: source,
		subDir:      name,
		size:        size,
	}
	vol.id = d.getVolumeIDFromSmbVol(vol)
	return vol, nil
}
// getInternalVolumePath returns the path where the volume's
// subdirectory is created under the internal mount.
// The layout is "workingDir/subDir/subDir" because the semantic is
// "workingDir/volId/subDir" and volId == subDir: each CreateVolume call
// gets its own mount directory (workingDir/volId) so parallel calls that
// use the same underlying share never collide — one mount per request is
// simpler than refcounting a shared mount.
func (d *Driver) getInternalVolumePath(vol *smbVolume) string {
	mountPath := d.getInternalMountPath(vol)
	return filepath.Join(mountPath, vol.subDir)
}
// smbVolToCSI converts an smbVolume into a csi.Volume.
// CapacityBytes is deliberately left at zero so the external
// provisioner falls back to the PVC's requested size for the PV size.
func (d *Driver) smbVolToCSI(vol *smbVolume, parameters map[string]string) *csi.Volume {
	csiVol := &csi.Volume{
		VolumeId:      vol.id,
		VolumeContext: parameters,
		CapacityBytes: 0,
	}
	return csiVol
}
// volIDRegex matches a CSI volume id of the form {server}/{subDir}.
// Compiled once at package scope so it is not rebuilt on every call.
var volIDRegex = regexp.MustCompile("^([^/]+)/([^/]+)$")

// getSmbVolFromID parses a CSI volume id back into an smbVolume.
// The id must have exactly the form {server}/{subDir}.
func (d *Driver) getSmbVolFromID(id string) (*smbVolume, error) {
	tokens := volIDRegex.FindStringSubmatch(id)
	if tokens == nil {
		// Lowercase error string per Go convention; the id encodes only
		// server and subDir (the original message wrongly claimed a
		// baseDir element).
		return nil, fmt.Errorf("could not split %q into server and subDir", id)
	}
	return &smbVolume{
		id:          id,
		sourceField: tokens[1],
		subDir:      tokens[2],
	}, nil
}

View File

@ -18,15 +18,26 @@ package smb
import (
"context"
"fmt"
"os"
"path/filepath"
"reflect"
"runtime"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-smb/test/utils/testutil"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
testServer = "test-server"
testCSIVolume = "test-csi"
testVolumeID = "test-server/test-csi"
)
func TestControllerGetCapabilities(t *testing.T) {
d := NewFakeDriver()
controlCap := []*csi.ControllerServiceCapability{
@ -44,69 +55,216 @@ func TestControllerGetCapabilities(t *testing.T) {
func TestCreateVolume(t *testing.T) {
d := NewFakeDriver()
stdVolCap := []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
}
tests := []struct {
desc string
req csi.CreateVolumeRequest
expectedErr error
// Setup workingMountDir
workingMountDir, err := os.Getwd()
if err != nil {
t.Errorf("failed to get current working directory")
}
d.workingMountDir = workingMountDir
// Setup mounter
mounter, err := NewFakeMounter()
if err != nil {
t.Fatalf(fmt.Sprintf("failed to get fake mounter: %v", err))
}
d.mounter = mounter
sourceTest := testutil.GetWorkDirPath("test-csi", t)
cases := []struct {
name string
req *csi.CreateVolumeRequest
resp *csi.CreateVolumeResponse
flakyWindowsErrorMessage string
expectErr bool
}{
{
desc: "Volume capabilities missing",
req: csi.CreateVolumeRequest{},
expectedErr: status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided"),
name: "valid defaults",
req: &csi.CreateVolumeRequest{
Name: testCSIVolume,
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
paramSource: testServer,
createSubDirField: "true",
},
Secrets: map[string]string{
usernameField: "test",
passwordField: "test",
domainField: "test_doamin",
},
},
resp: &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: testVolumeID,
VolumeContext: map[string]string{
paramSource: filepath.Join(testServer, testCSIVolume),
createSubDirField: "true",
},
},
},
flakyWindowsErrorMessage: fmt.Sprintf("volume(vol_1##) mount \"test-server\" on %#v failed with "+
"smb mapping failed with error: rpc error: code = Unknown desc = NewSmbGlobalMapping failed.",
sourceTest),
},
{
desc: "Valid request",
req: csi.CreateVolumeRequest{
VolumeCapabilities: stdVolCap,
name: "name empty",
req: &csi.CreateVolumeRequest{
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
paramSource: testServer,
},
},
expectedErr: nil,
expectErr: true,
},
{
name: "invalid create context",
req: &csi.CreateVolumeRequest{
Name: testCSIVolume,
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
"unknown-parameter": "foo",
},
},
expectErr: true,
},
}
for _, test := range tests {
_, err := d.CreateVolume(context.Background(), &test.req)
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
for _, test := range cases {
test := test //pin
t.Run(test.name, func(t *testing.T) {
// Setup
_ = os.MkdirAll(filepath.Join(d.workingMountDir, testCSIVolume), os.ModePerm)
// Run
resp, err := d.CreateVolume(context.TODO(), test.req)
// Verify
if test.expectErr && err == nil {
t.Errorf("test %q failed; got success", test.name)
}
// separate assertion for flaky error messages
if test.flakyWindowsErrorMessage != "" && runtime.GOOS == "windows" {
fmt.Println("Skipping checks on Windows ENV")
} else {
if !test.expectErr && err != nil {
t.Errorf("test %q failed: %v", test.name, err)
}
if !reflect.DeepEqual(resp, test.resp) {
t.Errorf("test %q failed: got resp %+v, expected %+v", test.name, resp, test.resp)
}
if !test.expectErr {
info, err := os.Stat(filepath.Join(d.workingMountDir, test.req.Name, test.req.Name))
if err != nil {
t.Errorf("test %q failed: couldn't find volume subdirectory: %v", test.name, err)
}
if !info.IsDir() {
t.Errorf("test %q failed: subfile not a directory", test.name)
}
}
}
})
}
}
func TestDeleteVolume(t *testing.T) {
d := NewFakeDriver()
tests := []struct {
// Setup workingMountDir
workingMountDir, err := os.Getwd()
if err != nil {
t.Errorf("failed to get current working directory")
}
d.workingMountDir = workingMountDir
// Setup mounter
mounter, err := NewFakeMounter()
if err != nil {
t.Fatalf(fmt.Sprintf("failed to get fake mounter: %v", err))
}
d.mounter = mounter
cases := []struct {
desc string
req csi.DeleteVolumeRequest
req *csi.DeleteVolumeRequest
resp *csi.DeleteVolumeResponse
expectedErr error
}{
{
desc: "Volume ID missing",
req: csi.DeleteVolumeRequest{},
req: &csi.DeleteVolumeRequest{},
resp: nil,
expectedErr: status.Error(codes.InvalidArgument, "Volume ID missing in request"),
},
{
desc: "Valid request",
req: csi.DeleteVolumeRequest{VolumeId: "vol_1"},
desc: "Valid request",
req: &csi.DeleteVolumeRequest{
VolumeId: testVolumeID,
Secrets: map[string]string{
usernameField: "test",
passwordField: "test",
domainField: "test_doamin",
},
},
resp: &csi.DeleteVolumeResponse{},
expectedErr: nil,
},
}
for _, test := range tests {
_, err := d.DeleteVolume(context.Background(), &test.req)
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
for _, test := range cases {
test := test //pin
t.Run(test.desc, func(t *testing.T) {
// Setup
_ = os.MkdirAll(filepath.Join(d.workingMountDir, testCSIVolume), os.ModePerm)
_, _ = os.Create(filepath.Join(d.workingMountDir, testCSIVolume, testCSIVolume))
// Run
resp, err := d.DeleteVolume(context.TODO(), test.req)
// Verify
if runtime.GOOS == "windows" {
// skip checks
fmt.Println("Skipping checks on Windows ENV")
} else {
if test.expectedErr == nil && err != nil {
t.Errorf("test %q failed: %v", test.desc, err)
}
if test.expectedErr != nil && err == nil {
t.Errorf("test %q failed; expected error %v, got success", test.desc, test.expectedErr)
}
if !reflect.DeepEqual(resp, test.resp) {
t.Errorf("test %q failed: got resp %+v, expected %+v", test.desc, resp, test.resp)
}
if _, err := os.Stat(filepath.Join(d.workingMountDir, testCSIVolume, testCSIVolume)); test.expectedErr == nil && !os.IsNotExist(err) {
t.Errorf("test %q failed: expected volume subdirectory deleted, it still exists", test.desc)
}
}
})
}
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"time"
@ -83,27 +82,6 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
return nil, fmt.Errorf("prepare publish failed for %s with error: %v", target, err)
}
context := req.GetVolumeContext()
var createSubDir string
for k, v := range context {
switch strings.ToLower(k) {
case createSubDirField:
createSubDir = v
}
}
if strings.EqualFold(createSubDir, "true") {
source = filepath.Join(source, req.GetVolumeId())
klog.V(2).Infof("NodePublishVolume: createSubDir(%s) MkdirAll(%s) volumeID(%s)", createSubDir, source, volumeID)
if err := Mkdir(d.mounter, source, 0750); err != nil {
if os.IsExist(err) {
klog.Warningf("Mkdir(%s) failed with error: %v", source, err)
} else {
return nil, status.Errorf(codes.Internal, "Mkdir(%s) failed with error: %v", source, err)
}
}
}
klog.V(2).Infof("NodePublishVolume: mounting %s at %s with mountOptions: %v volumeID(%s)", source, target, mountOptions, volumeID)
if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
if removeErr := os.Remove(target); removeErr != nil {

View File

@ -31,6 +31,7 @@ import (
const (
DefaultDriverName = "smb.csi.k8s.io"
createSubDirField = "createsubdir"
paramSource = "source"
)
// Driver implements all interfaces of CSI drivers
@ -39,7 +40,8 @@ type Driver struct {
mounter *mount.SafeFormatAndMount
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *volumeLocks
volumeLocks *volumeLocks
workingMountDir string
}
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &

View File

@ -20,11 +20,10 @@ package smb
import (
"fmt"
"os"
"github.com/kubernetes-csi/csi-driver-smb/pkg/mounter"
"k8s.io/klog/v2"
mount "k8s.io/mount-utils"
"os"
)
func Mount(m *mount.SafeFormatAndMount, source, target, fsType string, mountOptions, sensitiveMountOptions []string) error {

View File

@ -55,8 +55,10 @@ var (
}
storageClassCreateSubDir = map[string]string{
"source": "//smb-server.default.svc.cluster.local/share",
"csi.storage.k8s.io/node-stage-secret-name": "smbcreds",
"csi.storage.k8s.io/node-stage-secret-namespace": "default",
"csi.storage.k8s.io/node-stage-secret-name": "smbcreds",
"csi.storage.k8s.io/node-stage-secret-namespace": "default",
"csi.storage.k8s.io/provisioner-secret-name": "smbcreds",
"csi.storage.k8s.io/provisioner-secret-namespace": "default",
"createSubDir": "true",
}
)

View File

@ -76,7 +76,7 @@ echo 'Mount volume test:'
sleep 2
echo "node stats test:"
csc node stats --endpoint "$endpoint" "$volumeid:$target_path:$staging_target_path"
"$CSC_BIN" node stats --endpoint "$endpoint" "$volumeid:$target_path:$staging_target_path"
sleep 2
echo 'Unmount volume test:'

2
vendor/modules.txt vendored
View File

@ -779,7 +779,7 @@ k8s.io/kubernetes/test/e2e/storage/podlogs
k8s.io/kubernetes/test/e2e/storage/utils
k8s.io/kubernetes/test/utils
k8s.io/kubernetes/test/utils/image
# k8s.io/mount-utils v0.0.0 => k8s.io/mount-utils v0.21.1
# k8s.io/mount-utils v0.21.1 => k8s.io/mount-utils v0.21.1
## explicit
k8s.io/mount-utils
# k8s.io/utils v0.0.0-20201110183641-67b214c5f920 => k8s.io/utils v0.0.0-20201110183641-67b214c5f920