344 lines
9.8 KiB
Go
344 lines
9.8 KiB
Go
/*
|
|
Copyright 2025 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
|
endpointstesting "k8s.io/apiserver/pkg/endpoints/testing"
|
|
"k8s.io/client-go/dynamic"
|
|
restclient "k8s.io/client-go/rest"
|
|
)
|
|
|
|
// Fake API versions, similar to api/latest.go
|
|
const namedGroupPrefix = "apis"
|
|
const testAPIGroup = "test.group"
|
|
|
|
var testGroupV1 = schema.GroupVersion{Group: testAPIGroup, Version: "1"}
|
|
var testGroupV2 = schema.GroupVersion{Group: testAPIGroup, Version: "2"}
|
|
var testCodecV2 = codecs.LegacyCodec(testGroupV2)
|
|
|
|
func addTestTypesV2() {
|
|
scheme.AddKnownTypes(testGroupV2,
|
|
&endpointstesting.Simple{},
|
|
&endpointstesting.SimpleList{},
|
|
)
|
|
metav1.AddToGroupVersion(scheme, testGroupV2)
|
|
}
|
|
|
|
func init() {
|
|
addTestTypesV2()
|
|
}
|
|
|
|
func TestWatchHTTPErrors(t *testing.T) {
|
|
ctx := t.Context()
|
|
watcher := watch.NewFake()
|
|
timeoutCh := make(chan time.Time)
|
|
doneCh := make(chan struct{})
|
|
|
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
|
if !ok || info.StreamSerializer == nil {
|
|
t.Fatal(info)
|
|
}
|
|
serializer := info.StreamSerializer
|
|
|
|
// Setup a new watchserver
|
|
watchServer := &WatchServer{
|
|
Scope: &RequestScope{},
|
|
Watching: watcher,
|
|
|
|
MediaType: "testcase/json",
|
|
Framer: serializer.Framer,
|
|
Encoder: testCodecV2,
|
|
EmbeddedEncoder: testCodecV2,
|
|
|
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh: timeoutCh, done: doneCh},
|
|
}
|
|
|
|
s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
|
|
defer s.Close()
|
|
|
|
// Setup a client
|
|
dest, _ := url.Parse(s.URL)
|
|
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
|
|
dest.RawQuery = "watch=true"
|
|
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
|
|
client := http.Client{}
|
|
resp, err := client.Do(req)
|
|
require.NoError(t, err)
|
|
defer apitesting.Close(t, resp.Body)
|
|
|
|
// Send error to server from storage
|
|
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
|
|
watcher.Error(&errStatus)
|
|
watcher.Stop()
|
|
|
|
// Make sure we can actually watch an endpoint
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var got watchJSON
|
|
err = decoder.Decode(&got)
|
|
require.NoError(t, err)
|
|
require.Equal(t, watch.Error, got.Type)
|
|
status := &metav1.Status{}
|
|
err = json.Unmarshal(got.Object, status)
|
|
require.NoError(t, err)
|
|
expectedStatus := &metav1.Status{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Status",
|
|
APIVersion: "v1",
|
|
},
|
|
Code: 500,
|
|
Status: "Failure",
|
|
Message: "Internal error occurred: we got an error",
|
|
Reason: errStatus.Reason,
|
|
Details: errStatus.Details,
|
|
}
|
|
require.Equal(t, expectedStatus, status)
|
|
}
|
|
|
|
func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
|
|
ctx := t.Context()
|
|
watcher := watch.NewFake()
|
|
timeoutCh := make(chan time.Time)
|
|
doneCh := make(chan struct{})
|
|
|
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
|
if !ok || info.StreamSerializer == nil {
|
|
t.Fatal(info)
|
|
}
|
|
serializer := info.StreamSerializer
|
|
|
|
// Setup a new watchserver
|
|
watchServer := &WatchServer{
|
|
Scope: &RequestScope{
|
|
Serializer: runtime.NewSimpleNegotiatedSerializer(info),
|
|
Kind: testGroupV1.WithKind("test"),
|
|
},
|
|
Watching: watcher,
|
|
|
|
MediaType: "testcase/json",
|
|
Framer: serializer.Framer,
|
|
Encoder: testCodecV2,
|
|
EmbeddedEncoder: testCodecV2,
|
|
|
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, doneCh},
|
|
}
|
|
|
|
statusErr := apierrors.NewInternalError(fmt.Errorf("we got an error"))
|
|
errStatus := statusErr.Status()
|
|
|
|
s := httptest.NewServer(serveWatch(watcher, watchServer, statusErr))
|
|
defer s.Close()
|
|
|
|
// Setup a client
|
|
dest, _ := url.Parse(s.URL)
|
|
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
|
|
dest.RawQuery = "watch=true"
|
|
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
|
|
client := http.Client{}
|
|
resp, err := client.Do(req)
|
|
require.NoError(t, err)
|
|
defer apitesting.Close(t, resp.Body)
|
|
|
|
// We had already got an error before watch serve started
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var status *metav1.Status
|
|
err = decoder.Decode(&status)
|
|
require.NoError(t, err)
|
|
expectedStatus := &metav1.Status{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Status",
|
|
APIVersion: "v1",
|
|
},
|
|
Code: 500,
|
|
Status: "Failure",
|
|
Message: "Internal error occurred: we got an error",
|
|
Reason: errStatus.Reason,
|
|
Details: errStatus.Details,
|
|
}
|
|
require.Equal(t, expectedStatus, status)
|
|
|
|
// check for leaks
|
|
require.Truef(t, watcher.IsStopped(),
|
|
"Leaked watcher goruntine after request done")
|
|
}
|
|
|
|
func TestWatchHTTPDynamicClientErrors(t *testing.T) {
|
|
watcher := watch.NewFake()
|
|
timeoutCh := make(chan time.Time)
|
|
done := make(chan struct{})
|
|
|
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
|
if !ok || info.StreamSerializer == nil {
|
|
t.Fatal(info)
|
|
}
|
|
serializer := info.StreamSerializer
|
|
|
|
// Setup a new watchserver
|
|
watchServer := &WatchServer{
|
|
Scope: &RequestScope{},
|
|
Watching: watcher,
|
|
|
|
MediaType: "testcase/json",
|
|
Framer: serializer.Framer,
|
|
Encoder: testCodecV2,
|
|
EmbeddedEncoder: testCodecV2,
|
|
|
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
|
}
|
|
|
|
s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
|
|
defer s.Close()
|
|
defer s.CloseClientConnections()
|
|
|
|
client := dynamic.NewForConfigOrDie(&restclient.Config{
|
|
Host: s.URL,
|
|
APIPath: "/" + namedGroupPrefix,
|
|
}).Resource(testGroupV2.WithResource("simple"))
|
|
|
|
_, err := client.Watch(context.TODO(), metav1.ListOptions{})
|
|
require.Equal(t, runtime.NegotiateError{Stream: true, ContentType: "testcase/json"}, err)
|
|
}
|
|
|
|
func TestWatchHTTPTimeout(t *testing.T) {
|
|
ctx := t.Context()
|
|
watcher := watch.NewFake()
|
|
timeoutCh := make(chan time.Time)
|
|
done := make(chan struct{})
|
|
|
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
|
if !ok || info.StreamSerializer == nil {
|
|
t.Fatal(info)
|
|
}
|
|
serializer := info.StreamSerializer
|
|
|
|
// Setup a new watchserver
|
|
watchServer := &WatchServer{
|
|
Scope: &RequestScope{},
|
|
Watching: watcher,
|
|
|
|
MediaType: "testcase/json",
|
|
Framer: serializer.Framer,
|
|
Encoder: testCodecV2,
|
|
EmbeddedEncoder: testCodecV2,
|
|
|
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
|
}
|
|
|
|
s := httptest.NewServer(serveWatch(watcher, watchServer, nil))
|
|
defer s.Close()
|
|
|
|
// Setup a client
|
|
dest, _ := url.Parse(s.URL)
|
|
dest.Path = "/" + namedGroupPrefix + "/" + testGroupV2.Group + "/" + testGroupV2.Version + "/simple"
|
|
dest.RawQuery = "watch=true"
|
|
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, dest.String(), nil)
|
|
client := http.Client{}
|
|
resp, err := client.Do(req)
|
|
require.NoError(t, err)
|
|
defer apitesting.Close(t, resp.Body)
|
|
|
|
// Send object added event to server from storage
|
|
watcher.Add(&endpointstesting.Simple{TypeMeta: metav1.TypeMeta{APIVersion: testGroupV2.String()}})
|
|
|
|
// Make sure we can actually watch an endpoint
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var got watchJSON
|
|
err = decoder.Decode(&got)
|
|
require.NoError(t, err)
|
|
|
|
// Timeout and check for leaks
|
|
close(timeoutCh)
|
|
select {
|
|
case <-done:
|
|
eventCh := watcher.ResultChan()
|
|
select {
|
|
case _, opened := <-eventCh:
|
|
if opened {
|
|
t.Errorf("Watcher received unexpected event")
|
|
}
|
|
if !watcher.IsStopped() {
|
|
t.Errorf("Watcher is not stopped")
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Errorf("Leaked watch on timeout")
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Errorf("Failed to stop watcher after %s of timeout signal", wait.ForeverTestTimeout.String())
|
|
}
|
|
|
|
// Make sure we can't receive any more events through the timeout watch
|
|
err = decoder.Decode(&got)
|
|
require.Equal(t, io.EOF, err)
|
|
}
|
|
|
|
// watchJSON defines the expected JSON wire equivalent of watch.Event.
|
|
// Use this for testing instead of metav1.WatchEvent to ensure the wire format
|
|
// doesn't change and skip the automatic decoding of WatchEvent.Object.Raw into
|
|
// WatchEvent.Object.Object.
|
|
type watchJSON struct {
|
|
Type watch.EventType `json:"type,omitempty"`
|
|
Object json.RawMessage `json:"object,omitempty"`
|
|
}
|
|
|
|
// fakeTimeoutFactory is a test timeout factory whose timeout channel is driven
// by the test and whose cleanup is observable through the done channel.
//
// Field order matters: some callers build this struct with positional literals.
type fakeTimeoutFactory struct {
	// timeoutCh is handed out unchanged by TimeoutCh.
	timeoutCh chan time.Time
	// done is closed when the cleanup function returned by TimeoutCh is called.
	done chan struct{}
}

// TimeoutCh returns the configured timeout channel together with a cleanup
// function. The cleanup function closes done and always reports true.
func (f *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
	cleanup := func() bool {
		close(f.done)
		return true
	}
	return f.timeoutCh, cleanup
}
|
|
|
|
// serveWatch will serve a watch response according to the watcher and watchServer.
|
|
// Before watchServer.HandleHTTP, an error may occur like k8s.io/apiserver/pkg/endpoints/handlers/watch.go#serveWatch does.
|
|
func serveWatch(watcher watch.Interface, watchServer *WatchServer, preServeErr error) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
defer watcher.Stop()
|
|
|
|
if preServeErr != nil {
|
|
responsewriters.ErrorNegotiated(preServeErr, watchServer.Scope.Serializer, watchServer.Scope.Kind.GroupVersion(), w, req)
|
|
return
|
|
}
|
|
|
|
watchServer.HandleHTTP(w, req)
|
|
}
|
|
}
|