kops/util/pkg/vfs/swiftfs.go

625 lines
16 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vfs
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/go-ini/ini"
"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack"
swiftcontainer "github.com/gophercloud/gophercloud/v2/openstack/objectstorage/v1/containers"
swiftobject "github.com/gophercloud/gophercloud/v2/openstack/objectstorage/v1/objects"
"github.com/gophercloud/gophercloud/v2/pagination"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/homedir"
"k8s.io/klog/v2"
"k8s.io/kops/util/pkg/hashing"
)
// NewSwiftClient builds an authenticated gophercloud client for the
// OpenStack object-storage (Swift) service.
//
// Credentials are resolved by OpenstackConfig.GetCredential. The endpoint
// region is resolved by OpenstackConfig.GetRegion; when that fails, the
// endpoint options are read from the "Swift" section of the OpenStack config
// file instead.
//
// Errors from the OpenStack libraries are wrapped with %w so that callers can
// inspect the underlying cause with errors.Is / errors.As.
func NewSwiftClient(ctx context.Context) (*gophercloud.ServiceClient, error) {
	config := OpenstackConfig{}

	// Check if env credentials are valid first
	authOption, err := config.GetCredential()
	if err != nil {
		return nil, err
	}

	pc, err := openstack.NewClient(authOption.IdentityEndpoint)
	if err != nil {
		return nil, fmt.Errorf("error building openstack provider client: %w", err)
	}

	ua := gophercloud.UserAgent{}
	ua.Prepend("kops/swift")
	pc.UserAgent = ua
	klog.V(4).Infof("Using user-agent %s", ua.Join())

	// NOTE(review): security - TLS certificate verification is disabled for
	// every request made through this client, including the authentication
	// request below. Kept as-is because existing deployments may rely on it
	// (e.g. self-signed endpoints); consider making it opt-in.
	// NOTE(review): this custom transport sets no Proxy func, so proxy
	// environment variables are not honoured - confirm that is intended.
	tlsconfig := &tls.Config{}
	tlsconfig.InsecureSkipVerify = true
	transport := &http.Transport{TLSClientConfig: tlsconfig}
	pc.HTTPClient = http.Client{
		Transport: transport,
	}

	klog.V(2).Info("authenticating to keystone")
	if err := openstack.Authenticate(ctx, pc, authOption); err != nil {
		return nil, fmt.Errorf("error building openstack authenticated client: %w", err)
	}

	// Prefer an explicitly configured region; otherwise fall back to the
	// "Swift" section of the config file for the full endpoint options.
	var endpointOpt gophercloud.EndpointOpts
	region, regionErr := config.GetRegion()
	if regionErr != nil {
		klog.Warningf("Retrieving swift configuration from openstack config file: %v", regionErr)
		endpointOpt, err = config.GetServiceConfig("Swift")
		if err != nil {
			return nil, err
		}
	} else {
		endpointOpt = gophercloud.EndpointOpts{
			Type:   "object-store",
			Region: region,
		}
	}

	client, err := openstack.NewObjectStorageV1(pc, endpointOpt)
	if err != nil {
		return nil, fmt.Errorf("error building swift client: %w", err)
	}
	return client, nil
}
// OpenstackConfig groups the helpers that resolve OpenStack credentials,
// region and service endpoint options from environment variables and from an
// ini-format config file. It has no fields.
type OpenstackConfig struct{}
// filename returns the location of the OpenStack config file.
//
// $OPENSTACK_CREDENTIAL_FILE wins when it is non-empty; otherwise the path is
// <home>/.openstack/config. An error is returned when the home directory
// cannot be determined.
func (OpenstackConfig) filename() (string, error) {
	if fromEnv := os.Getenv("OPENSTACK_CREDENTIAL_FILE"); fromEnv != "" {
		klog.V(2).Infof("using openstack config found in $OPENSTACK_CREDENTIAL_FILE: %s", fromEnv)
		return fromEnv, nil
	}

	home := homedir.HomeDir()
	if home == "" {
		return "", fmt.Errorf("can not find home directory")
	}

	configPath := filepath.Join(home, ".openstack", "config")
	klog.V(2).Infof("using openstack config found in %s", configPath)
	return configPath, nil
}
// getSection loads the OpenStack config file and returns, for the section
// called name, a map holding one entry per requested item.
//
// It fails when the config file path cannot be resolved, the file cannot be
// loaded, or the section does not exist. Callers in this file treat an empty
// string value as "not set".
func (oc OpenstackConfig) getSection(name string, items []string) (map[string]string, error) {
	configPath, err := oc.filename()
	if err != nil {
		return nil, err
	}

	cfg, err := ini.Load(configPath)
	if err != nil {
		return nil, fmt.Errorf("error loading config file: %v", err)
	}

	sec, err := cfg.GetSection(name)
	if err != nil {
		return nil, fmt.Errorf("error getting section of %s: %v", name, err)
	}

	result := make(map[string]string, len(items))
	for i := range items {
		key := items[i]
		result[key] = sec.Key(key).String()
	}
	return result, nil
}
// GetCredential returns the OpenStack auth options, preferring the
// environment and falling back to the config file when the environment does
// not yield usable options. Environment-derived options always have
// AllowReauth enabled.
func (oc OpenstackConfig) GetCredential() (gophercloud.AuthOptions, error) {
	// prioritize environment config
	authOpts, err := openstack.AuthOptionsFromEnv()
	if err != nil {
		klog.Warningf("Could not initialize OpenStack config from environment: %v", err)
		// fallback to config file
		return oc.getCredentialFromFile()
	}

	// An application credential ID without a username gets an empty scope.
	// NOTE(review): presumably because application credentials carry their
	// own scope - confirm against the identity API.
	usesAppCredential := authOpts.ApplicationCredentialID != "" && authOpts.Username == ""
	if usesAppCredential {
		authOpts.Scope = &gophercloud.AuthScope{}
	}

	authOpts.AllowReauth = true
	return authOpts, nil
}
// GetRegion returns the OpenStack region name.
//
// A non-empty $OS_REGION_NAME is used first; one pair of surrounding single
// quotes is stripped from it. Otherwise the "region" item of the "Global"
// section of the config file is returned (which may be empty). An error is
// returned only when that section cannot be read.
func (oc OpenstackConfig) GetRegion() (string, error) {
	if fromEnv := os.Getenv("OS_REGION_NAME"); fromEnv != "" {
		quoted := len(fromEnv) > 1 && fromEnv[0] == '\'' && fromEnv[len(fromEnv)-1] == '\''
		if quoted {
			fromEnv = fromEnv[1 : len(fromEnv)-1]
		}
		return fromEnv, nil
	}

	// TODO: Unsure if this is the correct section for region
	values, err := oc.getSection("Global", []string{"region"})
	if err != nil {
		return "", fmt.Errorf("region not provided in OS_REGION_NAME or openstack config section GLOBAL")
	}
	return values["region"], nil
}
// getCredentialFromFile builds auth options from the "Default" section of the
// OpenStack config file.
//
// "identity" and "password" are mandatory. For each of user, domain and
// tenant, at least one of the name or the id must be present. On any failure
// the zero AuthOptions is returned together with the error.
func (oc OpenstackConfig) getCredentialFromFile() (gophercloud.AuthOptions, error) {
	const section = "Default"

	values, err := oc.getSection(section, []string{
		"identity", "user", "user_id", "password",
		"domain_id", "domain_name", "tenant_id", "tenant_name",
	})
	if err != nil {
		return gophercloud.AuthOptions{}, err
	}

	for _, required := range []string{"identity", "password"} {
		if values[required] == "" {
			return gophercloud.AuthOptions{}, fmt.Errorf("missing %s in section of %s", required, section)
		}
	}

	// Each pair is satisfied by either of its two items.
	alternatives := [][2]string{
		{"user", "user_id"},
		{"domain_name", "domain_id"},
		{"tenant_name", "tenant_id"},
	}
	for _, pair := range alternatives {
		if values[pair[0]] != "" || values[pair[1]] != "" {
			continue
		}
		return gophercloud.AuthOptions{}, fmt.Errorf("missing %s and %s in section of %s", pair[0], pair[1], section)
	}

	return gophercloud.AuthOptions{
		IdentityEndpoint: values["identity"],
		UserID:           values["user_id"],
		Username:         values["user"],
		Password:         values["password"],
		TenantID:         values["tenant_id"],
		TenantName:       values["tenant_name"],
		DomainID:         values["domain_id"],
		DomainName:       values["domain_name"],
		AllowReauth:      true,
	}, nil
}
// GetServiceConfig reads the endpoint options for a service from the config
// file section called name. The "region" item is mandatory; "service_type",
// "service_name" and "availability" are copied through as found. On failure
// the zero EndpointOpts is returned together with the error.
func (oc OpenstackConfig) GetServiceConfig(name string) (gophercloud.EndpointOpts, error) {
	values, err := oc.getSection(name, []string{"service_type", "service_name", "region", "availability"})
	if err != nil {
		return gophercloud.EndpointOpts{}, err
	}

	region := values["region"]
	if region == "" {
		return gophercloud.EndpointOpts{}, fmt.Errorf("missing region in section of %s", name)
	}

	return gophercloud.EndpointOpts{
		Type:         values["service_type"],
		Name:         values["service_name"],
		Region:       region,
		Availability: gophercloud.Availability(values["availability"]),
	}, nil
}
// SwiftPath is a vfs path for Openstack Cloud Storage.
// It identifies a single object key inside a single container ("bucket").
type SwiftPath struct {
// vfsContext supplies the Swift client used for every request (see getClient).
vfsContext *VFSContext
// bucket is the container name; NewSwiftPath strips a trailing "/".
bucket string
// key is the object name within the bucket; NewSwiftPath strips a leading "/".
key string
// hash is the hex-encoded hash reported by an object listing. In this file it
// is only set on paths produced by readPath and is empty otherwise.
hash string
}
// Compile-time checks that *SwiftPath satisfies the Path and HasHash interfaces.
var (
_ Path = &SwiftPath{}
_ HasHash = &SwiftPath{}
)
// swiftReadBackoff is the backoff strategy for Swift read retries:
// a one-second base duration, a growth factor of 1.5, a jitter of 0.1 and
// 4 steps. It is passed to RetryWithBackoff by the read paths in this file.
var swiftReadBackoff = wait.Backoff{
Duration: time.Second,
Factor: 1.5,
Jitter: 0.1,
Steps: 4,
}
// swiftWriteBackoff is the backoff strategy for Swift write retries:
// a one-second base duration, a growth factor of 1.5, a jitter of 0.1 and
// 5 steps. It is passed to RetryWithBackoff by the write and delete paths in
// this file.
var swiftWriteBackoff = wait.Backoff{
Duration: time.Second,
Factor: 1.5,
Jitter: 0.1,
Steps: 5,
}
// NewSwiftPath returns a SwiftPath for key inside bucket. A trailing "/" on
// the bucket and a leading "/" on the key are removed. The returned error is
// always nil.
func NewSwiftPath(vfsContext *VFSContext, bucket string, key string) (*SwiftPath, error) {
	p := &SwiftPath{
		vfsContext: vfsContext,
		bucket:     strings.TrimSuffix(bucket, "/"),
		key:        strings.TrimPrefix(key, "/"),
	}
	return p, nil
}
// Path returns the location in URL form: swift://<bucket>/<key>.
func (p *SwiftPath) Path() string {
	return fmt.Sprintf("swift://%s/%s", p.bucket, p.key)
}
// Bucket returns the name of the container this path lives in.
func (p *SwiftPath) Bucket() string {
return p.bucket
}
// String returns the same swift:// form as Path, so the value prints
// readably in log and error messages.
func (p *SwiftPath) String() string {
return p.Path()
}
// getClient returns the Swift service client provided by the path's VFSContext.
func (p *SwiftPath) getClient(ctx context.Context) (*gophercloud.ServiceClient, error) {
return p.vfsContext.getSwiftClient(ctx)
}
// Remove deletes the object at this path, retrying with swiftWriteBackoff.
//
// It returns os.ErrNotExist (without further retries) when Swift reports the
// object as not found, and wait.ErrWaitTimeout if the retry helper gives up
// without reporting an error.
func (p *SwiftPath) Remove(ctx context.Context) error {
	deleteOnce := func() (bool, error) {
		client, err := p.getClient(ctx)
		if err != nil {
			return false, err
		}

		_, err = swiftobject.Delete(ctx, client, p.bucket, p.key, swiftobject.DeleteOpts{}).Extract()
		switch {
		case err == nil:
			return true, nil
		case isSwiftNotFound(err):
			// Nothing to delete; retrying cannot change that.
			return true, os.ErrNotExist
		default:
			return false, fmt.Errorf("error deleting %s: %v", p, err)
		}
	}

	done, err := RetryWithBackoff(swiftWriteBackoff, deleteOnce)
	if err != nil {
		return err
	}
	if !done {
		return wait.ErrWaitTimeout
	}
	return nil
}
// RemoveAll deletes every object returned by ReadTree for this path.
//
// Keys are removed through bulk-delete requests of at most 10000 keys each,
// every request being retried with swiftWriteBackoff. A not-found response
// aborts the removal with os.ErrNotExist.
//
// NOTE(review): the decoded bulk-delete response is discarded, so any
// per-object failures it may report are not surfaced - confirm whether that
// is acceptable.
func (p *SwiftPath) RemoveAll(ctx context.Context) error {
	// BulkDelete can only process 10000 objects per call
	const batchLimit = 10000

	tree, err := p.ReadTree(ctx)
	if err != nil {
		return err
	}

	keys := make([]string, 0, len(tree))
	for _, entry := range tree {
		swiftEntry, ok := entry.(*SwiftPath)
		if !ok {
			return fmt.Errorf("invalid path in swiftfs tree: %s", entry.Path())
		}
		keys = append(keys, swiftEntry.key)
	}

	for start := 0; start < len(keys); start += batchLimit {
		end := start + batchLimit
		if end > len(keys) {
			end = len(keys)
		}
		batch := keys[start:end]

		done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) {
			client, err := p.getClient(ctx)
			if err != nil {
				return false, err
			}

			_, err = swiftobject.BulkDelete(ctx, client, p.bucket, batch).Extract()
			switch {
			case err == nil:
				return true, nil
			case isSwiftNotFound(err):
				return true, os.ErrNotExist
			default:
				return false, fmt.Errorf("error remove %d files: %w", len(batch), err)
			}
		})
		if err != nil {
			return err
		}
		if !done {
			return wait.ErrWaitTimeout
		}
	}

	return nil
}
// RemoveAllVersions deletes the object at this path. It simply delegates to
// Remove; no version-specific handling is performed here.
func (p *SwiftPath) RemoveAllVersions(ctx context.Context) error {
return p.Remove(ctx)
}
// Join returns a new SwiftPath in the same bucket whose key is this path's
// key joined with relativePath using path.Join. The hash of the receiver is
// not carried over.
func (p *SwiftPath) Join(relativePath ...string) Path {
	parts := make([]string, 0, len(relativePath)+1)
	parts = append(parts, p.key)
	parts = append(parts, relativePath...)

	return &SwiftPath{
		vfsContext: p.vfsContext,
		bucket:     p.bucket,
		key:        path.Join(parts...),
	}
}
// WriteFile uploads data as the object at this path, retrying with
// swiftWriteBackoff. The stream is rewound to its start before every attempt
// so that a retry sends the complete content. The acl argument is not used.
func (p *SwiftPath) WriteFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
	upload := func() (bool, error) {
		client, err := p.getClient(ctx)
		if err != nil {
			return false, err
		}

		klog.V(4).Infof("Writing file %q", p)

		if _, err := data.Seek(0, io.SeekStart); err != nil {
			return false, fmt.Errorf("error seeking to start of data stream for %s: %v", p, err)
		}

		opts := swiftobject.CreateOpts{Content: data}
		_, err = swiftobject.Create(ctx, client, p.bucket, p.key, opts).Extract()
		if err != nil {
			return false, fmt.Errorf("error writing %s: %v", p, err)
		}
		return true, nil
	}

	done, err := RetryWithBackoff(swiftWriteBackoff, upload)
	if err != nil {
		return err
	}
	if !done {
		// Shouldn't happen - we always return a non-nil error with false.
		return wait.ErrWaitTimeout
	}
	return nil
}
// createFileLockSwift serializes CreateFile calls within this process.
//
// To prevent concurrent creates on the same file while maintaining atomicity of writes,
// we take a process-wide lock during the operation.
// Not a great approach, but fine for a single process (with low concurrency).
// It does not protect against other processes writing the same object.
// TODO: should we enable versioning?
var createFileLockSwift sync.Mutex
// CreateFile writes data to this path only if no object exists there yet.
//
// While holding the process-wide createFileLockSwift it probes for the
// object, returns os.ErrExist when the probe succeeds, and otherwise makes
// sure the bucket exists before delegating to WriteFile. Any probe failure
// other than "not found" is returned as-is.
func (p *SwiftPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
	client, err := p.getClient(ctx)
	if err != nil {
		return err
	}

	createFileLockSwift.Lock()
	defer createFileLockSwift.Unlock()

	// Check if exists.
	probe := func() (bool, error) {
		klog.V(4).Infof("Getting file %q", p)

		_, getErr := swiftobject.Get(ctx, client, p.bucket, p.key, swiftobject.GetOpts{}).Extract()
		switch {
		case getErr == nil:
			return true, nil
		case isSwiftNotFound(getErr):
			return true, os.ErrNotExist
		default:
			return false, fmt.Errorf("error getting %s: %v", p, getErr)
		}
	}

	_, err = RetryWithBackoff(swiftReadBackoff, probe)
	if err == nil {
		return os.ErrExist
	}
	if !os.IsNotExist(err) {
		return err
	}

	if err := p.createBucket(); err != nil {
		return err
	}

	return p.WriteFile(ctx, data, acl)
}
// createBucket makes sure the container for this path exists, creating it
// when Swift reports it as not found. The whole check-and-create sequence is
// retried with swiftWriteBackoff.
//
// The error from the container lookup is deliberately assigned to the
// closure-level err rather than declared in the if statement: an
// if-scoped err would be invisible to the not-found check that follows, so
// a missing container would never be created and real lookup errors would
// be reported as (false, nil).
func (p *SwiftPath) createBucket() error {
	ctx := context.TODO()

	done, err := RetryWithBackoff(swiftWriteBackoff, func() (bool, error) {
		client, err := p.getClient(ctx)
		if err != nil {
			return false, err
		}

		_, err = swiftcontainer.Get(ctx, client, p.bucket, swiftcontainer.GetOpts{}).Extract()
		if err == nil {
			// Container already exists.
			return true, nil
		}
		if !isSwiftNotFound(err) {
			// Lookup failed for some other reason; surface it and retry.
			return false, err
		}

		_, err = swiftcontainer.Create(ctx, client, p.bucket, swiftcontainer.CreateOpts{}).Extract()
		return err == nil, err
	})
	if err != nil {
		return err
	}
	if !done {
		// Shouldn't happen - we always return a non-nil error with false.
		return wait.ErrWaitTimeout
	}
	return nil
}
// ReadFile implements Path::ReadFile
//
// The object is downloaded into memory, retrying with swiftReadBackoff.
// os.ErrNotExist is returned without further retries when the object is
// missing. The caller's ctx is used for the download, so cancellation and
// deadlines are honoured.
func (p *SwiftPath) ReadFile(ctx context.Context) ([]byte, error) {
	var b bytes.Buffer
	done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
		// Drop any partial content left behind by a failed attempt.
		b.Reset()
		if _, err := p.downloadTo(ctx, &b); err != nil {
			if os.IsNotExist(err) {
				// Not recoverable
				return true, err
			}
			return false, err
		}
		// Success!
		return true, nil
	})
	if err != nil {
		return nil, err
	}
	if !done {
		// Shouldn't happen - we always return a non-nil error with false
		return nil, wait.ErrWaitTimeout
	}
	return b.Bytes(), nil
}

