From 6406bfe15008ec5b0f2dd1de14d5130c2e4398ff Mon Sep 17 00:00:00 2001 From: justinsb Date: Wed, 21 Dec 2022 10:07:03 -0500 Subject: [PATCH] VFS: Add context to internals of Openstack Swift client Setting up context usage internally ready for the context to be exposed. Also avoid initializing the client until the first usage, making building VFS path more of a builder function (deterministic, not expected to fail for good inputs). --- util/pkg/vfs/context.go | 27 +++++++---- util/pkg/vfs/swiftfs.go | 105 ++++++++++++++++++++++++++++------------ 2 files changed, 91 insertions(+), 41 deletions(-) diff --git a/util/pkg/vfs/context.go b/util/pkg/vfs/context.go index 17c60b62ed..269b16f889 100644 --- a/util/pkg/vfs/context.go +++ b/util/pkg/vfs/context.go @@ -438,6 +438,23 @@ func (c *VFSContext) getGCSClient(ctx context.Context) (*storage.Service, error) return gcsClient, nil } +// getSwiftClient returns the openstack switch client, caching it for future calls +func (c *VFSContext) getSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.swiftClient != nil { + return c.swiftClient, nil + } + + swiftClient, err := NewSwiftClient(ctx) + if err != nil { + return nil, err + } + c.swiftClient = swiftClient + return swiftClient, nil +} + func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) { u, err := url.Parse(p) if err != nil { @@ -453,15 +470,7 @@ func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) { return nil, fmt.Errorf("invalid swift path: %q", p) } - if c.swiftClient == nil { - swiftClient, err := NewSwiftClient() - if err != nil { - return nil, err - } - c.swiftClient = swiftClient - } - - return NewSwiftPath(c.swiftClient, bucket, u.Path) + return NewSwiftPath(c, bucket, u.Path) } func (c *VFSContext) buildAzureBlobPath(p string) (*AzureBlobPath, error) { diff --git a/util/pkg/vfs/swiftfs.go b/util/pkg/vfs/swiftfs.go index 0274ccb4d0..f105018e97 100644 --- a/util/pkg/vfs/swiftfs.go +++ b/util/pkg/vfs/swiftfs.go @@ -18,6 +18,7 @@ package vfs import ( "bytes" + "context" "crypto/tls" "encoding/hex" "fmt" @@ -42,7 +43,7 @@ import ( "k8s.io/kops/util/pkg/hashing" ) -func NewSwiftClient() (*gophercloud.ServiceClient, error) { +func NewSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) { config := OpenstackConfig{} // Check if env credentials are valid first @@ -226,10 +227,10 @@ func (oc OpenstackConfig) GetServiceConfig(name string) (gophercloud.EndpointOpt // SwiftPath is a vfs path for Openstack Cloud Storage. type SwiftPath struct { - client *gophercloud.ServiceClient - bucket string - key string - hash string + vfsContext *VFSContext + bucket string + key string + hash string } var ( @@ -253,14 +254,14 @@ var swiftWriteBackoff = wait.Backoff{ Steps: 5, } -func NewSwiftPath(client *gophercloud.ServiceClient, bucket string, key string) (*SwiftPath, error) { +func NewSwiftPath(vfsContext *VFSContext, bucket string, key string) (*SwiftPath, error) { bucket = strings.TrimSuffix(bucket, "/") key = strings.TrimPrefix(key, "/") return &SwiftPath{ - client: client, - bucket: bucket, - key: key, + vfsContext: vfsContext, + bucket: bucket, + key: key, }, nil } @@ -276,11 +277,20 @@ func (p *SwiftPath) String() string { return p.Path() } +func (p *SwiftPath) getClient(ctx context.Context) (*gophercloud.ServiceClient, error) { + return p.vfsContext.getSwiftClient(ctx) +} + func (p *SwiftPath) Remove() error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { - opt := swiftobject.DeleteOpts{} - _, err := swiftobject.Delete(p.client, p.bucket, p.key, opt).Extract() + client, err := p.getClient(ctx) if err != nil { + return false, err + } + opt := swiftobject.DeleteOpts{} + if _, err := swiftobject.Delete(client, p.bucket, p.key, opt).Extract(); err != nil { if isSwiftNotFound(err) { return true, os.ErrNotExist } @@ -307,22 +317,28 @@ func (p *SwiftPath) Join(relativePath ...string) Path { args = append(args, relativePath...) joined := path.Join(args...) return &SwiftPath{ - client: p.client, - bucket: p.bucket, - key: joined, + vfsContext: p.vfsContext, + bucket: p.bucket, + key: joined, } } func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + klog.V(4).Infof("Writing file %q", p) if _, err := data.Seek(0, 0); err != nil { return false, fmt.Errorf("error seeking to start of data stream for %s: %v", p, err) } createOpts := swiftobject.CreateOpts{Content: data} - _, err := swiftobject.Create(p.client, p.bucket, p.key, createOpts).Extract() - if err != nil { + if _, err := swiftobject.Create(client, p.bucket, p.key, createOpts).Extract(); err != nil { return false, fmt.Errorf("error writing %s: %v", p, err) } @@ -345,14 +361,21 @@ func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error { var createFileLockSwift sync.Mutex func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { + ctx := context.TODO() + + client, err := p.getClient(ctx) + if err != nil { + return err + } + createFileLockSwift.Lock() defer createFileLockSwift.Unlock() // Check if exists. - _, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { + if _, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { klog.V(4).Infof("Getting file %q", p) - _, err := swiftobject.Get(p.client, p.bucket, p.key, swiftobject.GetOpts{}).Extract() + _, err := swiftobject.Get(client, p.bucket, p.key, swiftobject.GetOpts{}).Extract() if err == nil { return true, nil } else if isSwiftNotFound(err) { @@ -360,8 +383,7 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { } else { return false, fmt.Errorf("error getting %s: %v", p, err) } - }) - if err == nil { + }); err == nil { return os.ErrExist } else if !os.IsNotExist(err) { return err @@ -376,14 +398,20 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error { } func (p *SwiftPath) createBucket() error { + ctx := context.TODO() + done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) { - _, err := swiftcontainer.Get(p.client, p.bucket, swiftcontainer.GetOpts{}).Extract() - if err == nil { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + + if _, err := swiftcontainer.Get(client, p.bucket, swiftcontainer.GetOpts{}).Extract(); err == nil { return true, nil } if isSwiftNotFound(err) { createOpts := swiftcontainer.CreateOpts{} - _, err = swiftcontainer.Create(p.client, p.bucket, createOpts).Extract() + _, err = swiftcontainer.Create(client, p.bucket, createOpts).Extract() return err == nil, err } return false, err @@ -426,10 +454,17 @@ func (p *SwiftPath) ReadFile() ([]byte, error) { // WriteTo implements io.WriterTo func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) { + ctx := context.TODO() + klog.V(4).Infof("Reading file %q", p) + client, err := p.getClient(ctx) + if err != nil { + return 0, err + } + opt := swiftobject.DownloadOpts{} - result := swiftobject.Download(p.client, p.bucket, p.key, opt) + result := swiftobject.Download(client, p.bucket, p.key, opt) if result.Err != nil { if isSwiftNotFound(result.Err) { return 0, os.ErrNotExist @@ -442,28 +477,34 @@ func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) { } func (p *SwiftPath) readPath(opt swiftobject.ListOpts) ([]Path, error) { + ctx := context.TODO() + var ret []Path done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) { + client, err := p.getClient(ctx) + if err != nil { + return false, err + } + var paths []Path - pager := swiftobject.List(p.client, p.bucket, opt) - err := pager.EachPage(func(page pagination.Page) (bool, error) { + pager := swiftobject.List(client, p.bucket, opt) + if err := pager.EachPage(func(page pagination.Page) (bool, error) { objects, err1 := swiftobject.ExtractInfo(page) if err1 != nil { return false, err1 } for _, o := range objects { child := &SwiftPath{ - client: p.client, - bucket: p.bucket, - key: o.Name, - hash: o.Hash, + vfsContext: p.vfsContext, + bucket: p.bucket, + key: o.Name, + hash: o.Hash, } paths = append(paths, child) } return true, nil - }) - if err != nil { + }); err != nil { if isSwiftNotFound(err) { return true, os.ErrNotExist }