Merge pull request #14925 from justinsb/add_context_to_readfile

Add Context arg to vfs ReadFile
This commit is contained in:
Kubernetes Prow Robot 2023-01-01 07:37:30 -08:00 committed by GitHub
commit b07168c985
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 97 additions and 70 deletions

View File

@ -44,7 +44,7 @@ func (s *Server) getNodeConfig(ctx context.Context, req *nodeup.BootstrapRequest
{
p := s.configBase.Join(registry.PathClusterCompleted)
b, err := p.ReadFile()
b, err := p.ReadFile(ctx)
if err != nil {
return nil, fmt.Errorf("error loading cluster config %q: %w", p, err)
}
@ -54,7 +54,7 @@ func (s *Server) getNodeConfig(ctx context.Context, req *nodeup.BootstrapRequest
{
p := s.configBase.Join("igconfig", "node", instanceGroupName, "nodeupconfig.yaml")
b, err := p.ReadFile()
b, err := p.ReadFile(ctx)
if err != nil {
return nil, fmt.Errorf("error loading NodeupConfig %q: %v", p, err)
}

View File

@ -318,7 +318,7 @@ func runCreateClusterIntegrationTest(t *testing.T, srcDir string, version string
}
// Compare additional objects
addons, err := clientset.AddonsFor(&clusters.Items[0]).List()
addons, err := clientset.AddonsFor(&clusters.Items[0]).List(ctx)
if err != nil {
t.Fatalf("error listing addons: %v", err)
}

View File

@ -103,7 +103,7 @@ func RunGetAll(ctx context.Context, f commandutils.Factory, out io.Writer, optio
var addonObjects []*unstructured.Unstructured
{
addons, err := client.AddonsFor(cluster).List()
addons, err := client.AddonsFor(cluster).List(ctx)
if err != nil {
return err
}

View File

@ -286,7 +286,7 @@ func fullClusterSpecs(ctx context.Context, clusters []*kopsapi.Cluster) ([]*kops
return nil, fmt.Errorf("error reading full cluster spec for %q: %v", cluster.ObjectMeta.Name, err)
}
configPath := configBase.Join(registry.PathClusterCompleted)
b, err := configPath.ReadFile()
b, err := configPath.ReadFile(ctx)
if err != nil {
return nil, fmt.Errorf("error loading Cluster %q: %v", configPath, err)
}

View File

@ -121,7 +121,7 @@ func serverVersion(f *util.Factory, options *VersionOptions) string {
return "could not talk to vfs"
}
kopsVersionUpdatedBytes, err := configBase.Join(registry.PathKopsVersionUpdated).ReadFile()
kopsVersionUpdatedBytes, err := configBase.Join(registry.PathKopsVersionUpdated).ReadFile(ctx)
if err != nil {
return "could get cluster version"
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package model
import (
"context"
"fmt"
"path/filepath"
"strings"
@ -53,6 +54,8 @@ var _ fi.NodeupModelBuilder = &KubeAPIServerBuilder{}
// Build is responsible for generating the configuration for the kube-apiserver.
func (b *KubeAPIServerBuilder) Build(c *fi.NodeupModelBuilderContext) error {
ctx := c.Context()
if !b.HasAPIServer {
return nil
}
@ -181,7 +184,7 @@ func (b *KubeAPIServerBuilder) Build(c *fi.NodeupModelBuilderContext) error {
}
{
pod, err := b.buildPod(&kubeAPIServer)
pod, err := b.buildPod(ctx, &kubeAPIServer)
if err != nil {
return fmt.Errorf("error building kube-apiserver manifest: %v", err)
}
@ -507,7 +510,7 @@ func (b *KubeAPIServerBuilder) allAuthTokens() (map[string]string, error) {
}
// buildPod is responsible for generating the kube-apiserver pod and thus manifest file
func (b *KubeAPIServerBuilder) buildPod(kubeAPIServer *kops.KubeAPIServerConfig) (*v1.Pod, error) {
func (b *KubeAPIServerBuilder) buildPod(ctx context.Context, kubeAPIServer *kops.KubeAPIServerConfig) (*v1.Pod, error) {
// we need to replace 127.0.0.1 for etcd urls with the dns names in case this apiserver is not
// running on master nodes
if !b.IsMaster {
@ -715,7 +718,7 @@ func (b *KubeAPIServerBuilder) buildPod(kubeAPIServer *kops.KubeAPIServerConfig)
kubemanifest.MarkPodAsClusterCritical(pod)
if useHealthcheckProxy {
if err := b.addHealthcheckSidecar(pod); err != nil {
if err := b.addHealthcheckSidecar(ctx, pod); err != nil {
return nil, err
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package model
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
@ -31,7 +32,7 @@ func (b *KubeAPIServerBuilder) findHealthcheckManifest() *nodeup.StaticManifest
return b.findStaticManifest("kube-apiserver-healthcheck")
}
func (b *KubeAPIServerBuilder) addHealthcheckSidecar(pod *corev1.Pod) error {
func (b *KubeAPIServerBuilder) addHealthcheckSidecar(ctx context.Context, pod *corev1.Pod) error {
manifest := b.findHealthcheckManifest()
if manifest == nil {
return nil
@ -39,7 +40,7 @@ func (b *KubeAPIServerBuilder) addHealthcheckSidecar(pod *corev1.Pod) error {
p := b.ConfigBase.Join(manifest.Path)
data, err := p.ReadFile()
data, err := p.ReadFile(ctx)
if err != nil {
return fmt.Errorf("error reading kube-apiserver-healthcheck manifest %s: %v", manifest.Path, err)
}

View File

@ -35,6 +35,8 @@ var _ fi.NodeupModelBuilder = &ManifestsBuilder{}
// Build creates tasks for copying the manifests
func (b *ManifestsBuilder) Build(c *fi.NodeupModelBuilderContext) error {
ctx := c.Context()
// Write etcd manifests (currently etcd <=> master)
if b.IsMaster {
for _, manifest := range b.NodeupConfig.EtcdManifests {
@ -42,7 +44,7 @@ func (b *ManifestsBuilder) Build(c *fi.NodeupModelBuilderContext) error {
if err != nil {
return fmt.Errorf("error parsing path for etcd manifest %s: %v", manifest, err)
}
data, err := p.ReadFile()
data, err := p.ReadFile(ctx)
if err != nil {
return fmt.Errorf("error reading etcd manifest %s: %v", manifest, err)
}

View File

@ -69,5 +69,5 @@ type AddonsClient interface {
Replace(objects kubemanifest.ObjectList) error
// List returns all the addon objects
List() (kubemanifest.ObjectList, error)
List(ctx context.Context) (kubemanifest.ObjectList, error)
}

View File

@ -96,10 +96,10 @@ func (c *vfsAddonsClient) Replace(addons kubemanifest.ObjectList) error {
return nil
}
func (c *vfsAddonsClient) List() (kubemanifest.ObjectList, error) {
func (c *vfsAddonsClient) List(ctx context.Context) (kubemanifest.ObjectList, error) {
configPath := c.basePath.Join("default")
b, err := configPath.ReadFile()
b, err := configPath.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, nil

View File

@ -48,10 +48,12 @@ func newClusterVFS(basePath vfs.Path) *ClusterVFS {
}
func (c *ClusterVFS) Get(name string, options metav1.GetOptions) (*api.Cluster, error) {
ctx := context.TODO()
if options.ResourceVersion != "" {
return nil, fmt.Errorf("ResourceVersion not supported in ClusterVFS::Get")
}
o, err := c.find(name)
o, err := c.find(ctx, name)
if err != nil {
return nil, err
}
@ -71,6 +73,8 @@ func (c *ClusterVFS) configBase(clusterName string) (vfs.Path, error) {
}
func (c *ClusterVFS) List(options metav1.ListOptions) (*api.ClusterList, error) {
ctx := context.TODO()
names, err := c.listNames()
if err != nil {
return nil, err
@ -79,7 +83,7 @@ func (c *ClusterVFS) List(options metav1.ListOptions) (*api.ClusterList, error)
var items []api.Cluster
for _, clusterName := range names {
cluster, err := c.find(clusterName)
cluster, err := c.find(ctx, clusterName)
if err != nil {
klog.Warningf("cluster %q found in state store listing, but cannot be loaded: %v", clusterName, err)
continue
@ -180,13 +184,13 @@ func (r *ClusterVFS) listNames() ([]string, error) {
return keys, nil
}
func (r *ClusterVFS) find(clusterName string) (*api.Cluster, error) {
func (r *ClusterVFS) find(ctx context.Context, clusterName string) (*api.Cluster, error) {
if clusterName == "" {
return nil, fmt.Errorf("clusterName is required")
}
configPath := r.basePath.Join(clusterName, registry.PathCluster)
o, err := r.readConfig(configPath)
o, err := r.readConfig(ctx, configPath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil

View File

@ -60,7 +60,7 @@ func (c *commonVFS) init(kind string, basePath vfs.Path, storeVersion runtime.Gr
}
func (c *commonVFS) find(ctx context.Context, name string) (runtime.Object, error) {
o, err := c.readConfig(c.basePath.Join(name))
o, err := c.readConfig(ctx, c.basePath.Join(name))
if err != nil {
if os.IsNotExist(err) {
return nil, nil
@ -113,8 +113,8 @@ func (c *commonVFS) serialize(o runtime.Object) ([]byte, error) {
return b.Bytes(), nil
}
func (c *commonVFS) readConfig(configPath vfs.Path) (runtime.Object, error) {
data, err := configPath.ReadFile()
func (c *commonVFS) readConfig(ctx context.Context, configPath vfs.Path) (runtime.Object, error) {
data, err := configPath.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, err
@ -141,7 +141,7 @@ func (c *commonVFS) writeConfig(ctx context.Context, cluster *kops.Cluster, conf
case vfs.WriteOptionCreate:
create = true
case vfs.WriteOptionOnlyIfExists:
_, err = configPath.ReadFile()
_, err = configPath.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("cannot update configuration file %s: does not exist", configPath)

View File

@ -18,6 +18,7 @@ package templates
import (
"bytes"
"context"
"fmt"
"io"
"os"
@ -36,13 +37,13 @@ type Templates struct {
TemplateFunctions template.FuncMap
}
func LoadTemplates(cluster *kops.Cluster, base vfs.Path) (*Templates, error) {
func LoadTemplates(ctx context.Context, cluster *kops.Cluster, base vfs.Path) (*Templates, error) {
t := &Templates{
cluster: cluster,
resources: make(map[string]fi.Resource),
TemplateFunctions: make(template.FuncMap),
}
err := t.loadFrom(base)
err := t.loadFrom(ctx, base)
if err != nil {
return nil, err
}
@ -53,14 +54,14 @@ func (t *Templates) Find(key string) fi.Resource {
return t.resources[key]
}
func (t *Templates) loadFrom(base vfs.Path) error {
func (t *Templates) loadFrom(ctx context.Context, base vfs.Path) error {
files, err := base.ReadTree()
if err != nil {
return fmt.Errorf("error reading from %s", base)
}
for _, f := range files {
contents, err := f.ReadFile()
contents, err := f.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
// This is just an annoyance of gobindata - we can't tell the difference between files & directories. Ignore.

View File

@ -63,7 +63,9 @@ func (p *AssetPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl vfs.
// WriteTo implements io.WriterTo
func (p *AssetPath) WriteTo(out io.Writer) (int64, error) {
data, err := p.ReadFile()
ctx := context.TODO()
data, err := p.ReadFile(ctx)
if err != nil {
return 0, err
}
@ -72,7 +74,7 @@ func (p *AssetPath) WriteTo(out io.Writer) (int64, error) {
}
// ReadFile implements Path::ReadFile
func (p *AssetPath) ReadFile() ([]byte, error) {
func (p *AssetPath) ReadFile(ctx context.Context) ([]byte, error) {
data, err := content.ReadFile(p.location)
if _, ok := err.(*fs.PathError); ok {
return nil, os.ErrNotExist

View File

@ -181,7 +181,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
}
if c.AdditionalObjects == nil {
additionalObjects, err := c.Clientset.AddonsFor(c.Cluster).List()
additionalObjects, err := c.Clientset.AddonsFor(c.Cluster).List(ctx)
if err != nil {
return err
}
@ -269,7 +269,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
}
if !c.AllowKopsDowngrade {
kopsVersionUpdatedBytes, err := configBase.Join(registry.PathKopsVersionUpdated).ReadFile()
kopsVersionUpdatedBytes, err := configBase.Join(registry.PathKopsVersionUpdated).ReadFile(ctx)
if err == nil {
kopsVersionUpdated := strings.TrimSpace(string(kopsVersionUpdatedBytes))
version, err := semver.Parse(kopsVersionUpdated)
@ -325,7 +325,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
}
addonsClient := c.Clientset.AddonsFor(cluster)
addons, err := addonsClient.List()
addons, err := addonsClient.List(ctx)
if err != nil {
return fmt.Errorf("error fetching addons: %v", err)
}
@ -522,7 +522,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
}
{
templates, err := templates.LoadTemplates(cluster, models.NewAssetPath("cloudup/resources"))
templates, err := templates.LoadTemplates(ctx, cluster, models.NewAssetPath("cloudup/resources"))
if err != nil {
return fmt.Errorf("error loading templates: %v", err)
}

View File

@ -112,7 +112,7 @@ func runChannelBuilderTest(t *testing.T, key string, addonManifests []string) {
}
cluster = fullSpec
templates, err := templates.LoadTemplates(cluster, models.NewAssetPath("cloudup/resources"))
templates, err := templates.LoadTemplates(ctx, cluster, models.NewAssetPath("cloudup/resources"))
if err != nil {
t.Fatalf("error building templates for %q: %v", key, err)
}

View File

@ -49,6 +49,8 @@ type ManagedFile struct {
}
func (e *ManagedFile) Find(c *fi.CloudupContext) (*ManagedFile, error) {
ctx := c.Context()
managedFiles, err := getBasePath(c, e)
if err != nil {
return nil, err
@ -61,7 +63,7 @@ func (e *ManagedFile) Find(c *fi.CloudupContext) (*ManagedFile, error) {
filePath := managedFiles.Join(location)
existingData, err := filePath.ReadFile()
existingData, err := filePath.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, nil

View File

@ -137,7 +137,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error {
p := configBase.Join(registry.PathClusterCompleted)
var err error
b, err = p.ReadFile()
b, err = p.ReadFile(ctx)
if err != nil {
return fmt.Errorf("error loading Cluster %q: %v", p, err)
}
@ -165,7 +165,7 @@ func (c *NodeUpCommand) Run(out io.Writer) error {
} else if bootConfig.InstanceGroupName != "" {
nodeupConfigLocation := configBase.Join("igconfig", bootConfig.InstanceGroupRole.ToLowerString(), bootConfig.InstanceGroupName, "nodeupconfig.yaml")
b, err := nodeupConfigLocation.ReadFile()
b, err := nodeupConfigLocation.ReadFile(ctx)
if err != nil {
return fmt.Errorf("error loading NodeupConfig %q: %v", nodeupConfigLocation, err)
}

View File

@ -18,6 +18,7 @@ package fi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@ -188,7 +189,9 @@ func NewVFSResource(path vfs.Path) *VFSResource {
}
func (r *VFSResource) Open() (io.Reader, error) {
data, err := r.Path.ReadFile()
ctx := context.TODO()
data, err := r.Path.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, err

View File

@ -98,8 +98,10 @@ func (c *VFSSecretStore) buildSecretPath(name string) vfs.Path {
}
func (c *VFSSecretStore) FindSecret(id string) (*fi.Secret, error) {
ctx := context.TODO()
p := c.buildSecretPath(id)
s, err := c.loadSecret(p)
s, err := c.loadSecret(ctx, p)
if err != nil {
return nil, err
}
@ -173,7 +175,7 @@ func (c *VFSSecretStore) GetOrCreateSecret(ctx context.Context, id string, secre
}
// Make double-sure it round-trips
s, err := c.loadSecret(p)
s, err := c.loadSecret(ctx, p)
if err != nil {
klog.Fatalf("unable to load secret immediately after creation %v: %v", p, err)
return nil, false, err
@ -197,15 +199,15 @@ func (c *VFSSecretStore) ReplaceSecret(id string, secret *fi.Secret) (*fi.Secret
}
// Confirm the secret exists
s, err := c.loadSecret(p)
s, err := c.loadSecret(ctx, p)
if err != nil {
return nil, fmt.Errorf("unable to load secret immediately after creation %v: %v", p, err)
}
return s, nil
}
func (c *VFSSecretStore) loadSecret(p vfs.Path) (*fi.Secret, error) {
data, err := p.ReadFile()
func (c *VFSSecretStore) loadSecret(ctx context.Context, p vfs.Path) (*fi.Secret, error) {
data, err := p.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, nil

View File

@ -101,9 +101,9 @@ func (c *VFSCAStore) parseKeysetYaml(data []byte) (*kops.Keyset, bool, error) {
// loadKeyset loads a Keyset from the path.
// Returns (nil, nil) if the file is not found
// Bundles avoid the need for a list-files permission, which can be tricky on e.g. GCE
func (c *VFSCAStore) loadKeyset(p vfs.Path) (*Keyset, error) {
func (c *VFSCAStore) loadKeyset(ctx context.Context, p vfs.Path) (*Keyset, error) {
bundlePath := p.Join("keyset.yaml")
data, err := bundlePath.ReadFile()
data, err := bundlePath.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
@ -225,10 +225,10 @@ var legacyKeysetMappings = map[string]string{
// FindKeyset implements KeystoreReader.
func (c *VFSCAStore) FindKeyset(ctx context.Context, id string) (*Keyset, error) {
keys, err := c.findPrivateKeyset(id)
keys, err := c.findPrivateKeyset(ctx, id)
if keys == nil || os.IsNotExist(err) {
if legacyId := legacyKeysetMappings[id]; legacyId != "" {
keys, err = c.findPrivateKeyset(legacyId)
keys, err = c.findPrivateKeyset(ctx, legacyId)
if keys != nil {
keys.LegacyFormat = true
}
@ -240,6 +240,8 @@ func (c *VFSCAStore) FindKeyset(ctx context.Context, id string) (*Keyset, error)
// ListKeysets implements CAStore::ListKeysets
func (c *VFSCAStore) ListKeysets() (map[string]*Keyset, error) {
ctx := context.TODO()
baseDir := c.basedir.Join("private")
files, err := baseDir.ReadTree()
if err != nil {
@ -261,7 +263,7 @@ func (c *VFSCAStore) ListKeysets() (map[string]*Keyset, error) {
}
name := tokens[0]
loadedKeyset, err := c.loadKeyset(baseDir.Join(name))
loadedKeyset, err := c.loadKeyset(ctx, baseDir.Join(name))
if err != nil {
klog.Warningf("ignoring keyset %q: %w", name, err)
continue
@ -364,7 +366,7 @@ func (c *VFSCAStore) StoreKeyset(ctx context.Context, name string, keyset *Keyse
return nil
}
func (c *VFSCAStore) findPrivateKeyset(id string) (*Keyset, error) {
func (c *VFSCAStore) findPrivateKeyset(ctx context.Context, id string) (*Keyset, error) {
var keys *Keyset
var err error
if id == CertificateIDCA {
@ -376,7 +378,7 @@ func (c *VFSCAStore) findPrivateKeyset(id string) (*Keyset, error) {
return cached, nil
}
keys, err = c.loadKeyset(c.buildPrivateKeyPoolPath(id))
keys, err = c.loadKeyset(ctx, c.buildPrivateKeyPoolPath(id))
if err != nil {
return nil, err
}
@ -389,7 +391,7 @@ func (c *VFSCAStore) findPrivateKeyset(id string) (*Keyset, error) {
}
} else {
p := c.buildPrivateKeyPoolPath(id)
keys, err = c.loadKeyset(p)
keys, err = c.loadKeyset(ctx, p)
if err != nil {
return nil, err
}
@ -422,6 +424,8 @@ func (c *VFSCAStore) buildSSHPublicKeyPath(id string) vfs.Path {
}
func (c *VFSCAStore) FindSSHPublicKeys() ([]*kops.SSHCredential, error) {
ctx := context.TODO()
p := c.basedir.Join("ssh", "public", "admin")
files, err := p.ReadDir()
@ -435,7 +439,7 @@ func (c *VFSCAStore) FindSSHPublicKeys() ([]*kops.SSHCredential, error) {
var items []*kops.SSHCredential
for _, f := range files {
data, err := f.ReadFile()
data, err := f.ReadFile(ctx)
if err != nil {
if os.IsNotExist(err) {
klog.V(2).Infof("Ignoring not-found issue reading %q", f)

View File

@ -113,7 +113,7 @@ func TestVFSCAStoreRoundTrip(t *testing.T) {
// Check private/kubernetes-ca/keyset.yaml round-tripped
{
privateKeysetYaml, err := pathMap["memfs://tests/private/kubernetes-ca/keyset.yaml"].ReadFile()
privateKeysetYaml, err := pathMap["memfs://tests/private/kubernetes-ca/keyset.yaml"].ReadFile(ctx)
if err != nil {
t.Fatalf("error reading file memfs://tests/private/kubernetes-ca/keyset.yaml: %v", err)
}

View File

@ -99,7 +99,7 @@ func (p *AzureBlobPath) Join(relativePath ...string) Path {
}
// ReadFile returns the content of the blob.
func (p *AzureBlobPath) ReadFile() ([]byte, error) {
func (p *AzureBlobPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {
@ -151,7 +151,7 @@ func (p *AzureBlobPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl
defer createFileLockAzureBlob.Unlock()
// Check if the blob exists.
_, err := p.ReadFile()
_, err := p.ReadFile(ctx)
if err == nil {
return os.ErrExist
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package vfs
import (
"context"
"sync"
"time"
)
@ -44,6 +45,8 @@ type cacheEntry struct {
}
func (c *Cache) Read(p Path, ttl time.Duration) ([]byte, error) {
ctx := context.TODO()
key := p.Path()
c.mutex.Lock()
@ -62,7 +65,7 @@ func (c *Cache) Read(p Path, ttl time.Duration) ([]byte, error) {
return entry.Contents, nil
}
b, err := p.ReadFile()
b, err := p.ReadFile(ctx)
if err != nil {
return nil, err
}

View File

@ -173,7 +173,7 @@ func (c *VFSContext) ReadFile(location string, options ...VFSOption) ([]byte, er
if err != nil {
return nil, err
}
return p.ReadFile()
return p.ReadFile(ctx)
}
func (c *VFSContext) BuildVfsPath(p string) (Path, error) {

View File

@ -115,7 +115,7 @@ func (p *FSPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er
}
// ReadFile implements Path::ReadFile
func (p *FSPath) ReadFile() ([]byte, error) {
func (p *FSPath) ReadFile(ctx context.Context) ([]byte, error) {
file, err := os.ReadFile(p.location)
if errors.Is(err, syscall.ENOENT) {
err = os.ErrNotExist

View File

@ -53,7 +53,7 @@ func TestCreateFile(t *testing.T) {
}
// Check file content
data, err := fspath.ReadFile()
data, err := fspath.ReadFile(ctx)
if err != nil {
t.Errorf("Error reading file %s, error: %v", test.path, err)
}

View File

@ -227,7 +227,7 @@ func (p *GSPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er
defer createFileLockGCS.Unlock()
// Check if exists
_, err := p.ReadFile()
_, err := p.ReadFile(ctx)
if err == nil {
return os.ErrExist
}
@ -240,7 +240,7 @@ func (p *GSPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er
}
// ReadFile implements Path::ReadFile
func (p *GSPath) ReadFile() ([]byte, error) {
func (p *GSPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
b.Reset()

View File

@ -95,7 +95,7 @@ func (p *KubernetesPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl
}
// ReadFile implements Path::ReadFile
func (p *KubernetesPath) ReadFile() ([]byte, error) {
func (p *KubernetesPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {

View File

@ -125,7 +125,7 @@ func (p *MemFSPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL)
}
// ReadFile implements Path::ReadFile
func (p *MemFSPath) ReadFile() ([]byte, error) {
func (p *MemFSPath) ReadFile(ctx context.Context) ([]byte, error) {
if p.contents == nil {
return nil, os.ErrNotExist
}

View File

@ -54,7 +54,7 @@ func TestMemFsCreateFile(t *testing.T) {
}
// Check file content
data, err := memfspath.ReadFile()
data, err := memfspath.ReadFile(ctx)
if err != nil {
t.Errorf("Failed reading path %s, error: %v", test.path, err)
continue

View File

@ -304,7 +304,7 @@ func (p *S3Path) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er
defer createFileLockS3.Unlock()
// Check if exists
_, err := p.ReadFile()
_, err := p.ReadFile(ctx)
if err == nil {
return os.ErrExist
}
@ -317,7 +317,7 @@ func (p *S3Path) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er
}
// ReadFile implements Path::ReadFile
func (p *S3Path) ReadFile() ([]byte, error) {
func (p *S3Path) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {

View File

@ -247,7 +247,7 @@ func (p *SSHPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) e
defer createFileLockSSH.Unlock()
// Check if exists
_, err := p.ReadFile()
_, err := p.ReadFile(ctx)
if err == nil {
return os.ErrExist
}
@ -260,7 +260,7 @@ func (p *SSHPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) e
}
// ReadFile implements Path::ReadFile
func (p *SSHPath) ReadFile() ([]byte, error) {
func (p *SSHPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {

View File

@ -423,7 +423,7 @@ func (p *SwiftPath) createBucket() error {
}
// ReadFile implements Path::ReadFile
func (p *SwiftPath) ReadFile() ([]byte, error) {
func (p *SwiftPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
done, err := RetryWithBackoff(swiftReadBackoff, func() (bool, error) {
b.Reset()

View File

@ -49,7 +49,7 @@ type Path interface {
// ReadFile returns the contents of the file, or an error if the file could not be read.
// If the file did not exist, err = os.ErrNotExist
// As this reads the entire file into memory, consider using WriteTo for bigger files
ReadFile() ([]byte, error)
ReadFile(ctx context.Context) ([]byte, error)
WriteFile(ctx context.Context, data io.ReadSeeker, acl ACL) error
// CreateFile writes the file contents, but only if the file does not already exist
CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) error