// WriteTo implements io.WriterTo
//
// The io.WriterTo signature carries no context, so context.TODO() is used.
func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) {
	return p.downloadTo(context.TODO(), out)
}

// downloadTo streams the object at this path into out and returns the number
// of bytes copied. It returns os.ErrNotExist when Swift reports the object as
// not found. No retries are performed here.
func (p *SwiftPath) downloadTo(ctx context.Context, out io.Writer) (int64, error) {
	klog.V(4).Infof("Reading file %q", p)

	client, err := p.getClient(ctx)
	if err != nil {
		return 0, err
	}

	result := swiftobject.Download(ctx, client, p.bucket, p.key, swiftobject.DownloadOpts{})
	if result.Err != nil {
		if isSwiftNotFound(result.Err) {
			return 0, os.ErrNotExist
		}
		return 0, fmt.Errorf("error reading %s: %v", p, result.Err)
	}
	defer result.Body.Close()

	return io.Copy(out, result.Body)
}
// readPath lists the objects in this path's bucket that match opt and
// returns one SwiftPath per object, carrying the object's name as key and
// its listed hash. The listing is retried with swiftReadBackoff and is
// restarted from scratch on every attempt. os.ErrNotExist is returned when
// Swift answers the listing with not-found.
//
// NOTE(review): the listing runs under context.TODO(); callers' contexts are
// not forwarded because this helper takes none.
func (p *SwiftPath) readPath(opt swiftobject.ListOpts) ([]Path, error) {
	ctx := context.TODO()

	var listed []Path
	listOnce := func() (bool, error) {
		client, err := p.getClient(ctx)
		if err != nil {
			return false, err
		}

		// Collected per attempt so a failed attempt leaves no partial result.
		var found []Path
		collect := func(ctx context.Context, page pagination.Page) (bool, error) {
			infos, extractErr := swiftobject.ExtractInfo(page)
			if extractErr != nil {
				return false, extractErr
			}
			for i := range infos {
				found = append(found, &SwiftPath{
					vfsContext: p.vfsContext,
					bucket:     p.bucket,
					key:        infos[i].Name,
					hash:       infos[i].Hash,
				})
			}
			return true, nil
		}

		err = swiftobject.List(client, p.bucket, opt).EachPage(ctx, collect)
		if err != nil {
			if isSwiftNotFound(err) {
				return true, os.ErrNotExist
			}
			return false, fmt.Errorf("error listing %s: %v", p, err)
		}

		klog.V(8).Infof("Listed files in %v: %v", p, found)
		listed = found
		return true, nil
	}

	done, err := RetryWithBackoff(swiftReadBackoff, listOnce)
	if err != nil {
		return nil, err
	}
	if !done {
		return nil, wait.ErrWaitTimeout
	}
	return listed, nil
}
// ReadDir implements Path::ReadDir.
//
// The key, with a "/" appended when it is non-empty and lacks one, is sent as
// the Path option of the object listing.
// NOTE(review): presumably the Path option limits the listing to direct
// children of that pseudo-directory - confirm against the Swift API.
func (p *SwiftPath) ReadDir() ([]Path, error) {
	opt := swiftobject.ListOpts{Path: p.key}
	if p.key != "" && !strings.HasSuffix(p.key, "/") {
		opt.Path = p.key + "/"
	}
	return p.readPath(opt)
}
// ReadTree implements Path::ReadTree.
//
// The key, with a "/" appended when it is non-empty and lacks one, is sent as
// the Prefix option of the object listing.
//
// NOTE(review): ctx is currently unused; readPath takes no context.
func (p *SwiftPath) ReadTree(ctx context.Context) ([]Path, error) {
	opt := swiftobject.ListOpts{Prefix: p.key}
	if p.key != "" && !strings.HasSuffix(p.key, "/") {
		opt.Prefix = p.key + "/"
	}
	return p.readPath(opt)
}
// Base returns the last element of the object key, as computed by path.Base.
func (p *SwiftPath) Base() string {
return path.Base(p.key)
}
// PreferredHash returns the MD5 hash of the object; it is shorthand for
// Hash(hashing.HashAlgorithmMD5) and shares its nil-result cases.
func (p *SwiftPath) PreferredHash() (*hashing.Hash, error) {
return p.Hash(hashing.HashAlgorithmMD5)
}
// Hash returns the stored object hash decoded from hex as an MD5 hash.
//
// It returns (nil, nil) when an algorithm other than MD5 is requested or when
// no hash was recorded for this path, and an error when the recorded hash is
// not valid hex.
func (p *SwiftPath) Hash(a hashing.HashAlgorithm) (*hashing.Hash, error) {
	if a != hashing.HashAlgorithmMD5 || p.hash == "" {
		return nil, nil
	}

	raw, err := hex.DecodeString(p.hash)
	if err != nil {
		return nil, fmt.Errorf("etag was not a valid MD5 sum: %q", p.hash)
	}

	return &hashing.Hash{Algorithm: hashing.HashAlgorithmMD5, HashValue: raw}, nil
}
// isSwiftNotFound reports whether err is a gophercloud response error carrying
// HTTP status 404 (Not Found), as determined by gophercloud.ResponseCodeIs.
func isSwiftNotFound(err error) bool {
return gophercloud.ResponseCodeIs(err, http.StatusNotFound)
}