mirror of https://github.com/kubernetes/kops.git
tests: build and push locally for metal tests
Because we now push much larger files, replace our in-memory storage with disk-backed storage.
This commit is contained in:
parent
0671e777d8
commit
774d2fdfca
|
@ -0,0 +1,62 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# Copyright 2024 The Kubernetes Authors.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# This is a convenience script for developing kOps on Metal.
|
||||||
|
# It builds the code, including nodeup, and uploads to our fake S3 storage server.
|
||||||
|
# It also sets KOPS_BASE_URL to point to that storage server.
|
||||||
|
# To use, source the script. For example `. hack/dev-build-metal.sh` (note the initial `.`)
|
||||||
|
|
||||||
|
# Can't use set -e in a script we want to source
|
||||||
|
#set -e
|
||||||
|
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
REPO_ROOT=$(git rev-parse --show-toplevel)
|
||||||
|
cd "${REPO_ROOT}" || return
|
||||||
|
|
||||||
|
# Dev environments typically do not need to test multiple architectures
|
||||||
|
KOPS_ARCH=amd64
|
||||||
|
export KOPS_ARCH
|
||||||
|
|
||||||
|
# Configure aws cli to talk to local storage
|
||||||
|
aws configure --profile metal set aws_access_key_id accesskey
|
||||||
|
aws configure --profile metal set aws_secret_access_key secret
|
||||||
|
aws configure --profile metal set aws_region us-east-1
|
||||||
|
aws configure --profile metal set endpoint_url http://10.123.45.1:8443
|
||||||
|
export AWS_ENDPOINT_URL=http://10.123.45.1:8443
|
||||||
|
export AWS_PROFILE=metal
|
||||||
|
export AWS_REGION=us-east-1
|
||||||
|
|
||||||
|
# Avoid chunking in S3 uploads (not supported by our mock yet)
|
||||||
|
aws configure --profile metal set s3.multipart_threshold 64GB
|
||||||
|
|
||||||
|
export UPLOAD_DEST=s3://kops-dev-build/
|
||||||
|
aws --version
|
||||||
|
aws s3 ls "${UPLOAD_DEST}" || aws s3 mb "${UPLOAD_DEST}" || return
|
||||||
|
make kops-install dev-version-dist-${KOPS_ARCH} || return
|
||||||
|
|
||||||
|
hack/upload .build/upload/ "${UPLOAD_DEST}" || return
|
||||||
|
|
||||||
|
# Set KOPS_BASE_URL
|
||||||
|
(tools/get_version.sh | grep VERSION | awk '{print $2}') || return
|
||||||
|
KOPS_VERSION=$(tools/get_version.sh | grep VERSION | awk '{print $2}')
|
||||||
|
export KOPS_BASE_URL=http://10.123.45.1:8443/kops-dev-build/kops/${KOPS_VERSION}/
|
||||||
|
echo "set KOPS_BASE_URL=${KOPS_BASE_URL}"
|
||||||
|
|
||||||
|
# Set feature flags needed on Metal
|
||||||
|
# export KOPS_FEATURE_FLAGS=
|
||||||
|
|
||||||
|
echo "SUCCESS"
|
|
@ -51,5 +51,6 @@ for vm in 0 1 2; do
|
||||||
vm_name="vm${vm}"
|
vm_name="vm${vm}"
|
||||||
mkdir -p ${ARTIFACTS}/vms/${vm_name}/logs/
|
mkdir -p ${ARTIFACTS}/vms/${vm_name}/logs/
|
||||||
scp -o StrictHostKeyChecking=accept-new -i ${REPO_ROOT}/.build/.ssh/id_ed25519 root@10.123.45.10:/var/log/etcd* ${ARTIFACTS}/vms/${vm_name}/logs/ || true
|
scp -o StrictHostKeyChecking=accept-new -i ${REPO_ROOT}/.build/.ssh/id_ed25519 root@10.123.45.10:/var/log/etcd* ${ARTIFACTS}/vms/${vm_name}/logs/ || true
|
||||||
ssh -o StrictHostKeyChecking=accept-new -i ${REPO_ROOT}/.build/.ssh/id_ed25519 root@10.123.45.10 journalctl --no-pager -u kubelet 2>&1 > ${ARTIFACTS}/vms/${vm_name}/logs/journal-kubelet.service || true
|
ssh -o StrictHostKeyChecking=accept-new -i ${REPO_ROOT}/.build/.ssh/id_ed25519 root@10.123.45.10 journalctl --no-pager -u kubelet 2>&1 > ${ARTIFACTS}/vms/${vm_name}/logs/kubelet.service || true
|
||||||
|
ssh -o StrictHostKeyChecking=accept-new -i ${REPO_ROOT}/.build/.ssh/id_ed25519 root@10.123.45.10 journalctl --no-pager -u kops-configuration 2>&1 > ${ARTIFACTS}/vms/${vm_name}/logs/kops-configuration.service || true
|
||||||
done
|
done
|
||||||
|
|
|
@ -22,7 +22,10 @@ set -o xtrace
|
||||||
REPO_ROOT=$(git rev-parse --show-toplevel)
|
REPO_ROOT=$(git rev-parse --show-toplevel)
|
||||||
cd ${REPO_ROOT}
|
cd ${REPO_ROOT}
|
||||||
|
|
||||||
BINDIR=${REPO_ROOT}/.build/bin
|
WORKDIR=${REPO_ROOT}/.build/
|
||||||
|
|
||||||
|
BINDIR=${WORKDIR}/bin
|
||||||
|
mkdir -p "${BINDIR}"
|
||||||
go build -o ${BINDIR}/kops ./cmd/kops
|
go build -o ${BINDIR}/kops ./cmd/kops
|
||||||
|
|
||||||
KOPS=${BINDIR}/kops
|
KOPS=${BINDIR}/kops
|
||||||
|
@ -37,10 +40,17 @@ function cleanup() {
|
||||||
|
|
||||||
trap cleanup EXIT
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
# Create the directory that will back our mock s3 storage
|
||||||
|
rm -rf ${WORKDIR}/s3
|
||||||
|
mkdir -p ${WORKDIR}/s3/
|
||||||
|
|
||||||
|
# Start our VMs
|
||||||
${REPO_ROOT}/tests/e2e/scenarios/bare-metal/start-vms
|
${REPO_ROOT}/tests/e2e/scenarios/bare-metal/start-vms
|
||||||
|
|
||||||
echo "Waiting 30 seconds for VMs to start"
|
. hack/dev-build-metal.sh
|
||||||
sleep 30
|
|
||||||
|
echo "Waiting 10 seconds for VMs to start"
|
||||||
|
sleep 10
|
||||||
|
|
||||||
# Remove from known-hosts in case of reuse
|
# Remove from known-hosts in case of reuse
|
||||||
ssh-keygen -f ~/.ssh/known_hosts -R 10.123.45.10 || true
|
ssh-keygen -f ~/.ssh/known_hosts -R 10.123.45.10 || true
|
||||||
|
@ -69,7 +79,7 @@ export S3_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
|
||||||
# Create the state-store bucket in our mock s3 server
|
# Create the state-store bucket in our mock s3 server
|
||||||
export KOPS_STATE_STORE=s3://kops-state-store/
|
export KOPS_STATE_STORE=s3://kops-state-store/
|
||||||
aws --version
|
aws --version
|
||||||
aws --endpoint-url=${S3_ENDPOINT} s3 mb s3://kops-state-store
|
aws s3 ls s3://kops-state-store || aws s3 mb s3://kops-state-store
|
||||||
|
|
||||||
# List clusters (there should not be any yet)
|
# List clusters (there should not be any yet)
|
||||||
${KOPS} get cluster || true
|
${KOPS} get cluster || true
|
||||||
|
|
|
@ -23,6 +23,8 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
|
||||||
cd ${REPO_ROOT}/tests/e2e/scenarios/bare-metal
|
cd ${REPO_ROOT}/tests/e2e/scenarios/bare-metal
|
||||||
|
|
||||||
WORKDIR=${REPO_ROOT}/.build
|
WORKDIR=${REPO_ROOT}/.build
|
||||||
|
BINDIR=${WORKDIR}/bin
|
||||||
|
mkdir -p $BINDIR
|
||||||
|
|
||||||
# Create SSH keys
|
# Create SSH keys
|
||||||
mkdir -p ${WORKDIR}/.ssh
|
mkdir -p ${WORKDIR}/.ssh
|
||||||
|
@ -32,12 +34,12 @@ fi
|
||||||
|
|
||||||
# Build software we need
|
# Build software we need
|
||||||
cd ${REPO_ROOT}/tools/metal/dhcp
|
cd ${REPO_ROOT}/tools/metal/dhcp
|
||||||
go build -o ${WORKDIR}/dhcp .
|
go build -o ${BINDIR}/dhcp .
|
||||||
cd ${REPO_ROOT}/tools/metal/storage
|
cd ${REPO_ROOT}/tools/metal/storage
|
||||||
go build -o ${WORKDIR}/storage .
|
go build -o ${BINDIR}/storage .
|
||||||
|
|
||||||
# Give permission to listen on ports < 1024 (sort of like a partial suid binary)
|
# Give permission to listen on ports < 1024 (sort of like a partial suid binary)
|
||||||
sudo setcap cap_net_bind_service=ep ${WORKDIR}/dhcp
|
sudo setcap cap_net_bind_service=ep ${BINDIR}/dhcp
|
||||||
|
|
||||||
# Install software we need
|
# Install software we need
|
||||||
if ! genisoimage --version; then
|
if ! genisoimage --version; then
|
||||||
|
@ -110,7 +112,7 @@ After=network.target
|
||||||
EnvironmentFile=/etc/environment
|
EnvironmentFile=/etc/environment
|
||||||
Type=exec
|
Type=exec
|
||||||
WorkingDirectory=${WORKDIR}/
|
WorkingDirectory=${WORKDIR}/
|
||||||
ExecStart=${WORKDIR}/dhcp
|
ExecStart=${BINDIR}/dhcp
|
||||||
Restart=always
|
Restart=always
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
|
@ -121,6 +123,7 @@ EOF
|
||||||
systemctl --user enable --now qemu-dhcp.service
|
systemctl --user enable --now qemu-dhcp.service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
function start_storage() {
|
function start_storage() {
|
||||||
mkdir -p ~/.config/systemd/user
|
mkdir -p ~/.config/systemd/user
|
||||||
cat <<EOF > ~/.config/systemd/user/qemu-storage.service
|
cat <<EOF > ~/.config/systemd/user/qemu-storage.service
|
||||||
|
@ -132,7 +135,7 @@ After=network.target
|
||||||
EnvironmentFile=/etc/environment
|
EnvironmentFile=/etc/environment
|
||||||
Type=exec
|
Type=exec
|
||||||
WorkingDirectory=${WORKDIR}/
|
WorkingDirectory=${WORKDIR}/
|
||||||
ExecStart=${WORKDIR}/storage --http-listen=10.123.45.1:8443
|
ExecStart=${BINDIR}/storage --http-listen=10.123.45.1:8443 --storage-dir=${WORKDIR}/s3/
|
||||||
Restart=always
|
Restart=always
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
|
|
|
@ -27,7 +27,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/objectstore"
|
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/objectstore"
|
||||||
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/objectstore/testobjectstore"
|
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/objectstore/fsobjectstore"
|
||||||
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/s3model"
|
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/s3model"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
@ -47,13 +47,22 @@ func run(ctx context.Context) error {
|
||||||
|
|
||||||
httpListen := ""
|
httpListen := ""
|
||||||
flag.StringVar(&httpListen, "http-listen", httpListen, "endpoint on which to serve HTTP requests")
|
flag.StringVar(&httpListen, "http-listen", httpListen, "endpoint on which to serve HTTP requests")
|
||||||
|
|
||||||
|
storageDir := ""
|
||||||
|
flag.StringVar(&storageDir, "storage-dir", storageDir, "directory in which to store data")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
if httpListen == "" {
|
if httpListen == "" {
|
||||||
return fmt.Errorf("must specify http-listen flag")
|
return fmt.Errorf("must specify http-listen flag")
|
||||||
}
|
}
|
||||||
|
|
||||||
store := testobjectstore.New()
|
if storageDir == "" {
|
||||||
|
return fmt.Errorf("must specify storage-dir flag")
|
||||||
|
}
|
||||||
|
|
||||||
|
// store := testobjectstore.New()
|
||||||
|
store := fsobjectstore.New(storageDir)
|
||||||
|
|
||||||
s3Server := &S3Server{
|
s3Server := &S3Server{
|
||||||
store: store,
|
store: store,
|
||||||
|
@ -88,7 +97,12 @@ type S3Server struct {
|
||||||
func (s *S3Server) ListAllMyBuckets(ctx context.Context, req *s3Request, r *ListAllMyBucketsInput) error {
|
func (s *S3Server) ListAllMyBuckets(ctx context.Context, req *s3Request, r *ListAllMyBucketsInput) error {
|
||||||
output := &s3model.ListAllMyBucketsResult{}
|
output := &s3model.ListAllMyBucketsResult{}
|
||||||
|
|
||||||
for _, bucket := range s.store.ListBuckets(ctx) {
|
buckets, err := s.store.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listing buckets: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, bucket := range buckets {
|
||||||
output.Buckets = append(output.Buckets, s3model.Bucket{
|
output.Buckets = append(output.Buckets, s3model.Bucket{
|
||||||
CreationDate: bucket.CreationDate.Format(s3TimeFormat),
|
CreationDate: bucket.CreationDate.Format(s3TimeFormat),
|
||||||
Name: bucket.Name,
|
Name: bucket.Name,
|
||||||
|
@ -156,6 +170,12 @@ func (s *S3Server) ServeRequest(ctx context.Context, w http.ResponseWriter, r *h
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Key: key,
|
Key: key,
|
||||||
})
|
})
|
||||||
|
case http.MethodHead:
|
||||||
|
// GetObject can handle req.Method == HEAD
|
||||||
|
return s.GetObject(ctx, req, &GetObjectInput{
|
||||||
|
Bucket: bucket,
|
||||||
|
Key: key,
|
||||||
|
})
|
||||||
case http.MethodPut:
|
case http.MethodPut:
|
||||||
return s.PutObject(ctx, req, &PutObjectInput{
|
return s.PutObject(ctx, req, &PutObjectInput{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
|
@ -180,6 +200,8 @@ type ListObjectsV2Input struct {
|
||||||
const s3TimeFormat = "2006-01-02T15:04:05.000Z"
|
const s3TimeFormat = "2006-01-02T15:04:05.000Z"
|
||||||
|
|
||||||
func (s *S3Server) ListObjectsV2(ctx context.Context, req *s3Request, input *ListObjectsV2Input) error {
|
func (s *S3Server) ListObjectsV2(ctx context.Context, req *s3Request, input *ListObjectsV2Input) error {
|
||||||
|
log := klog.FromContext(ctx)
|
||||||
|
|
||||||
bucket, _, err := s.store.GetBucket(ctx, input.Bucket)
|
bucket, _, err := s.store.GetBucket(ctx, input.Bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get bucket %q: %w", input.Bucket, err)
|
return fmt.Errorf("failed to get bucket %q: %w", input.Bucket, err)
|
||||||
|
@ -200,10 +222,21 @@ func (s *S3Server) ListObjectsV2(ctx context.Context, req *s3Request, input *Lis
|
||||||
Name: input.Bucket,
|
Name: input.Bucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prefixes := make(map[string]bool)
|
||||||
for _, object := range objects {
|
for _, object := range objects {
|
||||||
|
log.V(4).Info("found candidate object", "obj", object)
|
||||||
if input.Prefix != "" && !strings.HasPrefix(object.Key, input.Prefix) {
|
if input.Prefix != "" && !strings.HasPrefix(object.Key, input.Prefix) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if input.Delimiter != "" {
|
||||||
|
afterPrefix := object.Key[len(input.Prefix):]
|
||||||
|
|
||||||
|
tokens := strings.SplitN(afterPrefix, input.Delimiter, 2)
|
||||||
|
if len(tokens) == 2 {
|
||||||
|
prefixes[input.Prefix+tokens[0]+input.Delimiter] = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
// TODO: support delimiter
|
// TODO: support delimiter
|
||||||
output.Contents = append(output.Contents, s3model.Object{
|
output.Contents = append(output.Contents, s3model.Object{
|
||||||
Key: object.Key,
|
Key: object.Key,
|
||||||
|
@ -211,7 +244,18 @@ func (s *S3Server) ListObjectsV2(ctx context.Context, req *s3Request, input *Lis
|
||||||
Size: object.Size,
|
Size: object.Size,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if input.Delimiter != "" {
|
||||||
|
for prefix := range prefixes {
|
||||||
|
output.CommonPrefixes = append(output.CommonPrefixes, s3model.CommonPrefix{
|
||||||
|
Prefix: prefix,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
output.Delimiter = input.Delimiter
|
||||||
|
}
|
||||||
|
output.Prefix = input.Prefix
|
||||||
output.KeyCount = len(output.Contents)
|
output.KeyCount = len(output.Contents)
|
||||||
|
output.IsTruncated = false
|
||||||
|
|
||||||
return req.writeXML(ctx, output)
|
return req.writeXML(ctx, output)
|
||||||
}
|
}
|
||||||
|
@ -270,7 +314,7 @@ func (s *S3Server) GetObject(ctx context.Context, req *s3Request, input *GetObje
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return object.WriteTo(req.w)
|
return object.WriteTo(req.r, req.w)
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetObjectACLInput struct {
|
type GetObjectACLInput struct {
|
||||||
|
|
|
@ -0,0 +1,276 @@
|
||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package fsobjectstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/fs"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/kubernetes/kops/tools/metal/dhcp/pkg/objectstore"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FilesystemObjectStore struct {
|
||||||
|
mutex sync.Mutex
|
||||||
|
basedir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(basedir string) *FilesystemObjectStore {
|
||||||
|
return &FilesystemObjectStore{
|
||||||
|
basedir: basedir,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ objectstore.ObjectStore = &FilesystemObjectStore{}
|
||||||
|
|
||||||
|
func (m *FilesystemObjectStore) ListBuckets(ctx context.Context) ([]objectstore.BucketInfo, error) {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
entries, err := os.ReadDir(m.basedir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("reading directory %q: %w", m.basedir, err)
|
||||||
|
}
|
||||||
|
buckets := make([]objectstore.BucketInfo, 0, len(entries))
|
||||||
|
for _, entry := range entries {
|
||||||
|
bucketInfo, err := m.buildBucketInfo(ctx, entry.Name())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
buckets = append(buckets, *bucketInfo)
|
||||||
|
}
|
||||||
|
return buckets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemObjectStore) buildBucketInfo(ctx context.Context, bucketName string) (*objectstore.BucketInfo, error) {
|
||||||
|
p := filepath.Join(m.basedir, bucketName)
|
||||||
|
stat, err := os.Stat(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting info for directory %q: %w", p, err)
|
||||||
|
}
|
||||||
|
sysStat := stat.Sys().(*syscall.Stat_t)
|
||||||
|
|
||||||
|
bucketInfo := &objectstore.BucketInfo{
|
||||||
|
Name: stat.Name(),
|
||||||
|
CreationDate: time.Unix(sysStat.Ctim.Sec, sysStat.Ctim.Nsec),
|
||||||
|
Owner: getOwnerID(ctx),
|
||||||
|
}
|
||||||
|
return bucketInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemObjectStore) GetBucket(ctx context.Context, bucketName string) (objectstore.Bucket, *objectstore.BucketInfo, error) {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
bucketInfo, err := m.buildBucketInfo(ctx, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
bucketDir := filepath.Join(m.basedir, bucketName)
|
||||||
|
return &FilesystemBucket{basedir: bucketDir, bucketName: bucketName}, bucketInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOwnerID returns the owner ID for the given context.
|
||||||
|
// This is a fake implementation for testing purposes.
|
||||||
|
func getOwnerID(ctx context.Context) string {
|
||||||
|
return "fake-owner"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemObjectStore) CreateBucket(ctx context.Context, bucketName string) (*objectstore.BucketInfo, error) {
|
||||||
|
log := klog.FromContext(ctx)
|
||||||
|
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
bucketInfo, err := m.buildBucketInfo(ctx, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
// OK
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if bucketInfo != nil {
|
||||||
|
// return nil, status.Errorf(codes.AlreadyExists, "bucket %q already exists", bucketName)
|
||||||
|
err := status.Errorf(codes.AlreadyExists, "bucket %q already exists", bucketName)
|
||||||
|
code := status.Code(err)
|
||||||
|
log.Error(err, "failed to create bucket", "code", code)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p := filepath.Join(m.basedir, bucketName)
|
||||||
|
if err := os.Mkdir(p, 0700); err != nil {
|
||||||
|
return nil, fmt.Errorf("creating directory for bucket %q: %w", p, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketInfo, err = m.buildBucketInfo(ctx, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return bucketInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type FilesystemBucket struct {
|
||||||
|
basedir string
|
||||||
|
bucketName string
|
||||||
|
mutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ objectstore.Bucket = &FilesystemBucket{}
|
||||||
|
|
||||||
|
func (m *FilesystemBucket) ListObjects(ctx context.Context) ([]objectstore.ObjectInfo, error) {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
prefix := m.basedir
|
||||||
|
if !strings.HasSuffix(prefix, "/") {
|
||||||
|
prefix += "/"
|
||||||
|
}
|
||||||
|
var objects []objectstore.ObjectInfo
|
||||||
|
if err := filepath.WalkDir(m.basedir, func(path string, d fs.DirEntry, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if d.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(path, prefix) {
|
||||||
|
return fmt.Errorf("unexpected path walking %q, expected prefix %q: %q", m.basedir, prefix, path)
|
||||||
|
}
|
||||||
|
key := strings.TrimPrefix(path, prefix)
|
||||||
|
objectInfo, err := m.buildObjectInfo(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
objects = append(objects, *objectInfo)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemBucket) GetObject(ctx context.Context, key string) (objectstore.Object, error) {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
p := m.pathForKey(key)
|
||||||
|
|
||||||
|
_, err := m.buildObjectInfo(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &FilesystemObject{bucket: m, path: p, key: key}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemBucket) buildObjectInfo(ctx context.Context, key string) (*objectstore.ObjectInfo, error) {
|
||||||
|
p := filepath.Join(m.basedir, key)
|
||||||
|
stat, err := os.Stat(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting info for file %q: %w", p, err)
|
||||||
|
}
|
||||||
|
objectInfo := &objectstore.ObjectInfo{
|
||||||
|
Key: key,
|
||||||
|
LastModified: stat.ModTime(),
|
||||||
|
Size: stat.Size(),
|
||||||
|
}
|
||||||
|
return objectInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemBucket) pathForKey(key string) string {
|
||||||
|
p := filepath.Join(m.basedir, key)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *FilesystemBucket) PutObject(ctx context.Context, key string, r io.Reader) (*objectstore.ObjectInfo, error) {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
p := filepath.Join(m.basedir, key)
|
||||||
|
|
||||||
|
dir := filepath.Dir(p)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("making directories %q: %w", dir, err)
|
||||||
|
}
|
||||||
|
f, err := os.Create(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating file %q: %w", p, err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
if _, err := io.Copy(f, r); err != nil {
|
||||||
|
return nil, fmt.Errorf("writing data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objectInfo, err := m.buildObjectInfo(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return objectInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type FilesystemObject struct {
|
||||||
|
bucket *FilesystemBucket
|
||||||
|
key string
|
||||||
|
path string
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ objectstore.Object = &FilesystemObject{}
|
||||||
|
|
||||||
|
func (o *FilesystemObject) WriteTo(r *http.Request, w http.ResponseWriter) error {
|
||||||
|
f, err := os.Open(o.path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("opening file %q: %w", o.path, err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
stat, err := os.Stat(o.path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting stat for file %q: %w", o.path, err)
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(stat.Size(), 10))
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
if r.Method != http.MethodHead {
|
||||||
|
if _, err := io.Copy(w, f); err != nil {
|
||||||
|
return fmt.Errorf("reading file %q: %w", o.path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -24,7 +24,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ObjectStore interface {
|
type ObjectStore interface {
|
||||||
ListBuckets(ctx context.Context) []BucketInfo
|
ListBuckets(ctx context.Context) ([]BucketInfo, error)
|
||||||
|
|
||||||
// GetBucket returns the bucket with the given name.
|
// GetBucket returns the bucket with the given name.
|
||||||
// If the bucket does not exist, it returns (nil, nil).
|
// If the bucket does not exist, it returns (nil, nil).
|
||||||
|
@ -62,5 +62,5 @@ type ObjectInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Object interface {
|
type Object interface {
|
||||||
WriteTo(w http.ResponseWriter) error
|
WriteTo(req *http.Request, w http.ResponseWriter) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -43,7 +44,7 @@ func New() *TestObjectStore {
|
||||||
|
|
||||||
var _ objectstore.ObjectStore = &TestObjectStore{}
|
var _ objectstore.ObjectStore = &TestObjectStore{}
|
||||||
|
|
||||||
func (m *TestObjectStore) ListBuckets(ctx context.Context) []objectstore.BucketInfo {
|
func (m *TestObjectStore) ListBuckets(ctx context.Context) ([]objectstore.BucketInfo, error) {
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
defer m.mutex.Unlock()
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
@ -51,7 +52,7 @@ func (m *TestObjectStore) ListBuckets(ctx context.Context) []objectstore.BucketI
|
||||||
for _, bucket := range m.buckets {
|
for _, bucket := range m.buckets {
|
||||||
buckets = append(buckets, bucket.info)
|
buckets = append(buckets, bucket.info)
|
||||||
}
|
}
|
||||||
return buckets
|
return buckets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TestObjectStore) GetBucket(ctx context.Context, bucketName string) (objectstore.Bucket, *objectstore.BucketInfo, error) {
|
func (m *TestObjectStore) GetBucket(ctx context.Context, bucketName string) (objectstore.Bucket, *objectstore.BucketInfo, error) {
|
||||||
|
@ -157,8 +158,13 @@ type TestObject struct {
|
||||||
|
|
||||||
var _ objectstore.Object = &TestObject{}
|
var _ objectstore.Object = &TestObject{}
|
||||||
|
|
||||||
func (o *TestObject) WriteTo(w http.ResponseWriter) error {
|
func (o *TestObject) WriteTo(r *http.Request, w http.ResponseWriter) error {
|
||||||
_, err := w.Write(o.data)
|
w.Header().Set("Content-Length", strconv.Itoa(len(o.data)))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
if r.Method != http.MethodHead {
|
||||||
|
_, err := w.Write(o.data)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,18 +34,22 @@ type Bucket struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ListBucketResult struct {
|
type ListBucketResult struct {
|
||||||
IsTruncated bool `xml:"IsTruncated"`
|
IsTruncated bool `xml:"IsTruncated"`
|
||||||
Contents []Object `xml:"Contents"`
|
Contents []Object `xml:"Contents"`
|
||||||
Name string `xml:"Name"`
|
Name string `xml:"Name"`
|
||||||
Prefix string `xml:"Prefix"`
|
Prefix string `xml:"Prefix"`
|
||||||
Delimiter string `xml:"Delimiter"`
|
Delimiter string `xml:"Delimiter"`
|
||||||
MaxKeys int `xml:"MaxKeys"`
|
MaxKeys int `xml:"MaxKeys"`
|
||||||
CommonPrefixes []string `xml:"CommonPrefixes>Prefix"`
|
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
|
||||||
EncodingType string `xml:"EncodingType"`
|
EncodingType string `xml:"EncodingType"`
|
||||||
KeyCount int `xml:"KeyCount"`
|
KeyCount int `xml:"KeyCount"`
|
||||||
ContinuationToken string `xml:"ContinuationToken"`
|
ContinuationToken string `xml:"ContinuationToken"`
|
||||||
NextContinuationToken string `xml:"NextContinuationToken"`
|
NextContinuationToken string `xml:"NextContinuationToken"`
|
||||||
StartAfter string `xml:"StartAfter"`
|
StartAfter string `xml:"StartAfter"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CommonPrefix struct {
|
||||||
|
Prefix string `xml:"Prefix"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Object struct {
|
type Object struct {
|
||||||
|
|
Loading…
Reference in New Issue