Merge pull request #4217 from justinsb/support_writeto_in_vfs_path

VFS: Support io.WriterTo interface
This commit is contained in:
k8s-ci-robot 2018-01-09 12:42:14 -08:00 committed by GitHub
commit 96268846c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 149 additions and 52 deletions

View File

@ -18,12 +18,12 @@ package models
import (
"errors"
"io"
"os"
"path"
"strings"
_ "github.com/google/cadvisor/pages/static"
"k8s.io/kops/util/pkg/vfs"
)
@ -57,6 +57,17 @@ func (p *AssetPath) CreateFile(data []byte, acl vfs.ACL) error {
return ReadOnlyError
}
// WriteTo implements io.WriterTo
func (p *AssetPath) WriteTo(out io.Writer) (int64, error) {
data, err := p.ReadFile()
if err != nil {
return 0, err
}
n, err := out.Write(data)
return int64(n), err
}
// ReadFile implements Path::ReadFile
func (p *AssetPath) ReadFile() ([]byte, error) {
data, err := Asset(p.location)
if err != nil {

View File

@ -112,10 +112,22 @@ func (p *FSPath) CreateFile(data []byte, acl ACL) error {
return p.WriteFile(data, acl)
}
// ReadFile implements Path::ReadFile
func (p *FSPath) ReadFile() ([]byte, error) {
return ioutil.ReadFile(p.location)
}
// WriteTo implements io.WriterTo
func (p *FSPath) WriteTo(out io.Writer) (int64, error) {
f, err := os.Open(p.location)
if err != nil {
return 0, err
}
defer f.Close()
return io.Copy(out, f)
}
func (p *FSPath) ReadDir() ([]Path, error) {
files, err := ioutil.ReadDir(p.location)
if err != nil {

View File

@ -21,7 +21,7 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"path"
@ -200,39 +200,49 @@ func (p *GSPath) CreateFile(data []byte, acl ACL) error {
// ReadFile implements Path::ReadFile
func (p *GSPath) ReadFile() ([]byte, error) {
var ret []byte
var b bytes.Buffer
done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
glog.V(4).Infof("Reading file %q", p)
response, err := p.client.Objects.Get(p.bucket, p.key).Download()
b.Reset()
_, err := p.WriteTo(&b)
if err != nil {
if isGCSNotFound(err) {
return true, os.ErrNotExist
if os.IsNotExist(err) {
// Not recoverable
return true, err
}
return false, fmt.Errorf("error reading %s: %v", p, err)
return false, err
}
if response == nil {
return false, fmt.Errorf("no response returned from reading %s", p)
}
defer response.Body.Close()
data, err := ioutil.ReadAll(response.Body)
if err != nil {
return false, fmt.Errorf("error reading %s: %v", p, err)
}
ret = data
// Success!
return true, nil
})
if err != nil {
return nil, err
} else if done {
return ret, nil
return b.Bytes(), nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return nil, wait.ErrWaitTimeout
}
}
// WriteTo implements io.WriterTo::WriteTo
func (p *GSPath) WriteTo(out io.Writer) (int64, error) {
glog.V(4).Infof("Reading file %q", p)
response, err := p.client.Objects.Get(p.bucket, p.key).Download()
if err != nil {
if isGCSNotFound(err) {
return 0, os.ErrNotExist
}
return 0, fmt.Errorf("error reading %s: %v", p, err)
}
if response == nil {
return 0, fmt.Errorf("no response returned from reading %s", p)
}
defer response.Body.Close()
return io.Copy(out, response.Body)
}
// ReadDir implements Path::ReadDir
func (p *GSPath) ReadDir() ([]Path, error) {
var ret []Path

View File

@ -17,7 +17,9 @@ limitations under the License.
package vfs
import (
"bytes"
"fmt"
"io"
"path"
"strings"
@ -85,8 +87,19 @@ func (p *KubernetesPath) CreateFile(data []byte, acl ACL) error {
return fmt.Errorf("KubernetesPath::CreateFile not supported")
}
// ReadFile implements Path::ReadFile
func (p *KubernetesPath) ReadFile() ([]byte, error) {
return nil, fmt.Errorf("KubernetesPath::ReadFile not supported")
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
// WriteTo implements io.WriterTo
func (p *KubernetesPath) WriteTo(out io.Writer) (int64, error) {
return 0, fmt.Errorf("KubernetesPath::WriteTo not supported")
}
func (p *KubernetesPath) ReadDir() ([]Path, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package vfs
import (
"io"
"os"
"path"
"strings"
@ -105,6 +106,7 @@ func (p *MemFSPath) CreateFile(data []byte, acl ACL) error {
return p.WriteFile(data, acl)
}
// ReadFile implements Path::ReadFile
func (p *MemFSPath) ReadFile() ([]byte, error) {
if p.contents == nil {
return nil, os.ErrNotExist
@ -113,6 +115,15 @@ func (p *MemFSPath) ReadFile() ([]byte, error) {
return p.contents, nil
}
// WriteTo implements io.WriterTo
func (p *MemFSPath) WriteTo(out io.Writer) (int64, error) {
if p.contents == nil {
return 0, os.ErrNotExist
}
n, err := out.Write(p.contents)
return int64(n), err
}
func (p *MemFSPath) ReadDir() ([]Path, error) {
var paths []Path
for _, f := range p.children {

View File

@ -20,7 +20,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"io/ioutil"
"io"
"os"
"path"
"strings"
@ -178,11 +178,22 @@ func (p *S3Path) CreateFile(data []byte, acl ACL) error {
return p.WriteFile(data, acl)
}
// ReadFile implements Path::ReadFile
func (p *S3Path) ReadFile() ([]byte, error) {
client, err := p.client()
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
// WriteTo implements io.WriterTo
func (p *S3Path) WriteTo(out io.Writer) (int64, error) {
client, err := p.client()
if err != nil {
return 0, err
}
glog.V(4).Infof("Reading file %q", p)
@ -193,17 +204,17 @@ func (p *S3Path) ReadFile() ([]byte, error) {
response, err := client.GetObject(request)
if err != nil {
if AWSErrorCode(err) == "NoSuchKey" {
return nil, os.ErrNotExist
return 0, os.ErrNotExist
}
return nil, fmt.Errorf("error fetching %s: %v", p, err)
return 0, fmt.Errorf("error fetching %s: %v", p, err)
}
defer response.Body.Close()
d, err := ioutil.ReadAll(response.Body)
n, err := io.Copy(out, response.Body)
if err != nil {
return nil, fmt.Errorf("error reading %s: %v", p, err)
return n, fmt.Errorf("error reading %s: %v", p, err)
}
return d, nil
return n, nil
}
func (p *S3Path) ReadDir() ([]Path, error) {

View File

@ -247,26 +247,31 @@ func (p *SSHPath) CreateFile(data []byte, acl ACL) error {
return p.WriteFile(data, acl)
}
// ReadFile implements Path::ReadFile
func (p *SSHPath) ReadFile() ([]byte, error) {
sftpClient, err := p.newClient()
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
// WriteTo implements io.WriterTo
func (p *SSHPath) WriteTo(out io.Writer) (int64, error) {
sftpClient, err := p.newClient()
if err != nil {
return 0, err
}
defer sftpClient.Close()
f, err := sftpClient.Open(p.path)
if err != nil {
return nil, fmt.Errorf("error opening file %s over sftp: %v", p, err)
return 0, fmt.Errorf("error opening file %s over sftp: %v", p, err)
}
defer f.Close()
var b bytes.Buffer
_, err = f.WriteTo(&b)
if err != nil {
return nil, fmt.Errorf("error reading file %s over sftp: %v", p, err)
}
return b.Bytes(), nil
return f.WriteTo(out)
}
func (p *SSHPath) ReadDir() ([]Path, error) {

View File

@ -20,6 +20,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"path"
"path/filepath"
@ -325,33 +326,49 @@ func (p *SwiftPath) createBucket() error {
}
}
// ReadFile implements Path::ReadFile.
// ReadFile implements Path::ReadFile
func (p *SwiftPath) ReadFile() ([]byte, error) {
var ret []byte
var b bytes.Buffer
done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
glog.V(4).Infof("Reading file %q", p)
opt := swiftobject.DownloadOpts{}
result := swiftobject.Download(p.client, p.bucket, p.key, opt)
data, err := (&result).ExtractContent()
if err == nil {
ret = data
return true, nil
} else if isSwiftNotFound(err) {
return true, os.ErrNotExist
} else {
return false, fmt.Errorf("error reading %s: %v", p, err)
b.Reset()
_, err := p.WriteTo(&b)
if err != nil {
if os.IsNotExist(err) {
// Not recoverable
return true, err
}
return false, err
}
// Success!
return true, nil
})
if err != nil {
return nil, err
} else if done {
return ret, nil
return b.Bytes(), nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return nil, wait.ErrWaitTimeout
}
}
// WriteTo implements io.WriterTo
func (p *SwiftPath) WriteTo(out io.Writer) (int64, error) {
glog.V(4).Infof("Reading file %q", p)
opt := swiftobject.DownloadOpts{}
result := swiftobject.Download(p.client, p.bucket, p.key, opt)
if result.Err != nil {
if isSwiftNotFound(result.Err) {
return 0, os.ErrNotExist
}
return 0, fmt.Errorf("error reading %s: %v", p, result.Err)
}
defer result.Body.Close()
return io.Copy(out, result.Body)
}
func (p *SwiftPath) readPath(opt swiftobject.ListOpts) ([]Path, error) {
var ret []Path
done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {

View File

@ -18,6 +18,7 @@ package vfs
import (
"fmt"
"io"
"strings"
"github.com/golang/glog"
@ -42,7 +43,13 @@ type ACLOracle func(Path) (ACL, error)
// Path is a path in the VFS space, which we can read, write, list etc
type Path interface {
io.WriterTo
Join(relativePath ...string) Path
// ReadFile returns the contents of the file, or an error if the file could not be read.
// If the file did not exist, err = os.ErrNotExist
// As this reads the entire file into memory, consider using WriteTo for bigger files
ReadFile() ([]byte, error)
WriteFile(data []byte, acl ACL) error