func/pkg/k8s/dialer.go

578 lines
13 KiB
Go

package k8s
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"regexp"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)
var SocatImage = "ghcr.io/knative/func-utils:v2"
// NewInClusterDialer creates context dialer that will dial TCP connections via POD running in k8s cluster.
// This is useful when accessing k8s services that are not exposed outside cluster (e.g. openshift image registry).
//
// Usage:
//
// dialer, err := k8s.NewInClusterDialer(ctx)
// if err != nil {
// return err
// }
// defer dialer.Close()
//
// transport := &http.Transport{
// DialContext: dialer.DialContext,
// }
//
// var client = http.Client{
// Transport: transport,
// }
func NewInClusterDialer(ctx context.Context, clientConfig clientcmd.ClientConfig) (*contextDialer, error) {
c := &contextDialer{
clientConfig: clientConfig,
detachChan: make(chan struct{}),
}
err := c.startDialerPod(ctx)
if err != nil {
return nil, err
}
return c, nil
}
type contextDialer struct {
coreV1 v1.CoreV1Interface
clientConfig clientcmd.ClientConfig
restConf *restclient.Config
podName string
namespace string
detachChan chan struct{}
}
func (c *contextDialer) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
if network != "tcp" && network != "tcp4" && network != "tcp6" {
return nil, fmt.Errorf("unsupported network: %q", network)
}
ctrStdin, ctrStdout, conn := newConn()
connectSuccess := make(chan struct{})
connectFailure := make(chan error, 1)
go func() {
stderrBuff := bytes.NewBuffer(nil)
ctrStderr := io.MultiWriter(stderrBuff, detectConnSuccess(connectSuccess))
err := c.exec(context.TODO(), addr, ctrStdin, ctrStdout, ctrStderr)
if err != nil {
stderrStr := stderrBuff.String()
socatErr := tryParseSocatError(network, addr, stderrStr)
if socatErr != nil {
err = fmt.Errorf("socat error: %w", socatErr)
} else {
err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrStr)
}
}
_ = conn.closeWithError(err)
connectFailure <- err
}()
select {
case <-connectSuccess:
return conn, nil
case err := <-connectFailure:
return nil, err
case <-ctx.Done():
_ = conn.closeWithError(ctx.Err())
return nil, ctx.Err()
}
}
// Creates io.Writer which closes connectSuccess channel when string "successfully connected" is written to it.
func detectConnSuccess(connectSuccess chan struct{}) io.Writer {
return newKMPWriter("successfully connected", connectSuccess)
}
// kmpWriter is a writer that search word w using KMP algorithm in the text written to the writer.
// When searched word w appears in the input the channel ch is closed.
// This can be used to detect if particular character sequence was written to the writer.
type kmpWriter struct {
w string
k int
t []int
ch chan<- struct{}
found bool
}
func newKMPWriter(w string, ch chan<- struct{}) *kmpWriter {
// Building KMP table.
t := make([]int, len(w)+1)
t[0] = -1
pos := 1
cnd := 0
for pos < len(w) {
if w[pos] == w[cnd] {
t[pos] = t[cnd]
} else {
t[pos] = cnd
for cnd >= 0 && w[pos] != w[cnd] {
cnd = t[cnd]
}
}
pos++
cnd++
}
t[pos] = cnd
return &kmpWriter{
w: w,
t: t,
ch: ch,
}
}
func (d *kmpWriter) Write(s []byte) (n int, err error) {
if d.found {
return len(s), nil
}
j := 0
for j < len(s) {
if d.w[d.k] == s[j] {
j++
d.k++
if d.k == len(d.w) {
d.found = true
close(d.ch)
return len(s), nil
}
} else {
d.k = d.t[d.k]
if d.k < 0 {
j++
d.k++
}
}
}
return j, nil
}
var (
connectionRefusedErrorRE = regexp.MustCompile(`E connect\(\d+, AF=\d+ (?P<hostport>[\[\]0-9.:a-z]+), \d+\): Connection refused`)
nameResolutionErrorRE = regexp.MustCompile(`E getaddrinfo\("(?P<hostname>[a-zA-z-.0-9]+)",.*\): Name does not resolve`)
)
// tries to detect common errors from `socat` stderr
func tryParseSocatError(network, address, stderr string) error {
groups := nameResolutionErrorRE.FindStringSubmatch(stderr)
if groups != nil {
var name string
if len(groups) > 1 {
name = groups[1]
}
return &net.OpError{
Op: "dial",
Net: network,
Source: nil,
Addr: nil,
Err: &net.DNSError{
Err: "no such host",
Name: name,
IsNotFound: true,
},
}
}
groups = connectionRefusedErrorRE.FindStringSubmatch(stderr)
if groups != nil {
var (
addr net.IP
port int
zone string
)
if len(groups) > 1 {
h, p, err := net.SplitHostPort(groups[1])
if err == nil {
addr = net.ParseIP(h)
p, _ := strconv.ParseInt(p, 10, 16)
port = int(p)
}
}
return &net.OpError{
Op: "dial",
Net: network,
Addr: &net.TCPAddr{
IP: addr,
Port: port,
Zone: zone,
},
Err: &os.SyscallError{
Syscall: "connect",
Err: syscall.ECONNREFUSED,
},
}
}
return nil
}
func (c *contextDialer) Close() error {
// closing the channel will cause stdin of the attached container to return EOF
// as a result the pod exits -- it transits to Completed state
close(c.detachChan)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
delOpts := metaV1.DeleteOptions{}
return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts)
}
func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
c.restConf, err = c.clientConfig.ClientConfig()
if err != nil {
return
}
c.restConf.WarningHandler = restclient.NoWarnings{}
err = setConfigDefaults(c.restConf)
if err != nil {
return
}
client, err := kubernetes.NewForConfig(c.restConf)
if err != nil {
return
}
c.coreV1 = client.CoreV1()
c.namespace, _, err = c.clientConfig.Namespace()
if err != nil {
return
}
pods := client.CoreV1().Pods(c.namespace)
c.podName = "in-cluster-dialer-" + rand.String(5)
defer func() {
if err != nil {
c.Close()
}
}()
pod := &coreV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: c.podName,
Labels: nil,
Annotations: nil,
},
Spec: coreV1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []coreV1.Container{
{
Name: c.podName,
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(client),
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
}
creatOpts := metaV1.CreateOptions{}
ready := podReady(ctx, c.coreV1, c.podName, c.namespace)
_, err = pods.Create(ctx, pod, creatOpts)
if err != nil {
return
}
select {
case err = <-ready:
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 1):
err = errors.New("timeout")
}
if err != nil {
return fmt.Errorf("failed to start dialer container: %w", err)
}
// attaching to the stdin to automatically Complete the pod on exit
go func() {
_ = attach(context.TODO(), c.coreV1.RESTClient(), c.restConf, c.podName, c.namespace, emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
}()
return nil
}
// reader that returns no data and blocks until
// the channel is closed or data are sent to the channel
type emptyBlockingReader chan struct{}
func (e emptyBlockingReader) Read(p []byte) (n int, err error) {
<-e
return 0, io.EOF
}
func (c *contextDialer) exec(ctx context.Context, hostPort string, in io.Reader, out, errOut io.Writer) error {
restClient := c.coreV1.RESTClient()
req := restClient.Post().
Resource("pods").
Name(c.podName).
Namespace(c.namespace).
SubResource("exec")
req.VersionedParams(&coreV1.PodExecOptions{
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
Container: c.podName,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
executor, err := remotecommand.NewSPDYExecutor(c.restConf, "POST", req.URL())
if err != nil {
return err
}
return executor.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: in,
Stdout: out,
Stderr: errOut,
Tty: false,
})
}
func attach(ctx context.Context, restClient restclient.Interface, restConf *restclient.Config, podName, namespace string, in io.Reader, out, errOut io.Writer) error {
req := restClient.Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: podName,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
executor, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
if err != nil {
return err
}
return executor.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: in,
Stdout: out,
Stderr: errOut,
Tty: false,
})
}
func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d
pods := core.Pods(namespace)
nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
FieldSelector: nameSelector,
}
watcher, err := pods.Watch(ctx, listOpts)
if err != nil {
return
}
go func() {
defer watcher.Stop()
ch := watcher.ResultChan()
for event := range ch {
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
}
if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
d <- nil
return
}
if status.State.Terminated != nil {
msg, _ := GetPodLogs(ctx, namespace, podName, podName)
d <- fmt.Errorf("pod prematurely exited (output: %q, exitcode: %d)", msg, status.State.Terminated.ExitCode)
return
}
if status.State.Waiting != nil {
switch status.State.Waiting.Reason {
case "ErrImagePull",
"CreateContainerError",
"CreateContainerConfigError",
"InvalidImageName",
"CrashLoopBackOff",
"ImagePullBackOff":
d <- fmt.Errorf("reason: %v, message: %v",
status.State.Waiting.Reason,
status.State.Waiting.Message)
return
default:
continue
}
}
}
}
}
}()
return
}
func setConfigDefaults(config *restclient.Config) error {
gv := coreV1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/api"
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent()
}
return nil
}
type addr struct{}
func (a addr) Network() string {
return "pod-stdio"
}
func (a addr) String() string {
return "pod-stdio"
}
type conn struct {
pr *io.PipeReader
pw *io.PipeWriter
err atomic.Pointer[error]
}
func (c *conn) Read(b []byte) (n int, err error) {
n, err = c.pr.Read(b)
if errors.Is(err, io.ErrClosedPipe) {
if p := c.err.Load(); p != nil {
err = *p
}
}
return
}
func (c *conn) Write(b []byte) (n int, err error) {
n, err = c.pw.Write(b)
if errors.Is(err, io.ErrClosedPipe) {
if p := c.err.Load(); p != nil {
err = *p
}
}
return
}
func (c *conn) closeWithError(err error) error {
if err == nil {
err = net.ErrClosed
}
{
e := err
c.err.CompareAndSwap(nil, &e)
}
err = c.pw.CloseWithError(io.EOF)
if err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}
err = c.pr.CloseWithError(net.ErrClosed)
if err != nil {
return fmt.Errorf("failed to close reader: %w", err)
}
return nil
}
func (c *conn) Close() error {
return c.closeWithError(nil)
}
func (c *conn) LocalAddr() net.Addr {
return addr{}
}
func (c *conn) RemoteAddr() net.Addr {
return addr{}
}
func (c *conn) SetDeadline(t time.Time) error { return nil }
func (c *conn) SetReadDeadline(t time.Time) error { return nil }
func (c *conn) SetWriteDeadline(t time.Time) error { return nil }
func newConn() (*io.PipeReader, *io.PipeWriter, *conn) {
pr0, pw0 := io.Pipe()
pr1, pw1 := io.Pipe()
rwc := &conn{pr: pr0, pw: pw1}
return pr1, pw0, rwc
}
func NewLazyInitInClusterDialer(clientConfig clientcmd.ClientConfig) *lazyInitInClusterDialer {
return &lazyInitInClusterDialer{
clientConfig: clientConfig,
}
}
type lazyInitInClusterDialer struct {
clientConfig clientcmd.ClientConfig
contextDialer *contextDialer
initErr error
o sync.Once
}
func (l *lazyInitInClusterDialer) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
l.o.Do(func() {
l.contextDialer, l.initErr = NewInClusterDialer(ctx, l.clientConfig)
})
if l.initErr != nil {
return nil, l.initErr
}
return l.contextDialer.DialContext(ctx, network, addr)
}
func (l *lazyInitInClusterDialer) Close() error {
if l.contextDialer != nil {
return l.contextDialer.Close()
}
return nil
}