mirror of https://github.com/knative/func.git
289 lines
7.7 KiB
Go
289 lines
7.7 KiB
Go
package k8s
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
"k8s.io/client-go/kubernetes"
|
|
restclient "k8s.io/client-go/rest"
|
|
k8sclientcmd "k8s.io/client-go/tools/clientcmd"
|
|
)
|
|
|
|
func GetPersistentVolumeClaim(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
|
|
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, metav1.GetOptions{})
|
|
}
|
|
|
|
func CreatePersistentVolumeClaim(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClassName string) (err error) {
|
|
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
pvc := &corev1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
Labels: labels,
|
|
Annotations: annotations,
|
|
},
|
|
Spec: corev1.PersistentVolumeClaimSpec{
|
|
AccessModes: []corev1.PersistentVolumeAccessMode{accessMode},
|
|
Resources: corev1.VolumeResourceRequirements{
|
|
Requests: corev1.ResourceList{},
|
|
},
|
|
},
|
|
}
|
|
|
|
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = resourceRequest
|
|
|
|
if storageClassName != "" {
|
|
pvc.Spec.StorageClassName = &storageClassName
|
|
}
|
|
|
|
_, err = client.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
|
|
return
|
|
}
|
|
|
|
func DeletePersistentVolumeClaims(ctx context.Context, namespaceOverride string, listOptions metav1.ListOptions) (err error) {
|
|
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return client.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
|
|
}
|
|
|
|
var TarImage = "ghcr.io/knative/func-utils:v2"
|
|
|
|
// UploadToVolume uploads files (passed in form of tar stream) into volume.
|
|
func UploadToVolume(ctx context.Context, content io.Reader, claimName, namespace string) error {
|
|
return runWithVolumeMounted(ctx, TarImage, []string{"sh", "-c", "umask 0000 && exec tar -xmf -"}, content, claimName, namespace)
|
|
}
|
|
|
|
// Runs a pod with given image, command and stdin
|
|
// while having the volume mounted and working directory set to it.
|
|
func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []string, podInput io.Reader, claimName, namespace string) error {
|
|
var err error
|
|
|
|
cliConf := GetClientConfig()
|
|
restConf, err := cliConf.ClientConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot get client config: %w", err)
|
|
}
|
|
restConf.WarningHandler = restclient.NoWarnings{}
|
|
|
|
err = setConfigDefaults(restConf)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot set config defaults: %w", err)
|
|
}
|
|
|
|
client, err := kubernetes.NewForConfig(restConf)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create k8s client: %w", err)
|
|
}
|
|
|
|
if namespace == "" {
|
|
namespace, err = GetDefaultNamespace()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot get namespace: %w", err)
|
|
}
|
|
}
|
|
|
|
podName := "volume-uploader-" + rand.String(5)
|
|
|
|
pods := client.CoreV1().Pods(namespace)
|
|
|
|
defer func() {
|
|
_ = pods.Delete(ctx, podName, metav1.DeleteOptions{})
|
|
}()
|
|
|
|
const volumeMntPoint = "/tmp/volume_mnt"
|
|
const pVol = "p-vol"
|
|
pod := &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: podName,
|
|
Labels: nil,
|
|
Annotations: nil,
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
SecurityContext: defaultPodSecurityContext(),
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: podName,
|
|
Image: podImage,
|
|
Stdin: true,
|
|
StdinOnce: true,
|
|
WorkingDir: volumeMntPoint,
|
|
Command: podCommand,
|
|
VolumeMounts: []corev1.VolumeMount{
|
|
{
|
|
Name: pVol,
|
|
MountPath: volumeMntPoint,
|
|
},
|
|
},
|
|
SecurityContext: defaultSecurityContext(client),
|
|
},
|
|
},
|
|
Volumes: []corev1.Volume{{
|
|
Name: pVol,
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: claimName,
|
|
},
|
|
},
|
|
}},
|
|
DNSPolicy: corev1.DNSClusterFirst,
|
|
RestartPolicy: corev1.RestartPolicyNever,
|
|
},
|
|
}
|
|
|
|
localCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
ready := podReady(localCtx, client.CoreV1(), podName, namespace)
|
|
|
|
_, err = pods.Create(ctx, pod, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create pod: %w", err)
|
|
}
|
|
|
|
select {
|
|
case err = <-ready:
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
case <-time.After(time.Minute * 5):
|
|
err = errors.New("timeout waiting for pod to start")
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("cannot start the pod: %w", err)
|
|
}
|
|
|
|
nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
|
|
listOpts := metav1.ListOptions{
|
|
FieldSelector: nameSelector,
|
|
Watch: true,
|
|
}
|
|
watcher, err := pods.Watch(localCtx, listOpts)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot set up the watcher: %w", err)
|
|
}
|
|
defer watcher.Stop()
|
|
termCh := make(chan corev1.ContainerStateTerminated, 1)
|
|
go func() {
|
|
for event := range watcher.ResultChan() {
|
|
p, ok := event.Object.(*corev1.Pod)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if len(p.Status.ContainerStatuses) > 0 {
|
|
termState := event.Object.(*corev1.Pod).Status.ContainerStatuses[0].State.Terminated
|
|
if termState != nil {
|
|
termCh <- *termState
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
var outBuff tsBuff
|
|
err = attach(ctx, client.CoreV1().RESTClient(), restConf, podName, namespace, podInput, &outBuff, &outBuff)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot attach stdio to the pod: %w", err)
|
|
}
|
|
|
|
termState := <-termCh
|
|
if termState.ExitCode != 0 {
|
|
cmdOut := strings.Trim(outBuff.String(), "\n")
|
|
err = fmt.Errorf("the command failed: exitcode=%d, out=%q", termState.ExitCode, cmdOut)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// thread safe buffer for logging
|
|
type tsBuff struct {
|
|
buff bytes.Buffer
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (t *tsBuff) String() string {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.buff.String()
|
|
}
|
|
|
|
func (t *tsBuff) Write(p []byte) (n int, err error) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.buff.Write(p)
|
|
}
|
|
|
|
// ListPersistentVolumeClaimsNamesIfConnected lists names of PersistentVolumeClaims present and the current k8s context
|
|
// returns empty list, if not connected to any cluster
|
|
func ListPersistentVolumeClaimsNamesIfConnected(ctx context.Context, namespaceOverride string) (names []string, err error) {
|
|
names, err = listPersistentVolumeClaimsNames(ctx, namespaceOverride)
|
|
if err != nil {
|
|
// not logged our authorized to access resources
|
|
if k8serrors.IsForbidden(err) || k8serrors.IsUnauthorized(err) || k8serrors.IsInvalid(err) || k8serrors.IsTimeout(err) {
|
|
return []string{}, nil
|
|
}
|
|
|
|
// non existent k8s cluster
|
|
var dnsErr *net.DNSError
|
|
if errors.As(err, &dnsErr) {
|
|
if dnsErr.IsNotFound || dnsErr.IsTemporary || dnsErr.IsTimeout {
|
|
return []string{}, nil
|
|
}
|
|
}
|
|
|
|
// connection refused
|
|
if errors.Is(err, syscall.ECONNREFUSED) {
|
|
return []string{}, nil
|
|
}
|
|
|
|
// invalid configuration: no configuration has been provided
|
|
if k8sclientcmd.IsEmptyConfig(err) {
|
|
return []string{}, nil
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func listPersistentVolumeClaimsNames(ctx context.Context, namespaceOverride string) (names []string, err error) {
|
|
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
pvcs, err := client.CoreV1().PersistentVolumeClaims(namespace).List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for _, pv := range pvcs.Items {
|
|
names = append(names, pv.Name)
|
|
}
|
|
|
|
return
|
|
}
|