fix: Handle concurrent operation of NodeUnstage with NodeStage and NodeUnpublish with NodePublish

Harshika 2021-01-17 21:50:12 +05:30
parent d177e3a4a0
commit ad176e84a7
4 changed files with 172 additions and 1 deletion

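All four node RPCs touched below gain the same guard: take a per-volume lock keyed by the CSI volume ID, return codes.Aborted when another operation already holds it, and release the lock when the call returns. A minimal sketch of that shared pattern, condensed into a hypothetical helper for readability (guardedOp is not part of the commit; the real handlers inline these lines, as the hunks show):

package smb

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// guardedOp is an illustrative helper only: it shows the check each node RPC
// now performs before doing any mount or unmount work for the volume.
func (d *Driver) guardedOp(volumeID string, op func() error) error {
	// Reject the call if another operation on this volume ID is still in flight.
	if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
		return status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
	}
	// Always release the lock when the operation finishes, whether it succeeded or failed.
	defer d.volumeLocks.Release(volumeID)
	return op()
}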

@@ -65,6 +65,11 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer d.volumeLocks.Release(volumeID)
mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
@@ -125,6 +130,10 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublish
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer d.volumeLocks.Release(volumeID)
klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
err := CleanupMountPoint(d.mounter, targetPath, false)
@@ -161,6 +170,11 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%s field is missing, current context: %v", sourceField, context))
}
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer d.volumeLocks.Release(volumeID)
var username, password, domain string
for k, v := range secrets {
switch strings.ToLower(k) {
@@ -236,6 +250,11 @@ func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolu
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer d.volumeLocks.Release(volumeID)
klog.V(2).Infof("NodeUnstageVolume: CleanupMountPoint on %s with volume %s", stagingTargetPath, volumeID)
if err := CleanupSMBMountPoint(d.mounter, stagingTargetPath, false); err != nil {
return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)


@@ -69,8 +69,10 @@ func TestNodeStageVolume(t *testing.T) {
tests := []struct {
desc string
setup func(*Driver)
req csi.NodeStageVolumeRequest
expectedErr testutil.TestError
cleanup func(*Driver)
// use this field only when Windows
// gives flaky error messages due
@@ -120,6 +122,22 @@ func TestNodeStageVolume(t *testing.T) {
WindowsError: status.Error(codes.Internal, fmt.Sprintf("Could not mount target %s: mkdir %s: The system cannot find the path specified.", smbFile, smbFile)),
},
},
{
desc: "[Error] Volume operation in progress",
setup: func(d *Driver) {
d.volumeLocks.TryAcquire("vol_1")
},
req: csi.NodeStageVolumeRequest{VolumeId: "vol_1", StagingTargetPath: sourceTest,
VolumeCapability: &stdVolCap,
VolumeContext: volContext,
Secrets: secrets},
expectedErr: testutil.TestError{
DefaultError: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
},
cleanup: func(d *Driver) {
d.volumeLocks.Release("vol_1")
},
},
{
desc: "[Error] Failed SMB mount mocked by MountSensitive",
req: csi.NodeStageVolumeRequest{VolumeId: "vol_1##", StagingTargetPath: errorMountSensSource,
@@ -159,6 +177,9 @@ func TestNodeStageVolume(t *testing.T) {
}
d.mounter = mounter
if test.setup != nil {
test.setup(d)
}
_, err = d.NodeStageVolume(context.Background(), &test.req)
// separate assertion for flaky error messages
@@ -171,6 +192,9 @@ func TestNodeStageVolume(t *testing.T) {
t.Errorf("test case: %s, \nUnexpected error: %v\nExpected error: %v", test.desc, err, test.expectedErr.GetExpectedError())
}
}
if test.cleanup != nil {
test.cleanup(d)
}
}
// Clean up
@@ -230,9 +254,11 @@ func TestNodePublishVolume(t *testing.T) {
tests := []struct {
desc string
setup func(*Driver)
req csi.NodePublishVolumeRequest
skipOnWindows bool
expectedErr testutil.TestError
cleanup func(*Driver)
}{
{
desc: "[Error] Volume capabilities missing",
@@ -278,6 +304,23 @@ func TestNodePublishVolume(t *testing.T) {
WindowsError: status.Errorf(codes.Internal, "Could not mount target %#v: mkdir %s: The system cannot find the path specified.", smbFile, smbFile),
},
},
{
desc: "[Error] Volume operation in progress",
setup: func(d *Driver) {
d.volumeLocks.TryAcquire("vol_1")
},
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
VolumeId: "vol_1",
TargetPath: targetTest,
StagingTargetPath: sourceTest,
Readonly: true},
expectedErr: testutil.TestError{
DefaultError: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
},
cleanup: func(d *Driver) {
d.volumeLocks.Release("vol_1")
},
},
{
desc: "[Error] Mount error mocked by Mount",
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
@@ -332,10 +375,16 @@ func TestNodePublishVolume(t *testing.T) {
for _, test := range tests {
if !(test.skipOnWindows && runtime.GOOS == "windows") {
if test.setup != nil {
test.setup(d)
}
_, err := d.NodePublishVolume(context.Background(), &test.req)
if !testutil.AssertError(&test.expectedErr, err) {
t.Errorf("test case: %s, \nUnexpected error: %v\nExpected error: %v", test.desc, err, test.expectedErr.GetExpectedError())
}
if test.cleanup != nil {
test.cleanup(d)
}
}
}
@@ -353,9 +402,11 @@ func TestNodeUnpublishVolume(t *testing.T) {
tests := []struct {
desc string
setup func(*Driver)
req csi.NodeUnpublishVolumeRequest
expectedErr testutil.TestError
skipOnWindows bool
cleanup func(*Driver)
}{
{
desc: "[Error] Volume ID missing",
@@ -371,6 +422,19 @@ func TestNodeUnpublishVolume(t *testing.T) {
DefaultError: status.Error(codes.InvalidArgument, "Target path missing in request"),
},
},
{
desc: "[Error] Volume operation in progress",
setup: func(d *Driver) {
d.volumeLocks.TryAcquire("vol_1")
},
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetFile, VolumeId: "vol_1"},
expectedErr: testutil.TestError{
DefaultError: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
},
cleanup: func(d *Driver) {
d.volumeLocks.Release("vol_1")
},
},
{
desc: "[Error] Unmount error mocked by IsLikelyNotMountPoint",
req: csi.NodeUnpublishVolumeRequest{TargetPath: errorTarget, VolumeId: "vol_1"},
@@ -399,10 +463,16 @@ func TestNodeUnpublishVolume(t *testing.T) {
for _, test := range tests {
if !(test.skipOnWindows && runtime.GOOS == "windows") {
if test.setup != nil {
test.setup(d)
}
_, err := d.NodeUnpublishVolume(context.Background(), &test.req)
if !testutil.AssertError(&test.expectedErr, err) {
t.Errorf("test case: %s, \nUnexpected error: %v\nExpected error: %v", test.desc, err, test.expectedErr.GetExpectedError())
}
if test.cleanup != nil {
test.cleanup(d)
}
}
}
@@ -418,9 +488,11 @@ func TestNodeUnstageVolume(t *testing.T) {
tests := []struct {
desc string
setup func(*Driver)
req csi.NodeUnstageVolumeRequest
skipOnWindows bool
expectedErr testutil.TestError
cleanup func(*Driver)
}{
{
desc: "[Error] Volume ID missing",
@@ -436,6 +508,19 @@ func TestNodeUnstageVolume(t *testing.T) {
DefaultError: status.Error(codes.InvalidArgument, "Staging target not provided"),
},
},
{
desc: "[Error] Volume operation in progress",
setup: func(d *Driver) {
d.volumeLocks.TryAcquire("vol_1")
},
req: csi.NodeUnstageVolumeRequest{StagingTargetPath: targetFile, VolumeId: "vol_1"},
expectedErr: testutil.TestError{
DefaultError: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
},
cleanup: func(d *Driver) {
d.volumeLocks.Release("vol_1")
},
},
{
desc: "[Error] CleanupMountPoint error mocked by IsLikelyNotMountPoint",
req: csi.NodeUnstageVolumeRequest{StagingTargetPath: errorTarget, VolumeId: "vol_1"},
@@ -462,12 +547,17 @@ func TestNodeUnstageVolume(t *testing.T) {
for _, test := range tests {
if !(test.skipOnWindows && runtime.GOOS == "windows") {
if test.setup != nil {
test.setup(d)
}
_, err := d.NodeUnstageVolume(context.Background(), &test.req)
if !testutil.AssertError(&test.expectedErr, err) {
t.Errorf("test case: %s, \nUnexpected error: %v\nExpected error: %v", test.desc, err, test.expectedErr.GetExpectedError())
}
if test.cleanup != nil {
test.cleanup(d)
}
}
}
// Clean up


@@ -37,6 +37,9 @@ const (
type Driver struct {
csicommon.CSIDriver
mounter *mount.SafeFormatAndMount
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *volumeLocks
}
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -46,6 +49,7 @@ func NewDriver(nodeID string) *Driver {
driver.Name = DriverName
driver.Version = driverVersion
driver.NodeID = nodeID
driver.volumeLocks = newVolumeLocks()
return &driver
}
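For context, codes.Aborted is the CSI status that tells the caller an operation for this volume is already pending and the request should simply be retried later; in a cluster the kubelet and the CSI sidecars perform that retry automatically. A hedged sketch of what that looks like from a plain gRPC client's side (retryStage, the backoff value, and the package name are illustrative, not part of this commit):

package example

import (
	"context"
	"time"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryStage is an illustrative caller: it keeps retrying while the driver
// reports that another operation holds the per-volume lock.
func retryStage(ctx context.Context, client csi.NodeClient, req *csi.NodeStageVolumeRequest) error {
	const backoff = 500 * time.Millisecond
	for {
		_, err := client.NodeStageVolume(ctx, req)
		if status.Code(err) != codes.Aborted {
			return err // success (nil) or a non-retryable failure
		}
		// Another operation on this volume is still in progress; wait and try again.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
}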

pkg/smb/volume_lock.go (new file, 58 lines)

@@ -0,0 +1,58 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package smb

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

const (
	volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
// with an ongoing operation.
type volumeLocks struct {
	locks sets.String
	mux   sync.Mutex
}

func newVolumeLocks() *volumeLocks {
	return &volumeLocks{
		locks: sets.NewString(),
	}
}

// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
// If another operation is already using volumeID, returns false.
func (vl *volumeLocks) TryAcquire(volumeID string) bool {
	vl.mux.Lock()
	defer vl.mux.Unlock()
	if vl.locks.Has(volumeID) {
		return false
	}
	vl.locks.Insert(volumeID)
	return true
}

func (vl *volumeLocks) Release(volumeID string) {
	vl.mux.Lock()
	defer vl.mux.Unlock()
	vl.locks.Delete(volumeID)
}
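Usage of the lock follows a strict try/defer-release discipline, as the node RPC hunks above show. A tiny in-package sketch of the semantics (the function and the fmt import are illustrative, not part of the commit): a second TryAcquire on the same volume ID fails until Release runs, while other volume IDs are unaffected.

// Illustrative only: lock behaviour for concurrent operations on the same
// versus different volume IDs.
func volumeLocksSketch() {
	vl := newVolumeLocks()

	fmt.Println(vl.TryAcquire("vol_1")) // true: the first operation on vol_1 proceeds
	fmt.Println(vl.TryAcquire("vol_1")) // false: a concurrent operation on the same ID is rejected
	fmt.Println(vl.TryAcquire("vol_2")) // true: other volume IDs are unaffected

	vl.Release("vol_1")
	fmt.Println(vl.TryAcquire("vol_1")) // true: vol_1 is free again after Release
}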