mirror of https://github.com/knative/func.git
fix: bad connection handling for in cluster dialer (#1800)
* fix: bad connection handling for in cluster dialer Connections were closed from the wrong end of the io.Pipe, which resulted in confusing error logs. Signed-off-by: Matej Vasek <mvasek@redhat.com> * feat: DialContext() better error handling Now DialContext() returns some errors immediately instead of deferring them to Read/Write operations on the returned connection. Signed-off-by: Matej Vasek <mvasek@redhat.com> * feat: DialContext() more better error handling Now DialContext() tries to parse socat's stderr and translate it to Go's net.OpError instead of just creating an error with the whole stderr embedded in it. Signed-off-by: Matej Vasek <mvasek@redhat.com> * Apply suggestions from code review Co-authored-by: Lance Ball <lball@redhat.com> --------- Signed-off-by: Matej Vasek <mvasek@redhat.com> Co-authored-by: Lance Ball <lball@redhat.com>
This commit is contained in:
parent
cabba3f9d3
commit
3d1176680d
|
@ -1,13 +1,19 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
coreV1 "k8s.io/api/core/v1"
|
||||
|
@ -66,21 +72,108 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st
|
|||
return nil, fmt.Errorf("unsupported network: %q", network)
|
||||
}
|
||||
|
||||
execDone := make(chan struct{})
|
||||
pr, pw, conn := newConn(execDone)
|
||||
|
||||
ctrStdin, ctrStdout, conn := newConn()
|
||||
connectSuccess := make(chan struct{})
|
||||
connectFailure := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(execDone)
|
||||
errOut := bytes.NewBuffer(nil)
|
||||
err := c.exec(addr, pr, pw, errOut)
|
||||
stderrBuff := bytes.NewBuffer(nil)
|
||||
ctrStderr := io.MultiWriter(stderrBuff, detectConnSuccess(connectSuccess))
|
||||
|
||||
err := c.exec(addr, ctrStdin, ctrStdout, ctrStderr)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, errOut.String())
|
||||
_ = pr.CloseWithError(err)
|
||||
_ = pw.CloseWithError(err)
|
||||
stderrStr := stderrBuff.String()
|
||||
socatErr := tryParseSocatError(network, addr, stderrStr)
|
||||
if socatErr != nil {
|
||||
err = fmt.Errorf("socat error: %w", socatErr)
|
||||
} else {
|
||||
err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrStr)
|
||||
}
|
||||
}
|
||||
_ = conn.closeWithError(err)
|
||||
connectFailure <- err
|
||||
}()
|
||||
|
||||
return conn, nil
|
||||
select {
|
||||
case <-connectSuccess:
|
||||
return conn, nil
|
||||
case err := <-connectFailure:
|
||||
return nil, err
|
||||
case <-ctx.Done():
|
||||
_ = conn.closeWithError(ctx.Err())
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// connSuccessfulRE matches the marker text that signals an established
// connection in the stream written to the detector below.
var connSuccessfulRE = regexp.MustCompile("successfully connected")

// detectConnSuccess returns an io.Writer that watches everything written to
// it and closes connectSuccess the first time the text
// "successfully connected" appears in the stream.
//
// The returned writer is the write end of an io.Pipe; a background goroutine
// scans the read end. Whether or not the marker is found, the goroutine then
// keeps draining the pipe so that later writes are consumed rather than
// blocking the writer.
//
// NOTE(review): the goroutine only finishes once the pipe is closed, and
// nothing in this function closes it (the writer is handed out as a plain
// io.Writer). Confirm that callers close it, otherwise one goroutine is
// leaked per call.
func detectConnSuccess(connectSuccess chan struct{}) io.Writer {
	src, sink := io.Pipe()
	go scanForConnSuccess(src, connectSuccess)
	return sink
}

// scanForConnSuccess closes done if the success marker shows up on r, then
// discards whatever else arrives on r.
func scanForConnSuccess(r io.Reader, done chan struct{}) {
	if connSuccessfulRE.MatchReader(bufio.NewReader(r)) {
		close(done)
	}
	_, _ = io.Copy(io.Discard, r)
}
|
||||
|
||||
var (
	// connectionRefusedErrorRE matches socat's report of a refused connect(2)
	// and captures the "host:port" (or "[ipv6]:port") it tried to reach.
	connectionRefusedErrorRE = regexp.MustCompile(`E connect\(\d+, AF=\d+ (?P<hostport>[\[\]0-9.:a-z]+), \d+\): Connection refused`)
	// nameResolutionErrorRE matches socat's report of a failed getaddrinfo(3)
	// lookup and captures the host name that did not resolve.
	nameResolutionErrorRE = regexp.MustCompile(`E getaddrinfo\("(?P<hostname>[a-zA-Z0-9_.-]+)",.*\): Name does not resolve`)
)

// tryParseSocatError inspects socat's stderr output and, when it recognises a
// common failure, translates it into the *net.OpError a native Go dial would
// have produced:
//
//   - a name resolution failure becomes an OpError wrapping a *net.DNSError
//     with IsNotFound set;
//   - a refused connection becomes an OpError wrapping an *os.SyscallError
//     for connect/ECONNREFUSED, with Addr set to the parsed TCP address.
//
// network is copied into the resulting OpError.Net. address is currently
// unused; the peer address is taken from stderr instead. It returns nil when
// stderr contains no recognised error.
func tryParseSocatError(network, address, stderr string) error {
	// Both expressions have exactly one capture group, so a non-nil match
	// always has the captured text at index 1.
	if groups := nameResolutionErrorRE.FindStringSubmatch(stderr); groups != nil {
		return &net.OpError{
			Op:  "dial",
			Net: network,
			Err: &net.DNSError{
				Err:        "no such host",
				Name:       groups[1],
				IsNotFound: true,
			},
		}
	}

	groups := connectionRefusedErrorRE.FindStringSubmatch(stderr)
	if groups == nil {
		return nil
	}

	var (
		ip   net.IP
		port int
	)
	// Best effort: if the captured host:port cannot be parsed the error is
	// still reported, just with a zero-valued address.
	if h, p, err := net.SplitHostPort(groups[1]); err == nil {
		ip = net.ParseIP(h)
		// Ports are unsigned 16-bit values; parsing them as signed 16-bit
		// would clamp every port above 32767.
		if n, err := strconv.ParseUint(p, 10, 16); err == nil {
			port = int(n)
		}
	}

	return &net.OpError{
		Op:  "dial",
		Net: network,
		Addr: &net.TCPAddr{
			IP:   ip,
			Port: port,
		},
		Err: &os.SyscallError{
			Syscall: "connect",
			Err:     syscall.ECONNREFUSED,
		},
	}
}
|
||||
|
||||
func (c *contextDialer) Close() error {
|
||||
|
@ -197,7 +290,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
|
|||
Namespace(c.namespace).
|
||||
SubResource("exec")
|
||||
req.VersionedParams(&coreV1.PodExecOptions{
|
||||
Command: []string{"socat", "-", fmt.Sprintf("TCP:%s", hostPort)},
|
||||
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
|
||||
Container: c.podName,
|
||||
Stdin: true,
|
||||
Stdout: true,
|
||||
|
@ -324,50 +417,73 @@ func (a addr) String() string {
|
|||
}
|
||||
|
||||
type conn struct {
|
||||
pr *io.PipeReader
|
||||
pw *io.PipeWriter
|
||||
execDone <-chan struct{}
|
||||
pr *io.PipeReader
|
||||
pw *io.PipeWriter
|
||||
err atomic.Pointer[error]
|
||||
}
|
||||
|
||||
func (c conn) Read(b []byte) (n int, err error) {
|
||||
return c.pr.Read(b)
|
||||
func (c *conn) Read(b []byte) (n int, err error) {
|
||||
n, err = c.pr.Read(b)
|
||||
if errors.Is(err, io.ErrClosedPipe) {
|
||||
if p := c.err.Load(); p != nil {
|
||||
err = *p
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c conn) Write(b []byte) (n int, err error) {
|
||||
return c.pw.Write(b)
|
||||
func (c *conn) Write(b []byte) (n int, err error) {
|
||||
n, err = c.pw.Write(b)
|
||||
if errors.Is(err, io.ErrClosedPipe) {
|
||||
if p := c.err.Load(); p != nil {
|
||||
err = *p
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c conn) Close() error {
|
||||
err := c.pw.Close()
|
||||
func (c *conn) closeWithError(err error) error {
|
||||
if err == nil {
|
||||
err = net.ErrClosed
|
||||
}
|
||||
|
||||
{
|
||||
e := err
|
||||
c.err.CompareAndSwap(nil, &e)
|
||||
}
|
||||
err = c.pw.CloseWithError(io.EOF)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close writer: %w", err)
|
||||
}
|
||||
<-c.execDone
|
||||
err = c.pr.Close()
|
||||
err = c.pr.CloseWithError(net.ErrClosed)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close reader: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c conn) LocalAddr() net.Addr {
|
||||
func (c *conn) Close() error {
|
||||
return c.closeWithError(nil)
|
||||
}
|
||||
|
||||
func (c *conn) LocalAddr() net.Addr {
|
||||
return addr{}
|
||||
}
|
||||
|
||||
func (c conn) RemoteAddr() net.Addr {
|
||||
func (c *conn) RemoteAddr() net.Addr {
|
||||
return addr{}
|
||||
}
|
||||
|
||||
func (c conn) SetDeadline(t time.Time) error { return nil }
|
||||
func (c *conn) SetDeadline(t time.Time) error { return nil }
|
||||
|
||||
func (c conn) SetReadDeadline(t time.Time) error { return nil }
|
||||
func (c *conn) SetReadDeadline(t time.Time) error { return nil }
|
||||
|
||||
func (c conn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
func (c *conn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
|
||||
func newConn(execDone <-chan struct{}) (*io.PipeReader, *io.PipeWriter, conn) {
|
||||
func newConn() (*io.PipeReader, *io.PipeWriter, *conn) {
|
||||
pr0, pw0 := io.Pipe()
|
||||
pr1, pw1 := io.Pipe()
|
||||
rwc := conn{pr: pr0, pw: pw1, execDone: execDone}
|
||||
rwc := &conn{pr: pr0, pw: pw1}
|
||||
return pr1, pw0, rwc
|
||||
}
|
||||
|
||||
|
|
|
@ -168,20 +168,21 @@ func TestDialUnreachable(t *testing.T) {
|
|||
dialer.Close()
|
||||
})
|
||||
|
||||
transport := &http.Transport{
|
||||
DialContext: dialer.DialContext,
|
||||
}
|
||||
|
||||
var client = http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
_, err = client.Get("http://does-not.exists.svc")
|
||||
_, err = dialer.DialContext(ctx, "tcp", "does-not.exists.svc:80")
|
||||
if err == nil {
|
||||
t.Error("error was expected but got nil")
|
||||
return
|
||||
}
|
||||
if !strings.Contains(err.Error(), "not resolve") {
|
||||
t.Errorf("error %q doesn't containe expected sub-string: ", err.Error())
|
||||
if !strings.Contains(err.Error(), "no such host") {
|
||||
t.Errorf("error %q doesn't contain expected substring: ", err.Error())
|
||||
}
|
||||
|
||||
_, err = dialer.DialContext(ctx, "tcp", "localhost:80")
|
||||
if err == nil {
|
||||
t.Error("error was expected but got nil")
|
||||
return
|
||||
}
|
||||
if !strings.Contains(err.Error(), "connection refused") {
|
||||
t.Errorf("error %q doesn't contain expected substring: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue