diff --git a/util/pkg/vfs/context.go b/util/pkg/vfs/context.go index 17c60b62ed..269b16f889 100644 --- a/util/pkg/vfs/context.go +++ b/util/pkg/vfs/context.go @@ -438,6 +438,23 @@ func (c *VFSContext) getGCSClient(ctx context.Context) (*storage.Service, error) return gcsClient, nil } +// getSwiftClient returns the openstack switch client, caching it for future calls +func (c *VFSContext) getSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.swiftClient != nil { + return c.swiftClient, nil + } + + swiftClient, err := NewSwiftClient(ctx) + if err != nil { + return nil, err + } + c.swiftClient = swiftClient + return swiftClient, nil +} + func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) { u, err := url.Parse(p) if err != nil { @@ -453,15 +470,7 @@ func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) { return nil, fmt.Errorf("invalid swift path: %q", p) } - if c.swiftClient == nil { - swiftClient, err := NewSwiftClient() - if err != nil { - return nil, err - } - c.swiftClient = swiftClient - } - - return NewSwiftPath(c.swiftClient, bucket, u.Path) + return NewSwiftPath(c, bucket, u.Path) } func (c *VFSContext) buildAzureBlobPath(p string) (*AzureBlobPath, error) { diff --git a/util/pkg/vfs/swiftfs.go b/util/pkg/vfs/swiftfs.go index 0274ccb4d0..f105018e97 100644 --- a/util/pkg/vfs/swiftfs.go +++ b/util/pkg/vfs/swiftfs.go @@ -18,6 +18,7 @@ package vfs import ( "bytes" + "context" "crypto/tls" "encoding/hex" "fmt" @@ -42,7 +43,7 @@ import ( "k8s.io/kops/util/pkg/hashing" ) -func NewSwiftClient() (*gophercloud.ServiceClient, error) { +func NewSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) { config := OpenstackConfig{} // Check if env credentials are valid first @@ -226,10 +227,10 @@ func (oc OpenstackConfig) GetServiceConfig(name string) (gophercloud.EndpointOpt // SwiftPath is a vfs path for Openstack Cloud Storage. type SwiftPath struct { - client *gophercloud.ServiceClient - bucket string - key string - hash string + vfsContext *VFSContext + bucket string + key string + hash string } var ( @@ -253,14 +254,14 @@ var swiftWriteBackoff = wait.Backoff{ Steps: 5, } -func NewSwiftPath(client *gophercloud.ServiceClient, bucket string, key string) (*SwiftPath, error) { +func NewSwiftPath(vfsContext *VFSContext, bucket string, key string) (*SwiftPath, error) { bucket = strings.TrimSuffix(bucket, "/") key = strings.TrimPrefix(key, "/") return &SwiftPath{ - client: client, - bucket: bucket, - key: key, + vfsContext: vfsContext, + bucket: bucket, + key: key, }, nil } @@ -276,11 +277,20 @@ func (p *SwiftPath) String() string { return p.Path() } +func (p *SwiftPath) getClient(ctx context.Context) (*gophercloud.ServiceClient, error) { + return p.vfsContext.getSwiftClient(ctx) +} + func (p *SwiftPath) Remove() error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { - opt := swiftobject.DeleteOpts{} - _, err := swiftobject.Delete(p.client, p.bucket, p.key, opt).Extract() + client, err := p.getClient(ctx) if err != nil { + return false, err + } + opt := swiftobject.DeleteOpts{} + if _, err := swiftobject.Delete(client, p.bucket, p.key, opt).Extract(); err != nil { if isSwiftNotFound(err) { return true, os.ErrNotExist } @@ -307,22 +317,28 @@ func (p *SwiftPath) Join(relativePath ...string) Path { args = append(args, relativePath...) joined := path.Join(args...) return &SwiftPath{ - client: p.client, - bucket: p.bucket, - key: joined, + vfsContext: p.vfsContext, + bucket: p.bucket, + key: joined, } } func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + klog.V(4).Infof("Writing file %q", p) if _, err := data.Seek(0, 0); err != nil { return false, fmt.Errorf("error seeking to start of data stream for %s: %v", p, err) } createOpts := swiftobject.CreateOpts{Content: data} - _, err := swiftobject.Create(p.client, p.bucket, p.key, createOpts).Extract() - if err != nil { + if _, err := swiftobject.Create(client, p.bucket, p.key, createOpts).Extract(); err != nil { return false, fmt.Errorf("error writing %s: %v", p, err) } @@ -345,14 +361,21 @@ func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error { var createFileLockSwift sync.Mutex func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { + ctx := context.TODO() + + client, err := p.getClient(ctx) + if err != nil { + return err + } + createFileLockSwift.Lock() defer createFileLockSwift.Unlock() // Check if exists. - _, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { + if _, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { klog.V(4).Infof("Getting file %q", p) - _, err := swiftobject.Get(p.client, p.bucket, p.key, swiftobject.GetOpts{}).Extract() + _, err := swiftobject.Get(client, p.bucket, p.key, swiftobject.GetOpts{}).Extract() if err == nil { return true, nil } else if isSwiftNotFound(err) { @@ -360,8 +383,7 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { } else { return false, fmt.Errorf("error getting %s: %v", p, err) } - }) - if err == nil { + }); err == nil { return os.ErrExist } else if !os.IsNotExist(err) { return err @@ -376,14 +398,20 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { } func (p *SwiftPath) createBucket() error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { - _, err := swiftcontainer.Get(p.client, p.bucket, swiftcontainer.GetOpts{}).Extract() - if err == nil { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + + if _, err := swiftcontainer.Get(client, p.bucket, swiftcontainer.GetOpts{}).Extract(); err == nil { return true, nil } if isSwiftNotFound(err) { createOpts := swiftcontainer.CreateOpts{} - _, err = swiftcontainer.Create(p.client, p.bucket, createOpts).Extract() + _, err = swiftcontainer.Create(client, p.bucket, createOpts).Extract() return err == nil, err } return false, err @@ -426,10 +454,17 @@ func (p *SwiftPath) ReadFile() ([]byte, error) { // WriteTo implements io.WriterTo func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) { + ctx := context.TODO() + klog.V(4).Infof("Reading file %q", p) + client, err := p.getClient(ctx) + if err != nil { + return 0, err + } + opt := swiftobject.DownloadOpts{} - result := swiftobject.Download(p.client, p.bucket, p.key, opt) + result := swiftobject.Download(client, p.bucket, p.key, opt) if result.Err != nil { if isSwiftNotFound(result.Err) { return 0, os.ErrNotExist @@ -442,28 +477,34 @@ func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) { } func (p *SwiftPath) readPath(opt swiftobject.ListOpts) ([]Path, error) { + ctx := context.TODO() + var ret []Path done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + var paths []Path - pager := swiftobject.List(p.client, p.bucket, opt) - err := pager.EachPage(func(page pagination.Page) (bool, error) { + pager := swiftobject.List(client, p.bucket, opt) + if err := pager.EachPage(func(page pagination.Page) (bool, error) { objects, err1 := swiftobject.ExtractInfo(page) if err1 != nil { return false, err1 } for _, o := range objects { child := &SwiftPath{ - client: p.client, - bucket: p.bucket, - key: o.Name, - hash: o.Hash, + vfsContext: p.vfsContext, + bucket: p.bucket, + key: o.Name, + hash: o.Hash, } paths = append(paths, child) } return true, nil - }) - if err != nil { + }); err != nil { if isSwiftNotFound(err) { return true, os.ErrNotExist }