Merge pull request #14843 from justinsb/vfs_openstack_swift_context

VFS: Add context to internals of Openstack Swift client
This commit is contained in:
Kubernetes Prow Robot 2022-12-21 20:53:24 -08:00 committed by GitHub
commit 61fd1289cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 41 deletions

View File

@ -438,6 +438,23 @@ func (c *VFSContext) getGCSClient(ctx context.Context) (*storage.Service, error)
return gcsClient, nil
}
// getSwiftClient returns the openstack switch client, caching it for future calls
func (c *VFSContext) getSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.swiftClient != nil {
return c.swiftClient, nil
}
swiftClient, err := NewSwiftClient(ctx)
if err != nil {
return nil, err
}
c.swiftClient = swiftClient
return swiftClient, nil
}
func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) {
u, err := url.Parse(p)
if err != nil {
@ -453,15 +470,7 @@ func (c *VFSContext) buildOpenstackSwiftPath(p string) (*SwiftPath, error) {
return nil, fmt.Errorf("invalid swift path: %q", p)
}
if c.swiftClient == nil {
swiftClient, err := NewSwiftClient()
if err != nil {
return nil, err
}
c.swiftClient = swiftClient
}
return NewSwiftPath(c.swiftClient, bucket, u.Path)
return NewSwiftPath(c, bucket, u.Path)
}
func (c *VFSContext) buildAzureBlobPath(p string) (*AzureBlobPath, error) {

View File

@ -18,6 +18,7 @@ package vfs
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"fmt"
@ -42,7 +43,7 @@ import (
"k8s.io/kops/util/pkg/hashing"
)
func NewSwiftClient() (*gophercloud.ServiceClient, error) {
func NewSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) {
config := OpenstackConfig{}
// Check if env credentials are valid first
@ -226,10 +227,10 @@ func (oc OpenstackConfig) GetServiceConfig(name string) (gophercloud.EndpointOpt
// SwiftPath is a vfs path for Openstack Cloud Storage.
type SwiftPath struct {
client *gophercloud.ServiceClient
bucket string
key string
hash string
vfsContext *VFSContext
bucket string
key string
hash string
}
var (
@ -253,14 +254,14 @@ var swiftWriteBackoff = wait.Backoff{
Steps: 5,
}
func NewSwiftPath(client *gophercloud.ServiceClient, bucket string, key string) (*SwiftPath, error) {
func NewSwiftPath(vfsContext *VFSContext, bucket string, key string) (*SwiftPath, error) {
bucket = strings.TrimSuffix(bucket, "/")
key = strings.TrimPrefix(key, "/")
return &SwiftPath{
client: client,
bucket: bucket,
key: key,
vfsContext: vfsContext,
bucket: bucket,
key: key,
}, nil
}
@ -276,11 +277,20 @@ func (p *SwiftPath) String() string {
return p.Path()
}
func (p *SwiftPath) getClient(ctx context.Context) (*gophercloud.ServiceClient, error) {
return p.vfsContext.getSwiftClient(ctx)
}
func (p *SwiftPath) Remove() error {
ctx := context.TODO()
done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) {
opt := swiftobject.DeleteOpts{}
_, err := swiftobject.Delete(p.client, p.bucket, p.key, opt).Extract()
client, err := p.getClient(ctx)
if err != nil {
return false, err
}
opt := swiftobject.DeleteOpts{}
if _, err := swiftobject.Delete(client, p.bucket, p.key, opt).Extract(); err != nil {
if isSwiftNotFound(err) {
return true, os.ErrNotExist
}
@ -307,22 +317,28 @@ func (p *SwiftPath) Join(relativePath ...string) Path {
args = append(args, relativePath...)
joined := path.Join(args...)
return &SwiftPath{
client: p.client,
bucket: p.bucket,
key: joined,
vfsContext: p.vfsContext,
bucket: p.bucket,
key: joined,
}
}
func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error {
ctx := context.TODO()
done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) {
client, err := p.getClient(ctx)
if err != nil {
return false, err
}
klog.V(4).Infof("Writing file %q", p)
if _, err := data.Seek(0, 0); err != nil {
return false, fmt.Errorf("error seeking to start of data stream for %s: %v", p, err)
}
createOpts := swiftobject.CreateOpts{Content: data}
_, err := swiftobject.Create(p.client, p.bucket, p.key, createOpts).Extract()
if err != nil {
if _, err := swiftobject.Create(client, p.bucket, p.key, createOpts).Extract(); err != nil {
return false, fmt.Errorf("error writing %s: %v", p, err)
}
@ -345,14 +361,21 @@ func (p *SwiftPath) WriteFile(data io.ReadSeeker, acl ACL) error {
var createFileLockSwift sync.Mutex
func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error {
ctx := context.TODO()
client, err := p.getClient(ctx)
if err != nil {
return err
}
createFileLockSwift.Lock()
defer createFileLockSwift.Unlock()
// Check if exists.
_, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
if _, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
klog.V(4).Infof("Getting file %q", p)
_, err := swiftobject.Get(p.client, p.bucket, p.key, swiftobject.GetOpts{}).Extract()
_, err := swiftobject.Get(client, p.bucket, p.key, swiftobject.GetOpts{}).Extract()
if err == nil {
return true, nil
} else if isSwiftNotFound(err) {
@ -360,8 +383,7 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error {
} else {
return false, fmt.Errorf("error getting %s: %v", p, err)
}
})
if err == nil {
}); err == nil {
return os.ErrExist
} else if !os.IsNotExist(err) {
return err
@ -376,14 +398,20 @@ func (p *SwiftPath) CreateFile(data io.ReadSeeker, acl ACL) error {
}
func (p *SwiftPath) createBucket() error {
ctx := context.TODO()
done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) {
_, err := swiftcontainer.Get(p.client, p.bucket, swiftcontainer.GetOpts{}).Extract()
if err == nil {
client, err := p.getClient(ctx)
if err != nil {
return false, err
}
if _, err := swiftcontainer.Get(client, p.bucket, swiftcontainer.GetOpts{}).Extract(); err == nil {
return true, nil
}
if isSwiftNotFound(err) {
createOpts := swiftcontainer.CreateOpts{}
_, err = swiftcontainer.Create(p.client, p.bucket, createOpts).Extract()
_, err = swiftcontainer.Create(client, p.bucket, createOpts).Extract()
return err == nil, err
}
return false, err
@ -426,10 +454,17 @@ func (p *SwiftPath) ReadFile() ([]byte, error) {
// WriteTo implements io.WriterTo
func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) {
ctx := context.TODO()
klog.V(4).Infof("Reading file %q", p)
client, err := p.getClient(ctx)
if err != nil {
return 0, err
}
opt := swiftobject.DownloadOpts{}
result := swiftobject.Download(p.client, p.bucket, p.key, opt)
result := swiftobject.Download(client, p.bucket, p.key, opt)
if result.Err != nil {
if isSwiftNotFound(result.Err) {
return 0, os.ErrNotExist
@ -442,28 +477,34 @@ func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) {
}
func (p *SwiftPath) readPath(opt swiftobject.ListOpts) ([]Path, error) {
ctx := context.TODO()
var ret []Path
done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
client, err := p.getClient(ctx)
if err != nil {
return false, err
}
var paths []Path
pager := swiftobject.List(p.client, p.bucket, opt)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
pager := swiftobject.List(client, p.bucket, opt)
if err := pager.EachPage(func(page pagination.Page) (bool, error) {
objects, err1 := swiftobject.ExtractInfo(page)
if err1 != nil {
return false, err1
}
for _, o := range objects {
child := &SwiftPath{
client: p.client,
bucket: p.bucket,
key: o.Name,
hash: o.Hash,
vfsContext: p.vfsContext,
bucket: p.bucket,
key: o.Name,
hash: o.Hash,
}
paths = append(paths, child)
}
return true, nil
})
if err != nil {
}); err != nil {
if isSwiftNotFound(err) {
return true, os.ErrNotExist
}