mirror of https://github.com/kubernetes/kops.git
				
				
				
			
		
			
				
	
	
		
			373 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			373 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
/*
 | 
						|
Copyright 2019 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package vfs
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/hex"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/denverdino/aliyungo/oss"
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/klog/v2"
 | 
						|
	"k8s.io/kops/util/pkg/hashing"
 | 
						|
)
 | 
						|
 | 
						|
// OSSPath is a vfs path for Aliyun Open Storage Service
 | 
						|
type OSSPath struct {
 | 
						|
	client *oss.Client
 | 
						|
	bucket string
 | 
						|
	hash   string
 | 
						|
	key    string
 | 
						|
}
 | 
						|
 | 
						|
var _ Path = &OSSPath{}
 | 
						|
var _ HasHash = &OSSPath{}
 | 
						|
 | 
						|
// ossReadBackoff is the backoff strategy for Aliyun OSS read retries.
 | 
						|
var ossReadBackoff = wait.Backoff{
 | 
						|
	Duration: time.Second,
 | 
						|
	Factor:   1.5,
 | 
						|
	Jitter:   0.1,
 | 
						|
	Steps:    4,
 | 
						|
}
 | 
						|
 | 
						|
// ossWriteBackoff is the backoff strategy for Aliyun OSS write retries
 | 
						|
var ossWriteBackoff = wait.Backoff{
 | 
						|
	Duration: time.Second,
 | 
						|
	Factor:   1.5,
 | 
						|
	Jitter:   0.1,
 | 
						|
	Steps:    5,
 | 
						|
}
 | 
						|
 | 
						|
type listOption struct {
 | 
						|
	prefix string
 | 
						|
	delim  string
 | 
						|
	marker string
 | 
						|
	max    int
 | 
						|
}
 | 
						|
 | 
						|
// WriteTo implements io.WriteTo
 | 
						|
func (p *OSSPath) WriteTo(out io.Writer) (int64, error) {
 | 
						|
	klog.V(4).Infof("Reading file %q", p)
 | 
						|
 | 
						|
	b := p.client.Bucket(p.bucket)
 | 
						|
	headers := http.Header{}
 | 
						|
 | 
						|
	response, err := b.GetResponseWithHeaders(p.key, headers)
 | 
						|
	if err != nil {
 | 
						|
		if isOSSNotFound(err) {
 | 
						|
			return 0, os.ErrNotExist
 | 
						|
		}
 | 
						|
		return 0, fmt.Errorf("error fetching %s: %v", p, err)
 | 
						|
	}
 | 
						|
	defer response.Body.Close()
 | 
						|
 | 
						|
	n, err := io.Copy(out, response.Body)
 | 
						|
	if err != nil {
 | 
						|
		return n, fmt.Errorf("error reading %s: %v", p, err)
 | 
						|
	}
 | 
						|
	return n, nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Join(relativePath ...string) Path {
 | 
						|
	args := []string{p.key}
 | 
						|
	args = append(args, relativePath...)
 | 
						|
	joined := path.Join(args...)
 | 
						|
	return &OSSPath{
 | 
						|
		client: p.client,
 | 
						|
		bucket: p.bucket,
 | 
						|
		key:    joined,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) ReadFile() ([]byte, error) {
 | 
						|
	var b bytes.Buffer
 | 
						|
	done, err := RetryWithBackoff(ossReadBackoff, func() (bool, error) {
 | 
						|
		b.Reset()
 | 
						|
		_, err := p.WriteTo(&b)
 | 
						|
		if err != nil {
 | 
						|
			if os.IsNotExist(err) {
 | 
						|
				// Not recoverable
 | 
						|
				return true, err
 | 
						|
			}
 | 
						|
			return false, err
 | 
						|
		}
 | 
						|
		// Success!
 | 
						|
		return true, nil
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	} else if done {
 | 
						|
		return b.Bytes(), nil
 | 
						|
	} else {
 | 
						|
		// Shouldn't happen - we always return a non-nil error with false
 | 
						|
		return nil, wait.ErrWaitTimeout
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) WriteFile(data io.ReadSeeker, acl ACL) error {
 | 
						|
	b := p.client.Bucket(p.bucket)
 | 
						|
 | 
						|
	done, err := RetryWithBackoff(ossWriteBackoff, func() (bool, error) {
 | 
						|
		klog.V(4).Infof("Writing file %q", p)
 | 
						|
 | 
						|
		var perm oss.ACL
 | 
						|
		var ok bool
 | 
						|
		if acl != nil {
 | 
						|
			perm, ok = acl.(oss.ACL)
 | 
						|
			if !ok {
 | 
						|
				return true, fmt.Errorf("write to %s with ACL of unexpected type %T", p, acl)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// Private currently is the default ACL
 | 
						|
			perm = oss.Private
 | 
						|
		}
 | 
						|
 | 
						|
		if _, err := data.Seek(0, 0); err != nil {
 | 
						|
			return false, fmt.Errorf("error seeking to start of data stream for write to %s: %v", p, err)
 | 
						|
		}
 | 
						|
 | 
						|
		bytes, err := ioutil.ReadAll(data)
 | 
						|
		if err != nil {
 | 
						|
			return false, fmt.Errorf("error reading from data stream: %v", err)
 | 
						|
		}
 | 
						|
 | 
						|
		contType := "application/octet-stream"
 | 
						|
		err = b.Put(p.key, bytes, contType, perm, oss.Options{})
 | 
						|
		if err != nil {
 | 
						|
			return false, fmt.Errorf("error writing %s: %v", p, err)
 | 
						|
		}
 | 
						|
		return true, nil
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	} else if done {
 | 
						|
		return nil
 | 
						|
	} else {
 | 
						|
		// Shouldn't happen - we always return a non-nil error with false
 | 
						|
		return wait.ErrWaitTimeout
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// To prevent concurrent creates on the same file while maintaining atomicity of writes,
 | 
						|
// we take a process-wide lock during the operation.
 | 
						|
// Not a great approach, but fine for a single process (with low concurrency)
 | 
						|
// TODO: should we enable versioning?
 | 
						|
var createFileLockOSS sync.Mutex
 | 
						|
 | 
						|
func (p *OSSPath) CreateFile(data io.ReadSeeker, acl ACL) error {
 | 
						|
	createFileLockOSS.Lock()
 | 
						|
	defer createFileLockOSS.Unlock()
 | 
						|
 | 
						|
	// Check if exists
 | 
						|
	b := p.client.Bucket(p.bucket)
 | 
						|
	exist, err := b.Exists(p.key)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if exist {
 | 
						|
		return os.ErrExist
 | 
						|
	}
 | 
						|
 | 
						|
	return p.WriteFile(data, acl)
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Remove() error {
 | 
						|
	b := p.client.Bucket(p.bucket)
 | 
						|
 | 
						|
	done, err := RetryWithBackoff(ossWriteBackoff, func() (bool, error) {
 | 
						|
		klog.V(8).Infof("removing file %s", p)
 | 
						|
 | 
						|
		err := b.Del(p.key)
 | 
						|
		if err != nil {
 | 
						|
			return false, fmt.Errorf("error deleting %s: %v", p, err)
 | 
						|
		}
 | 
						|
		return true, nil
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	} else if done {
 | 
						|
		return nil
 | 
						|
	} else {
 | 
						|
		// Shouldn't happen - we always return a non-nil error with false
 | 
						|
		return wait.ErrWaitTimeout
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) RemoveAllVersions() error {
 | 
						|
	return p.Remove()
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Base() string {
 | 
						|
	return path.Base(p.key)
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) String() string {
 | 
						|
	return p.Path()
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Path() string {
 | 
						|
	return "oss://" + p.bucket + "/" + p.key
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Bucket() string {
 | 
						|
	return p.bucket
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Key() string {
 | 
						|
	return p.key
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) ReadDir() ([]Path, error) {
 | 
						|
	prefix := p.key
 | 
						|
	if !strings.HasSuffix(prefix, "/") {
 | 
						|
		prefix += "/"
 | 
						|
	}
 | 
						|
	// OSS can return at most 1000 paths(keys + common prefixes) at a time
 | 
						|
	opt := listOption{
 | 
						|
		prefix: prefix,
 | 
						|
		delim:  "/",
 | 
						|
		marker: "",
 | 
						|
		max:    1000,
 | 
						|
	}
 | 
						|
 | 
						|
	return p.listPath(opt)
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) ReadTree() ([]Path, error) {
 | 
						|
	prefix := p.key
 | 
						|
	if !strings.HasSuffix(prefix, "/") {
 | 
						|
		prefix += "/"
 | 
						|
	}
 | 
						|
	// OSS can return at most 1000 paths(keys + common prefixes) at a time
 | 
						|
	opt := listOption{
 | 
						|
		prefix: prefix,
 | 
						|
		// No delimiter for recursive search
 | 
						|
		delim:  "",
 | 
						|
		marker: "",
 | 
						|
		max:    1000,
 | 
						|
	}
 | 
						|
 | 
						|
	return p.listPath(opt)
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) PreferredHash() (*hashing.Hash, error) {
 | 
						|
	return p.Hash(hashing.HashAlgorithmMD5)
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) Hash(a hashing.HashAlgorithm) (*hashing.Hash, error) {
 | 
						|
	if a != hashing.HashAlgorithmMD5 {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	md5 := p.hash
 | 
						|
	if md5 == "" {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	md5Bytes, err := hex.DecodeString(md5)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("Etag was not a valid MD5 sum: %q", md5)
 | 
						|
	}
 | 
						|
 | 
						|
	return &hashing.Hash{Algorithm: hashing.HashAlgorithmMD5, HashValue: md5Bytes}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *OSSPath) listPath(opt listOption) ([]Path, error) {
 | 
						|
	var ret []Path
 | 
						|
	b := p.client.Bucket(p.bucket)
 | 
						|
 | 
						|
	done, err := RetryWithBackoff(ossReadBackoff, func() (bool, error) {
 | 
						|
 | 
						|
		var paths []Path
 | 
						|
		for {
 | 
						|
			// OSS can return at most 1000 paths(keys + common prefixes) at a time
 | 
						|
			resp, err := b.List(opt.prefix, opt.delim, opt.marker, opt.max)
 | 
						|
			if err != nil {
 | 
						|
				if isOSSNotFound(err) {
 | 
						|
					return true, os.ErrNotExist
 | 
						|
				}
 | 
						|
				return false, fmt.Errorf("error listing %s: %v", p, err)
 | 
						|
			}
 | 
						|
 | 
						|
			if len(resp.Contents) != 0 || len(resp.CommonPrefixes) != 0 {
 | 
						|
				// Contents represent files
 | 
						|
				for _, k := range resp.Contents {
 | 
						|
					child := &OSSPath{
 | 
						|
						client: p.client,
 | 
						|
						bucket: p.bucket,
 | 
						|
						key:    k.Key,
 | 
						|
					}
 | 
						|
					paths = append(paths, child)
 | 
						|
				}
 | 
						|
				if len(resp.Contents) != 0 {
 | 
						|
					// start with the last key in next iteration of listing.
 | 
						|
					opt.marker = resp.Contents[len(resp.Contents)-1].Key
 | 
						|
				}
 | 
						|
 | 
						|
				// CommonPrefixes represent directories
 | 
						|
				for _, d := range resp.CommonPrefixes {
 | 
						|
					child := &OSSPath{
 | 
						|
						client: p.client,
 | 
						|
						bucket: p.bucket,
 | 
						|
						key:    d,
 | 
						|
					}
 | 
						|
					paths = append(paths, child)
 | 
						|
				}
 | 
						|
				if len(resp.CommonPrefixes) != 0 {
 | 
						|
					lastComPref := resp.CommonPrefixes[len(resp.CommonPrefixes)-1]
 | 
						|
					if strings.Compare(lastComPref, opt.marker) == 1 {
 | 
						|
						opt.marker = lastComPref
 | 
						|
					}
 | 
						|
				}
 | 
						|
			} else {
 | 
						|
				// no more files or directories
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
		klog.V(8).Infof("Listed files in %v: %v", p, paths)
 | 
						|
		ret = paths
 | 
						|
		return true, nil
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	} else if done {
 | 
						|
		return ret, nil
 | 
						|
	} else {
 | 
						|
		// Shouldn't happen - we always return a non-nil error with false
 | 
						|
		return nil, wait.ErrWaitTimeout
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func isOSSNotFound(err error) bool {
 | 
						|
	if err == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	ossErr, ok := err.(*oss.Error)
 | 
						|
	return ok && ossErr.StatusCode == 404
 | 
						|
}
 |