// kops/util/pkg/vfs/s3fs.go — 351 lines, 7.9 KiB, Go

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vfs
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"path"
"strings"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/glog"
"k8s.io/kops/util/pkg/hashing"
)
// S3Path is a vfs Path that refers to a key in an S3 bucket.
type S3Path struct {
// s3Context is used to look up the bucket's region and to obtain an S3 client.
s3Context *S3Context
// bucket is the name of the S3 bucket.
bucket string
// region is the bucket's region; client() fills it in on first use when it is empty.
region string
// key is the object key (or key prefix) inside the bucket.
key string
// etag is the ETag reported by a listing (set by ReadDir and ReadTree); it is nil otherwise.
etag *string
}
// Compile-time checks that *S3Path satisfies the Path and HasHash interfaces.
var (
	_ Path    = (*S3Path)(nil)
	_ HasHash = (*S3Path)(nil)
)
// S3Acl is an ACL implementation for objects on S3.
type S3Acl struct {
// RequestACL is copied into the ACL field of the PutObject request by
// S3Path.WriteFile; a nil value leaves the request's ACL unset.
RequestACL *string
}
// newS3Path builds an S3Path for the given bucket and key.
// A single trailing "/" is stripped from bucket and a single leading "/" from
// key; region and etag are left unset.
func newS3Path(s3Context *S3Context, bucket string, key string) *S3Path {
	p := &S3Path{s3Context: s3Context}
	p.bucket = strings.TrimSuffix(bucket, "/")
	p.key = strings.TrimPrefix(key, "/")
	return p
}
// Path returns the location in URL form: "s3://<bucket>/<key>".
func (p *S3Path) Path() string {
	return fmt.Sprintf("s3://%s/%s", p.bucket, p.key)
}
// Bucket returns the name of the S3 bucket this path points into.
func (p *S3Path) Bucket() string {
	return p.bucket
}
// Key returns the object key of this path within its bucket.
func (p *S3Path) Key() string {
	return p.key
}
// String implements fmt.Stringer; it returns the same text as Path.
func (p *S3Path) String() string {
	return p.Path()
}
// Remove deletes the object at this path by issuing an S3 DeleteObject call.
// Any error from the call is returned wrapped with the path.
func (p *S3Path) Remove() error {
	client, err := p.client()
	if err != nil {
		return err
	}

	glog.V(8).Infof("removing file %s", p)

	input := &s3.DeleteObjectInput{
		Bucket: aws.String(p.bucket),
		Key:    aws.String(p.key),
	}
	if _, deleteErr := client.DeleteObject(input); deleteErr != nil {
		// TODO: Check for not-exists, return os.NotExist
		return fmt.Errorf("error deleting %s: %v", p, deleteErr)
	}
	return nil
}
// Join returns a new S3Path in the same bucket whose key is this path's key
// joined with relativePath via path.Join (so the result is also cleaned).
// The returned path shares the s3Context; region and etag are not carried over.
func (p *S3Path) Join(relativePath ...string) Path {
	segments := make([]string, 0, len(relativePath)+1)
	segments = append(segments, p.key)
	segments = append(segments, relativePath...)

	child := &S3Path{
		s3Context: p.s3Context,
		bucket:    p.bucket,
	}
	child.key = path.Join(segments...)
	return child
}
// WriteFile uploads data to this path with an S3 PutObject call, always
// requesting AES256 server-side encryption.
//
// The object ACL is chosen as follows:
//   - if the KOPS_STATE_S3_ACL environment variable is non-blank, its trimmed
//     value is used and aclObj is ignored;
//   - otherwise, if aclObj is non-nil it must be a *S3Acl, and its RequestACL
//     is used (a nil *S3Acl or nil RequestACL leaves the ACL unset);
//   - otherwise no ACL is sent.
//
// It returns an error if the client cannot be obtained, if aclObj has an
// unexpected type, or if the PutObject call fails.
func (p *S3Path) WriteFile(data io.ReadSeeker, aclObj ACL) error {
	client, err := p.client()
	if err != nil {
		return err
	}

	glog.V(4).Infof("Writing file %q", p)

	// We always use server-side-encryption; it doesn't really cost us anything
	sse := "AES256"

	request := &s3.PutObjectInput{}
	request.Body = data
	request.Bucket = aws.String(p.bucket)
	request.Key = aws.String(p.key)
	request.ServerSideEncryption = aws.String(sse)

	acl := strings.TrimSpace(os.Getenv("KOPS_STATE_S3_ACL"))
	if acl != "" {
		glog.Infof("Using KOPS_STATE_S3_ACL=%s", acl)
		request.ACL = aws.String(acl)
	} else if aclObj != nil {
		s3Acl, ok := aclObj.(*S3Acl)
		if !ok {
			return fmt.Errorf("write to %s with ACL of unexpected type %T", p, aclObj)
		}
		// A typed-nil *S3Acl inside the interface passes the nil check above;
		// treat it as "no ACL" instead of dereferencing it.
		if s3Acl != nil {
			request.ACL = s3Acl.RequestACL
		}
		// Track the ACL actually sent so the log line and error below are accurate.
		acl = aws.StringValue(request.ACL)
	}

	// We don't need Content-MD5: https://github.com/aws/aws-sdk-go/issues/208

	glog.V(8).Infof("Calling S3 PutObject Bucket=%q Key=%q SSE=%q ACL=%q", p.bucket, p.key, sse, acl)

	if _, err := client.PutObject(request); err != nil {
		if acl != "" {
			return fmt.Errorf("error writing %s (with ACL=%q): %v", p, acl, err)
		}
		return fmt.Errorf("error writing %s: %v", p, err)
	}

	return nil
}
// createFileLockS3 serializes CreateFile calls within this process.
// CreateFile first checks whether the object exists and then writes it; holding
// this lock keeps two in-process callers from interleaving those two steps.
// It is a package-level mutex, so it gives no protection against other processes.
// Not a great approach, but fine for a single process (with low concurrency)
// TODO: should we enable versioning?
var createFileLockS3 sync.Mutex
// CreateFile writes data to this path only if no object is currently there.
// It holds createFileLockS3 for the whole check-then-write sequence.
// It returns os.ErrExist when the object can already be read, the read error
// when the existence check fails for any reason other than not-exist, and
// otherwise the result of WriteFile.
func (p *S3Path) CreateFile(data io.ReadSeeker, acl ACL) error {
	createFileLockS3.Lock()
	defer createFileLockS3.Unlock()

	// Probe for an existing object by attempting to read it.
	_, readErr := p.ReadFile()
	switch {
	case readErr == nil:
		return os.ErrExist
	case os.IsNotExist(readErr):
		return p.WriteFile(data, acl)
	default:
		return readErr
	}
}
// ReadFile implements Path::ReadFile.
// It buffers the whole object in memory via WriteTo and returns the bytes;
// on error the returned slice is nil.
func (p *S3Path) ReadFile() ([]byte, error) {
	buf := new(bytes.Buffer)
	if _, err := p.WriteTo(buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// WriteTo implements io.WriterTo.
// It fetches the object with S3 GetObject and streams its body into out,
// returning the number of bytes copied. An AWS "NoSuchKey" error is mapped to
// os.ErrNotExist; other failures are wrapped with the path.
func (p *S3Path) WriteTo(out io.Writer) (int64, error) {
	client, err := p.client()
	if err != nil {
		return 0, err
	}

	glog.V(4).Infof("Reading file %q", p)

	input := &s3.GetObjectInput{
		Bucket: aws.String(p.bucket),
		Key:    aws.String(p.key),
	}
	response, getErr := client.GetObject(input)
	if getErr != nil {
		if code := AWSErrorCode(getErr); code == "NoSuchKey" {
			return 0, os.ErrNotExist
		}
		return 0, fmt.Errorf("error fetching %s: %v", p, getErr)
	}
	defer response.Body.Close()

	copied, copyErr := io.Copy(out, response.Body)
	if copyErr != nil {
		return copied, fmt.Errorf("error reading %s: %v", p, copyErr)
	}
	return copied, nil
}
// ReadDir lists the objects directly under this path.
// It issues ListObjects with this path's key (plus a trailing "/" when the key
// is non-empty) as the prefix and "/" as the delimiter, and returns one S3Path
// per entry in the listing's Contents, each carrying the listed ETag.
// An entry whose key equals the prefix itself is skipped.
// NOTE(review): only page.Contents is read; if S3 reports nested "directories"
// separately (e.g. as common prefixes), they are not returned here — confirm
// that is intended.
func (p *S3Path) ReadDir() ([]Path, error) {
	client, err := p.client()
	if err != nil {
		return nil, err
	}

	prefix := p.key
	if !(prefix == "" || strings.HasSuffix(prefix, "/")) {
		prefix += "/"
	}

	request := &s3.ListObjectsInput{
		Bucket:    aws.String(p.bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}

	glog.V(4).Infof("Listing objects in S3 bucket %q with prefix %q", p.bucket, prefix)

	var paths []Path
	// collect appends every object on a page; returning true asks for the next page.
	collect := func(page *s3.ListObjectsOutput, lastPage bool) bool {
		for _, object := range page.Contents {
			objectKey := aws.StringValue(object.Key)
			if objectKey == prefix {
				// We have reports (#548 and #520) of the directory being returned as a file
				// And this will indeed happen if the directory has been created as a file,
				// which seems to happen if you use some external tools to manipulate the S3 bucket.
				// We need to tolerate that, so skip the parent directory.
				glog.V(4).Infof("Skipping read of directory: %q", objectKey)
				continue
			}
			paths = append(paths, &S3Path{
				s3Context: p.s3Context,
				bucket:    p.bucket,
				key:       objectKey,
				etag:      object.ETag,
			})
		}
		return true
	}

	if listErr := client.ListObjectsPages(request, collect); listErr != nil {
		return nil, fmt.Errorf("error listing %s: %v", p, listErr)
	}

	glog.V(8).Infof("Listed files in %v: %v", p, paths)
	return paths, nil
}
// ReadTree lists every object under this path, recursively.
// It issues ListObjects with this path's key (plus a trailing "/" when the key
// is non-empty) as the prefix and no delimiter, and returns one S3Path per
// listed object, each carrying the listed ETag. Unlike ReadDir, an entry whose
// key equals the prefix is not filtered out.
func (p *S3Path) ReadTree() ([]Path, error) {
	client, err := p.client()
	if err != nil {
		return nil, err
	}

	prefix := p.key
	if !(prefix == "" || strings.HasSuffix(prefix, "/")) {
		prefix += "/"
	}

	// No delimiter for recursive search
	request := &s3.ListObjectsInput{
		Bucket: aws.String(p.bucket),
		Prefix: aws.String(prefix),
	}

	var paths []Path
	// collect appends every object on a page; returning true asks for the next page.
	collect := func(page *s3.ListObjectsOutput, lastPage bool) bool {
		for _, object := range page.Contents {
			paths = append(paths, &S3Path{
				s3Context: p.s3Context,
				bucket:    p.bucket,
				key:       aws.StringValue(object.Key),
				etag:      object.ETag,
			})
		}
		return true
	}

	if listErr := client.ListObjectsPages(request, collect); listErr != nil {
		return nil, fmt.Errorf("error listing %s: %v", p, listErr)
	}
	return paths, nil
}
// client returns the S3 client for this path's bucket region.
// When p.region is empty it is first resolved through the s3Context and stored
// on the receiver, so later calls skip the lookup.
func (p *S3Path) client() (*s3.S3, error) {
	if p.region == "" {
		var lookupErr error
		if p.region, lookupErr = p.s3Context.getRegionForBucket(p.bucket); lookupErr != nil {
			return nil, lookupErr
		}
	}

	s3Client, clientErr := p.s3Context.getClient(p.region)
	if clientErr != nil {
		return nil, clientErr
	}
	return s3Client, nil
}
// Base returns the last element of the key, as computed by path.Base.
func (p *S3Path) Base() string {
	return path.Base(p.key)
}
// PreferredHash returns the MD5 hash of the object, as produced by Hash.
func (p *S3Path) PreferredHash() (*hashing.Hash, error) {
	return p.Hash(hashing.HashAlgorithmMD5)
}
// Hash returns the object's hash for algorithm a, derived from the stored ETag.
// It returns (nil, nil) when a is not MD5 or when no ETag is recorded on this
// path. Otherwise the ETag, with surrounding double quotes removed, is
// hex-decoded and returned as an MD5 hash; an ETag that is not valid hex
// yields an error.
// NOTE(review): this treats every hex ETag as an MD5 sum; S3 presumably emits
// non-MD5 ETags in some cases (e.g. multipart uploads) — verify against callers.
func (p *S3Path) Hash(a hashing.HashAlgorithm) (*hashing.Hash, error) {
	// Only MD5 is available, and only when a listing supplied an ETag.
	if a != hashing.HashAlgorithmMD5 || p.etag == nil {
		return nil, nil
	}

	unquoted := strings.Trim(*p.etag, "\"")
	sum, decodeErr := hex.DecodeString(unquoted)
	if decodeErr != nil {
		return nil, fmt.Errorf("Etag was not a valid MD5 sum: %q", *p.etag)
	}

	result := &hashing.Hash{
		Algorithm: hashing.HashAlgorithmMD5,
		HashValue: sum,
	}
	return result, nil
}
// AWSErrorCode returns the AWS error code of err when err is an awserr.Error,
// and "" for any other error (including nil).
func AWSErrorCode(err error) string {
	awsError, ok := err.(awserr.Error)
	if !ok {
		return ""
	}
	return awsError.Code()
}