apiserver/pkg/util/proxy/streamtranslator_test.go

968 lines
36 KiB
Go

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"math"
mrand "math/rand"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
rcconstants "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/proxy/metrics"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
// TestStreamTranslator_LoopbackStdinToStdout returns random data sent on the client's
// STDIN channel back onto the client's STDOUT channel. There are two servers in this test: the
// upstream fake SPDY server, and the StreamTranslator server. The StreamTranslator proxys the
// data received from the websocket client upstream to the SPDY server (by translating the
// websocket data into spdy). The returned data read on the websocket client STDOUT is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Stdin: true,
Stdout: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// Loopback STDIN data onto STDOUT stream.
_, err = io.Copy(ctx.stdoutStream, ctx.stdinStream)
if err != nil {
t.Fatalf("error copying STDIN to STDOUT: %v", err)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server with
// streams STDIN and STDOUT. Create test server from StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdin: true, Stdout: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
// Generate random data, and set it up to stream on STDIN. The data will be
// returned on the STDOUT buffer.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
var stdout bytes.Buffer
options := &remotecommand.StreamOptions{
Stdin: bytes.NewReader(randomData),
Stdout: &stdout,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDOUT.
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_LoopbackStdinToStderr returns random data sent on the client's
// STDIN channel back onto the client's STDERR channel. There are two servers in this test: the
// upstream fake SPDY server, and the StreamTranslator server. The StreamTranslator proxys the
// data received from the websocket client upstream to the SPDY server (by translating the
// websocket data into spdy). The returned data read on the websocket client STDERR is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Stdin: true,
Stderr: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// Loopback STDIN data onto STDERR stream.
_, err = io.Copy(ctx.stderrStream, ctx.stdinStream)
if err != nil {
t.Fatalf("error copying STDIN to STDERR: %v", err)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server with
// streams STDIN and STDERR. Create test server from StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdin: true, Stderr: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
// Generate random data, and set it up to stream on STDIN. The data will be
// returned on the STDERR buffer.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
var stderr bytes.Buffer
options := &remotecommand.StreamOptions{
Stdin: bytes.NewReader(randomData),
Stderr: &stderr,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
data, err := io.ReadAll(bytes.NewReader(stderr.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDERR.
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// Returns a random exit code in the range(1-127).
func randomExitCode() int {
errorCode := mrand.Intn(127) // Range: (0 - 126)
errorCode += 1 // Range: (1 - 127)
return errorCode
}
// TestStreamTranslator_ErrorStream tests the error stream by sending an error with a random
// exit code, then validating the error arrives on the error stream.
func TestStreamTranslator_ErrorStream(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
expectedExitCode := randomExitCode()
// Create upstream fake SPDY server, returning a non-zero exit code
// on error stream within the structured error.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Stdout: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// Read/discard STDIN data before returning error on error stream.
_, err = io.Copy(io.Discard, ctx.stdinStream)
if err != nil {
t.Fatalf("error copying STDIN to DISCARD: %v", err)
}
// Force an non-zero exit code error returned on the error stream.
err = ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Reason: rcconstants.NonZeroExitCodeReason,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: rcconstants.ExitCodeCauseType,
Message: fmt.Sprintf("%d", expectedExitCode),
},
},
},
}})
if err != nil {
t.Fatalf("error writing status: %v", err)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server, and
// create a test server using the StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdin: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
// Generate random data, and set it up to stream on STDIN. The data will be discarded at
// upstream SDPY server.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
options := &remotecommand.StreamOptions{
Stdin: bytes.NewReader(randomData),
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
// Expect exit code error on error stream.
if err == nil {
t.Errorf("expected error, but received none")
}
expectedError := fmt.Sprintf("command terminated with exit code %d", expectedExitCode)
// Compare expected error with exit code to actual error.
if expectedError != err.Error() {
t.Errorf("expected error (%s), got (%s)", expectedError, err)
}
}
// Validate the streamtranslator metrics; an exit code error is considered 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_MultipleReadChannels tests two streams (STDOUT, STDERR) reading from
// the connections at the same time.
func TestStreamTranslator_MultipleReadChannels(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Stdin: true,
Stdout: true,
Stderr: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// TeeReader copies data read on STDIN onto STDERR.
stdinReader := io.TeeReader(ctx.stdinStream, ctx.stderrStream)
// Also copy STDIN to STDOUT.
_, err = io.Copy(ctx.stdoutStream, stdinReader)
if err != nil {
t.Errorf("error copying STDIN to STDOUT: %v", err)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server with
// streams STDIN, STDOUT, and STDERR. Create test server from StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdin: true, Stdout: true, Stderr: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
// Generate random data, and set it up to stream on STDIN. The data will be
// returned on the STDOUT and STDERR buffer.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
var stdout, stderr bytes.Buffer
options := &remotecommand.StreamOptions{
Stdin: bytes.NewReader(randomData),
Stdout: &stdout,
Stderr: &stderr,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
stdoutBytes, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDOUT.
if !bytes.Equal(stdoutBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stdoutBytes), len(randomData))
}
stderrBytes, err := io.ReadAll(bytes.NewReader(stderr.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDERR.
if !bytes.Equal(stderrBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData))
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_ThrottleReadChannels tests two streams (STDOUT, STDERR) using rate limited streams.
func TestStreamTranslator_ThrottleReadChannels(t *testing.T) {
// Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Stdin: true,
Stdout: true,
Stderr: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// TeeReader copies data read on STDIN onto STDERR.
stdinReader := io.TeeReader(ctx.stdinStream, ctx.stderrStream)
// Also copy STDIN to STDOUT.
_, err = io.Copy(ctx.stdoutStream, stdinReader)
if err != nil {
t.Errorf("error copying STDIN to STDOUT: %v", err)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server with
// streams STDIN, STDOUT, and STDERR. Create test server from StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdin: true, Stdout: true, Stderr: true}
maxBytesPerSec := 900 * 1024 // slightly less than the 1MB that is being transferred to exercise throttling.
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, int64(maxBytesPerSec), streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
// Generate random data, and set it up to stream on STDIN. The data will be
// returned on the STDOUT and STDERR buffer.
randomSize := 1024 * 1024
randomData := make([]byte, randomSize)
if _, err := rand.Read(randomData); err != nil {
t.Errorf("unexpected error reading random data: %v", err)
}
var stdout, stderr bytes.Buffer
options := &remotecommand.StreamOptions{
Stdin: bytes.NewReader(randomData),
Stdout: &stdout,
Stderr: &stderr,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
stdoutBytes, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDOUT.
if !bytes.Equal(stdoutBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stdoutBytes), len(randomData))
}
stderrBytes, err := io.ReadAll(bytes.NewReader(stderr.Bytes()))
if err != nil {
t.Errorf("error reading the stream: %v", err)
return
}
// Check the random data sent on STDIN was the same returned on STDERR.
if !bytes.Equal(stderrBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData))
}
}
// fakeTerminalSizeQueue implements TerminalSizeQueue, returning a random set of
// "maxSizes" number of TerminalSizes, storing the TerminalSizes in "sizes" slice.
type fakeTerminalSizeQueue struct {
maxSizes int
terminalSizes []remotecommand.TerminalSize
}
// newTerminalSizeQueue returns a pointer to a fakeTerminalSizeQueue passing
// "max" number of random TerminalSizes created.
func newTerminalSizeQueue(max int) *fakeTerminalSizeQueue {
return &fakeTerminalSizeQueue{
maxSizes: max,
terminalSizes: make([]remotecommand.TerminalSize, 0, max),
}
}
// Next returns a pointer to the next random TerminalSize, or nil if we have
// already returned "maxSizes" TerminalSizes already. Stores the randomly
// created TerminalSize in "terminalSizes" field for later validation.
func (f *fakeTerminalSizeQueue) Next() *remotecommand.TerminalSize {
if len(f.terminalSizes) >= f.maxSizes {
return nil
}
size := randomTerminalSize()
f.terminalSizes = append(f.terminalSizes, size)
return &size
}
// randomTerminalSize returns a TerminalSize with random values in the
// range (0-65535) for the fields Width and Height.
func randomTerminalSize() remotecommand.TerminalSize {
randWidth := uint16(mrand.Intn(int(math.Pow(2, 16))))
randHeight := uint16(mrand.Intn(int(math.Pow(2, 16))))
return remotecommand.TerminalSize{
Width: randWidth,
Height: randHeight,
}
}
// TestStreamTranslator_MultipleWriteChannels
func TestStreamTranslator_TTYResizeChannel(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create the fake terminal size queue and the actualTerminalSizes which
// will be received at the opposite websocket endpoint.
numSizeQueue := 10000
sizeQueue := newTerminalSizeQueue(numSizeQueue)
actualTerminalSizes := make([]remotecommand.TerminalSize, 0, numSizeQueue)
// Create upstream fake SPDY server which copies STDIN back onto STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
Tty: true,
})
if err != nil {
t.Errorf("error on createHTTPStreams: %v", err)
return
}
defer ctx.conn.Close()
// Read the terminal resize requests, storing them in actualTerminalSizes
for i := 0; i < numSizeQueue; i++ {
actualTerminalSize := <-ctx.resizeChan
actualTerminalSizes = append(actualTerminalSizes, actualTerminalSize)
}
}))
defer spdyServer.Close()
// Create StreamTranslatorHandler, which points upstream to fake SPDY server with
// resize (TTY resize) stream. Create test server from StreamTranslatorHandler.
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Tty: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
options := &remotecommand.StreamOptions{
Tty: true,
TerminalSizeQueue: sizeQueue,
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
errorChan <- exec.StreamWithContext(context.Background(), *options)
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// Validate the random TerminalSizes sent on the resize stream are the same
// as the actual TerminalSizes received at the websocket server.
if len(actualTerminalSizes) != numSizeQueue {
t.Fatalf("expected to receive num terminal resizes (%d), got (%d)",
numSizeQueue, len(actualTerminalSizes))
}
for i, actual := range actualTerminalSizes {
expected := sizeQueue.terminalSizes[i]
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected terminal resize window %v, got %v", expected, actual)
}
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_WebSocketServerErrors validates that when there is a problem creating
// the websocket server as the first step of the StreamTranslator an error is properly returned.
func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
spdyLocation, err := url.Parse("http://127.0.0.1")
if err != nil {
t.Fatalf("Unable to parse spdy server URL")
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, Options{})
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutorForProtocols(
&rest.Config{Host: streamTranslatorLocation.Host},
"GET",
streamTranslatorServer.URL,
rcconstants.StreamProtocolV4Name, // RemoteCommand V4 protocol is unsupported
)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client. The WebSocket server within the
// StreamTranslator propagates an error here because the V4 protocol is not supported.
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
// Must return "websocket unable to upgrade" (bad handshake) error.
if err == nil {
t.Fatalf("expected error, but received none")
}
if !strings.Contains(err.Error(), "unable to upgrade streaming request") {
t.Errorf("expected websocket bad handshake error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="400"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow
// redirects; it will thrown an error instead.
func TestStreamTranslator_BlockRedirects(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
for _, statusCode := range []int{
http.StatusMovedPermanently, // 301
http.StatusFound, // 302
http.StatusSeeOther, // 303
http.StatusTemporaryRedirect, // 307
http.StatusPermanentRedirect, // 308
} {
// Create upstream fake SPDY server which returns a redirect.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Location", "/")
w.WriteHeader(statusCode)
}))
defer spdyServer.Close()
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdout: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
// Should return "redirect not allowed" error.
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
// Must return "redirect now allowed" error.
if err == nil {
t.Fatalf("expected error, but received none")
}
if !strings.Contains(err.Error(), "redirect not allowed") {
t.Errorf("expected redirect not allowed error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure each loop.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="500"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
metrics.ResetForTest() // Clear metrics each loop
}
}
// streamContext encapsulates the structures necessary to communicate through
// a SPDY connection, including the Reader/Writer streams.
type streamContext struct {
conn io.Closer
stdinStream io.ReadCloser
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
resizeStream io.ReadCloser
resizeChan chan remotecommand.TerminalSize
writeStatus func(status *apierrors.StatusError) error
}
type streamAndReply struct {
httpstream.Stream
replySent <-chan struct{}
}
// CreateSPDYServerStreams upgrades the passed HTTP request to a SPDY bi-directional streaming
// connection with remote command streams defined in passed options. Returns a streamContext
// structure containing the Reader/Writer streams to communicate through the SDPY connection.
// Returns an error if unable to upgrade the HTTP connection to a SPDY connection.
func createSPDYServerStreams(w http.ResponseWriter, req *http.Request, opts Options) (*streamContext, error) {
_, err := httpstream.Handshake(req, w, []string{rcconstants.StreamProtocolV4Name})
if err != nil {
return nil, err
}
upgrader := spdy.NewResponseUpgrader()
streamCh := make(chan streamAndReply)
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
return nil
})
ctx := &streamContext{
conn: conn,
}
// wait for stream
replyChan := make(chan struct{}, 5)
defer close(replyChan)
receivedStreams := 0
expectedStreams := 1 // expect at least the error stream
if opts.Stdout {
expectedStreams++
}
if opts.Stdin {
expectedStreams++
}
if opts.Stderr {
expectedStreams++
}
if opts.Tty {
expectedStreams++
}
WaitForStreams:
for {
select {
case stream := <-streamCh:
streamType := stream.Headers().Get(v1.StreamType)
switch streamType {
case v1.StreamTypeError:
replyChan <- struct{}{}
ctx.writeStatus = v4WriteStatusFunc(stream)
case v1.StreamTypeStdout:
replyChan <- struct{}{}
ctx.stdoutStream = stream
case v1.StreamTypeStdin:
replyChan <- struct{}{}
ctx.stdinStream = stream
case v1.StreamTypeStderr:
replyChan <- struct{}{}
ctx.stderrStream = stream
case v1.StreamTypeResize:
replyChan <- struct{}{}
ctx.resizeStream = stream
default:
// add other stream ...
return nil, errors.New("unimplemented stream type")
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
}
}
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan remotecommand.TerminalSize)
go handleResizeEvents(req.Context(), ctx.resizeStream, ctx.resizeChan)
}
return ctx, nil
}
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}
_, err = stream.Write(bs)
return err
}
}
func fakeTransport() (*http.Transport, error) {
cfg := &transport.Config{
TLS: transport.TLSConfig{
Insecure: true,
CAFile: "",
},
}
rt, err := transport.New(cfg)
if err != nil {
return nil, err
}
t, ok := rt.(*http.Transport)
if !ok {
return nil, fmt.Errorf("unknown transport type: %T", rt)
}
return t, nil
}