/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vfs

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"google.golang.org/api/googleapi"
	storage "google.golang.org/api/storage/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"

	"k8s.io/kops/upup/pkg/fi/cloudup/terraformWriter"
	"k8s.io/kops/util/pkg/hashing"
)

// GSPath is a vfs path for Google Cloud Storage
type GSPath struct {
	// vfsContext holds the VFS context for this path,
	// in particular the client / credentials we should use.
	vfsContext *VFSContext

	bucket  string
	key     string
	md5Hash string
}

var (
	_ Path          = &GSPath{}
	_ TerraformPath = &GSPath{}
	_ HasHash       = &GSPath{}
)

// gcsReadBackoff is the backoff strategy for GCS read retries
var gcsReadBackoff = wait.Backoff{
	Duration: time.Second,
	Factor:   1.5,
	Jitter:   0.1,
	Steps:    4,
}

// GSAcl is an ACL implementation for objects on Google Cloud Storage
type GSAcl struct {
	Acl []*storage.ObjectAccessControl
}

func (a *GSAcl) String() string {
	var s []string
	for _, acl := range a.Acl {
		s = append(s, fmt.Sprintf("%+v", acl))
	}
	return "{" + strings.Join(s, ", ") + "}"
}

var _ ACL = &GSAcl{}

// gcsWriteBackoff is the backoff strategy for GCS write retries
var gcsWriteBackoff = wait.Backoff{
	Duration: time.Second,
	Factor:   1.5,
	Jitter:   0.1,
	Steps:    5,
}

// NewGSPath returns a GSPath for the given bucket and key,
// normalizing away a trailing slash on the bucket and a leading slash on the key.
func NewGSPath(c *VFSContext, bucket string, key string) *GSPath {
	bucket = strings.TrimSuffix(bucket, "/")
	key = strings.TrimPrefix(key, "/")

	return &GSPath{
		vfsContext: c,
		bucket:     bucket,
		key:        key,
	}
}

// Path returns the vfs path string, e.g. "gs://bucket/key"
func (p *GSPath) Path() string {
	return "gs://" + p.bucket + "/" + p.key
}

// Bucket returns the GCS bucket name
func (p *GSPath) Bucket() string {
	return p.bucket
}

// Object returns the GCS object name (the key within the bucket)
func (p *GSPath) Object() string {
	return p.key
}

// Client returns the storage.Service bound to this path
func (p *GSPath) Client(ctx context.Context) (*storage.Service, error) {
	return p.getStorageClient(ctx)
}

func (p *GSPath) String() string {
	return p.Path()
}

// Remove deletes the object, retrying with backoff on transient errors
func (p *GSPath) Remove(ctx context.Context) error {
	done, err := RetryWithBackoff(gcsWriteBackoff, func() (bool, error) {
		client, err := p.getStorageClient(ctx)
		if err != nil {
			return false, err
		}

		if err := client.Objects.Delete(p.bucket, p.key).Context(ctx).Do(); err != nil {
			// TODO: Check for not-exists, return os.NotExist
			return false, fmt.Errorf("error deleting %s: %w", p, err)
		}

		return true, nil
	})
	if err != nil {
		return err
	} else if done {
		return nil
	} else {
		// Shouldn't happen - we always return a non-nil error with false
		return wait.ErrWaitTimeout
	}
}

// RemoveAll recursively deletes every object under this path
func (p *GSPath) RemoveAll(ctx context.Context) error {
	tree, err := p.ReadTree(ctx)
	if err != nil {
		return err
	}

	for _, objectPath := range tree {
		err := objectPath.Remove(ctx)
		if err != nil {
			return fmt.Errorf("error removing file %s: %w", objectPath, err)
		}
	}

	return nil
}

func (p *GSPath) RemoveAllVersions(ctx context.Context) error {
	return p.Remove(ctx)
}

// Join appends relative path components to this path's key
func (p *GSPath) Join(relativePath ...string) Path {
	args := []string{p.key}
	args = append(args, relativePath...)
	joined := path.Join(args...)
	return &GSPath{
		vfsContext: p.vfsContext,
		bucket:     p.bucket,
		key:        joined,
	}
}
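
// Illustrative sketch (not part of the original file): how construction and
// Join compose keys. The vfsContext value below is a placeholder assumption.
//
//	p := NewGSPath(vfsContext, "my-bucket/", "/cluster")
//	// bucket is normalized to "my-bucket", key to "cluster"
//	child := p.Join("instancegroup", "nodes")
//	// child.Path() == "gs://my-bucket/cluster/instancegroup/nodes"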

// WriteFile writes the contents of data to the object, retrying with backoff.
// The MD5 of the payload is computed up front and sent with the request so
// GCS can verify the upload server-side.
func (p *GSPath) WriteFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
	md5Hash, err := hashing.HashAlgorithmMD5.Hash(data)
	if err != nil {
		return err
	}

	done, err := RetryWithBackoff(gcsWriteBackoff, func() (bool, error) {
		obj := &storage.Object{
			Name:    p.key,
			Md5Hash: base64.StdEncoding.EncodeToString(md5Hash.HashValue),
		}

		if acl != nil {
			gsACL, ok := acl.(*GSAcl)
			if !ok {
				return true, fmt.Errorf("write to %s with ACL of unexpected type %T", p, acl)
			}
			obj.Acl = gsACL.Acl
			klog.V(4).Infof("Writing file %q with ACL %v", p, gsACL)
		} else {
			klog.V(4).Infof("Writing file %q", p)
		}

		// Rewind the stream in case this is a retry
		if _, err := data.Seek(0, 0); err != nil {
			return false, fmt.Errorf("error seeking to start of data stream for write to %s: %v", p, err)
		}

		client, err := p.getStorageClient(ctx)
		if err != nil {
			return false, err
		}

		_, err = client.Objects.Insert(p.bucket, obj).Context(ctx).Media(data).Do()
		if err != nil {
			return false, fmt.Errorf("error writing %s: %v", p, err)
		}

		return true, nil
	})
	if err != nil {
		return err
	} else if done {
		return nil
	} else {
		// Shouldn't happen - we always return a non-nil error with false
		return wait.ErrWaitTimeout
	}
}

// To prevent concurrent creates on the same file while maintaining atomicity of writes,
// we take a process-wide lock during the operation.
// Not a great approach, but fine for a single process (with low concurrency).
// TODO: should we enable versioning?
var createFileLockGCS sync.Mutex

// CreateFile writes the file contents only if the object does not already exist;
// it returns os.ErrExist if the object is already present.
func (p *GSPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
	createFileLockGCS.Lock()
	defer createFileLockGCS.Unlock()

	// Check if exists
	_, err := p.ReadFile(ctx)
	if err == nil {
		return os.ErrExist
	}

	if !os.IsNotExist(err) {
		return err
	}

	return p.WriteFile(ctx, data, acl)
}
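
// Caller-side sketch (an assumption about typical usage, not from this file):
// CreateFile surfaces os.ErrExist when it loses a create race, so callers can
// fall back to reading whatever the winner wrote.
//
//	err := p.CreateFile(ctx, bytes.NewReader(payload), nil)
//	if errors.Is(err, os.ErrExist) {
//		existing, readErr := p.ReadFile(ctx)
//		// reconcile with the object another writer created
//	}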

// ReadFile implements Path::ReadFile
func (p *GSPath) ReadFile(ctx context.Context) ([]byte, error) {
	var b bytes.Buffer
	done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
		b.Reset()
		_, err := p.WriteTo(&b)
		if err != nil {
			if os.IsNotExist(err) {
				// Not recoverable
				return true, err
			}
			return false, err
		}
		// Success!
		return true, nil
	})
	if err != nil {
		return nil, err
	} else if done {
		return b.Bytes(), nil
	} else {
		// Shouldn't happen - we always return a non-nil error with false
		return nil, wait.ErrWaitTimeout
	}
}

// WriteTo implements io.WriterTo::WriteTo
func (p *GSPath) WriteTo(out io.Writer) (int64, error) {
	ctx := context.TODO()

	klog.V(4).Infof("Reading file %q", p)

	client, err := p.getStorageClient(ctx)
	if err != nil {
		return 0, err
	}

	response, err := client.Objects.Get(p.bucket, p.key).Context(ctx).Download()
	if err != nil {
		if isGCSNotFound(err) {
			return 0, os.ErrNotExist
		}
		return 0, fmt.Errorf("error reading %s: %v", p, err)
	}
	if response == nil {
		return 0, fmt.Errorf("no response returned from reading %s", p)
	}
	defer response.Body.Close()

	return io.Copy(out, response.Body)
}

// ReadDir implements Path::ReadDir
func (p *GSPath) ReadDir() ([]Path, error) {
	ctx := context.TODO()

	var ret []Path
	done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
		prefix := p.key
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}

		client, err := p.getStorageClient(ctx)
		if err != nil {
			return false, err
		}

		var paths []Path
		if err := client.Objects.List(p.bucket).Context(ctx).Delimiter("/").Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
			for _, o := range page.Items {
				child := &GSPath{
					vfsContext: p.vfsContext,
					bucket:     p.bucket,
					key:        o.Name,
					md5Hash:    o.Md5Hash,
				}
				paths = append(paths, child)
			}
			return nil
		}); err != nil {
			if isGCSNotFound(err) {
				return true, os.ErrNotExist
			}
			return false, fmt.Errorf("error listing %s: %v", p, err)
		}

		klog.V(8).Infof("Listed files in %v: %v", p, paths)
		ret = paths
		return true, nil
	})
	if err != nil {
		return nil, err
	} else if done {
		return ret, nil
	} else {
		// Shouldn't happen - we always return a non-nil error with false
		return nil, wait.ErrWaitTimeout
	}
}

// ReadTree implements Path::ReadTree
func (p *GSPath) ReadTree(ctx context.Context) ([]Path, error) {
	var ret []Path
	done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
		// No delimiter for recursive search
		prefix := p.key
		if prefix != "" && !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}

		client, err := p.getStorageClient(ctx)
		if err != nil {
			return false, err
		}

		var paths []Path
		if err := client.Objects.List(p.bucket).Context(ctx).Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
			for _, o := range page.Items {
				key := o.Name
				child := &GSPath{
					vfsContext: p.vfsContext,
					bucket:     p.bucket,
					key:        key,
					md5Hash:    o.Md5Hash,
				}
				paths = append(paths, child)
			}
			return nil
		}); err != nil {
			if isGCSNotFound(err) {
				return true, os.ErrNotExist
			}
			return false, fmt.Errorf("error listing tree %s: %v", p, err)
		}

		ret = paths
		return true, nil
	})
	if err != nil {
		return nil, err
	} else if done {
		return ret, nil
	} else {
		// Shouldn't happen - we always return a non-nil error with false
		return nil, wait.ErrWaitTimeout
	}
}

// Base returns the last component of the key
func (p *GSPath) Base() string {
	return path.Base(p.key)
}

// PreferredHash returns the hash in the algorithm GCS reports natively (MD5)
func (p *GSPath) PreferredHash() (*hashing.Hash, error) {
	return p.Hash(hashing.HashAlgorithmMD5)
}

// Hash returns the MD5 hash recorded when this path was listed;
// it returns nil (without error) for other algorithms or when no hash is known
func (p *GSPath) Hash(a hashing.HashAlgorithm) (*hashing.Hash, error) {
	if a != hashing.HashAlgorithmMD5 {
		return nil, nil
	}

	md5 := p.md5Hash
	if md5 == "" {
		return nil, nil
	}

	md5Bytes, err := base64.StdEncoding.DecodeString(md5)
	if err != nil {
		return nil, fmt.Errorf("md5 hash was not a valid base64-encoded MD5 sum: %q", md5)
	}

	return &hashing.Hash{Algorithm: hashing.HashAlgorithmMD5, HashValue: md5Bytes}, nil
}
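
// Verification sketch (illustrative assumption, not from this file): md5Hash
// is only populated on paths produced by ReadDir/ReadTree, so integrity
// checks typically run over a listing.
//
//	files, _ := p.ReadDir()
//	for _, f := range files {
//		if hash, err := f.(*GSPath).PreferredHash(); err == nil && hash != nil {
//			// compare hash.HashValue against a locally computed MD5
//		}
//	}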

// GetHTTPsUrl returns the public https:// URL for this object
func (p *GSPath) GetHTTPsUrl() (string, error) {
	url := fmt.Sprintf("https://storage.googleapis.com/%s/%s", p.bucket, p.key)
	return strings.TrimSuffix(url, "/"), nil
}

// IsBucketPublic reports whether the bucket uses uniform bucket-level access
// and grants roles/storage.objectViewer to allUsers
func (p *GSPath) IsBucketPublic(ctx context.Context) (bool, error) {
	client, err := p.Client(ctx)
	if err != nil {
		return false, err
	}

	bucket, err := client.Buckets.Get(p.bucket).Do()
	if err != nil {
		return false, err
	}

	// Check bucket has uniform bucket-level IAM
	if !bucket.IamConfiguration.BucketPolicyOnly.Enabled {
		return false, nil
	}

	// Check `allUsers` IAM has `roles/storage.objectViewer` permission
	policy, err := client.Buckets.GetIamPolicy(p.bucket).Do()
	if err != nil {
		return false, err
	}
	for _, binding := range policy.Bindings {
		if binding.Role == "roles/storage.objectViewer" {
			for _, member := range binding.Members {
				if member == "allUsers" {
					return true, nil
				}
			}
		}
	}

	return false, nil
}

type terraformGSObject struct {
	Bucket   string                   `json:"bucket" cty:"bucket"`
	Name     string                   `json:"name" cty:"name"`
	Source   *terraformWriter.Literal `json:"source" cty:"source"`
	Provider *terraformWriter.Literal `json:"provider,omitempty" cty:"provider"`
}

type terraformGSObjectAccessControl struct {
	Bucket     string                   `json:"bucket" cty:"bucket"`
	Object     *terraformWriter.Literal `json:"object" cty:"object"`
	RoleEntity []string                 `json:"role_entity" cty:"role_entity"`
	Provider   *terraformWriter.Literal `json:"provider,omitempty" cty:"provider"`
}

// RenderTerraform renders this object (and its ACL, if any) as Terraform resources
func (p *GSPath) RenderTerraform(w *terraformWriter.TerraformWriter, name string, data io.Reader, acl ACL) error {
	bytes, err := io.ReadAll(data)
	if err != nil {
		return fmt.Errorf("reading data: %v", err)
	}

	tfProviderArguments := map[string]string{}
	// GCS doesn't need the project and region specified
	w.EnsureTerraformProvider("google", tfProviderArguments)

	content, err := w.AddFilePath("google_storage_bucket_object", name, "content", bytes, false)
	if err != nil {
		return fmt.Errorf("rendering GCS file: %v", err)
	}

	tf := &terraformGSObject{
		Bucket:   p.Bucket(),
		Name:     p.Object(),
		Source:   content,
		Provider: terraformWriter.LiteralTokens("google", "files"),
	}
	err = w.RenderResource("google_storage_bucket_object", name, tf)
	if err != nil {
		return err
	}

	// file ACLs can be empty on GCP, because of a bucket-only ACL.
	if acl != nil {
		tfACL := &terraformGSObjectAccessControl{
			Bucket:     p.Bucket(),
			Object:     p.TerraformLink(name),
			RoleEntity: make([]string, 0),
			Provider:   terraformWriter.LiteralTokens("google", "files"),
		}
		for _, re := range acl.(*GSAcl).Acl {
			// https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_object_acl#role_entity
			tfACL.RoleEntity = append(tfACL.RoleEntity, fmt.Sprintf("%v:%v", re.Role, re.Entity))
		}
		return w.RenderResource("google_storage_object_acl", name, tfACL)
	}

	return nil
}

// TerraformLink returns a literal referencing the rendered bucket object's output_name
func (p *GSPath) TerraformLink(name string) *terraformWriter.Literal {
	return terraformWriter.LiteralProperty("google_storage_bucket_object", name, "output_name")
}

// isGCSNotFound reports whether err is a GCS 404 (bucket or object not found)
func isGCSNotFound(err error) bool {
	if err == nil {
		return false
	}
	ae, ok := err.(*googleapi.Error)
	return ok && ae.Code == http.StatusNotFound
}

func (p *GSPath) getStorageClient(ctx context.Context) (*storage.Service, error) {
	return p.vfsContext.getGCSClient(ctx)
}
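
// End-to-end sketch (illustrative only; how vfsContext is obtained is an
// assumption about the surrounding package, not shown in this file):
//
//	ctx := context.Background()
//	p := NewGSPath(vfsContext, "state-store", "cluster.example.com/config")
//	if err := p.WriteFile(ctx, bytes.NewReader(spec), nil); err != nil {
//		// writes retry with gcsWriteBackoff before surfacing the error
//	}
//	data, err := p.ReadFile(ctx)
//	// reads retry with gcsReadBackoff; a GCS 404 maps to os.ErrNotExist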