Merge pull request #112046 from aojea/etcd_healthz

rate limit etcd healthcheck requests

Kubernetes-commit: 0f37b3120643580f632ca12d3e174e7ec447948c
Kubernetes Publisher 2022-09-12 12:01:27 -07:00
commit 99533703e1
5 changed files with 315 additions and 17 deletions
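
The change gates the etcd healthcheck behind a token-bucket rate limiter (one probe per half of the configured timeout) and answers callers that arrive in between from a cached last result, so bursts of /healthz or /readyz probes no longer translate into bursts of etcd requests. As orientation before the diff, here is a minimal, self-contained sketch of that pattern; the names (rateLimitedCheck, probe) are illustrative and this is not the apiserver code itself:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// rateLimitedCheck wraps an expensive probe so that it runs at most once per
// interval; callers arriving in between receive the cached result instead.
func rateLimitedCheck(interval time.Duration, probe func(context.Context) error) func() error {
	limiter := rate.NewLimiter(rate.Every(interval), 1) // burst of one
	var mu sync.Mutex
	var last error // result of the most recent probe

	return func() error {
		mu.Lock()
		defer mu.Unlock()
		if !limiter.Allow() {
			return last // rate limited: serve the cached result
		}
		ctx, cancel := context.WithTimeout(context.Background(), interval)
		defer cancel()
		last = probe(ctx)
		return last
	}
}

func main() {
	check := rateLimitedCheck(time.Second, func(ctx context.Context) error {
		fmt.Println("probing the backend...") // stands in for the etcd Get
		return nil
	})
	fmt.Println(check()) // runs the probe
	fmt.Println(check()) // within the interval: cached result, no new probe
}

Unlike this sketch, which simply serializes callers with a mutex, the change below caches the result in an atomicLastError guarded by a timestamp, so a slow probe that finishes late cannot overwrite the result of a newer probe.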

go.mod

@@ -35,12 +35,13 @@ require (
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/grpc v1.47.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
k8s.io/api v0.0.0-20220909223647-30ff9916664f
k8s.io/apimachinery v0.0.0-20220909223208-6d854d747c21
k8s.io/client-go v0.0.0-20220909224245-ab826d2728f3
k8s.io/client-go v0.0.0-20220912183627-9dae6917fba0
k8s.io/component-base v0.0.0-20220909225306-3f8aa5a81d31
k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1
@@ -109,7 +110,6 @@ require (
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/protobuf v1.28.1 // indirect
@@ -121,6 +121,6 @@ require (
replace (
k8s.io/api => k8s.io/api v0.0.0-20220909223647-30ff9916664f
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220909223208-6d854d747c21
k8s.io/client-go => k8s.io/client-go v0.0.0-20220909224245-ab826d2728f3
k8s.io/client-go => k8s.io/client-go v0.0.0-20220912183627-9dae6917fba0
k8s.io/component-base => k8s.io/component-base v0.0.0-20220909225306-3f8aa5a81d31
)

go.sum

@@ -971,8 +971,8 @@ k8s.io/api v0.0.0-20220909223647-30ff9916664f h1:NDgZks7RqnJwWUPDe0tPxMirAowsSG2
k8s.io/api v0.0.0-20220909223647-30ff9916664f/go.mod h1:mgirip+ylRYNjZVz7OqYQtrEdhksbpM2LSSH/QNc3wg=
k8s.io/apimachinery v0.0.0-20220909223208-6d854d747c21 h1:/RMUsMMVr3xRUWpyOQKagJNRAXt7OfC5R7nJm0BJvz0=
k8s.io/apimachinery v0.0.0-20220909223208-6d854d747c21/go.mod h1:uBlVnHT48nY5oV6uG8J4eVKMg56CZnmtHewbsBFKgJA=
k8s.io/client-go v0.0.0-20220909224245-ab826d2728f3 h1:PkVMH2O/rFAMo1rTkgR3Z350vHfprbaG295igiMTdWk=
k8s.io/client-go v0.0.0-20220909224245-ab826d2728f3/go.mod h1:0uMSNDHUsMhV/kWEx8KQztCZHSWwprrUaWZebaf3QPc=
k8s.io/client-go v0.0.0-20220912183627-9dae6917fba0 h1:cgoKCqSOa1s3Cd0LPl0H8XNOywKBmrf7+wHN8KHoPbY=
k8s.io/client-go v0.0.0-20220912183627-9dae6917fba0/go.mod h1:0uMSNDHUsMhV/kWEx8KQztCZHSWwprrUaWZebaf3QPc=
k8s.io/component-base v0.0.0-20220909225306-3f8aa5a81d31 h1:D6/Snav0hNqj59CtxCw1b/7uDpt0bh/LJ3EgaJexFdA=
k8s.io/component-base v0.0.0-20220909225306-3f8aa5a81d31/go.mod h1:vqup2ywrSckV+vGOpXZJ8wKhAiqse0XTB6qKJVUmQBc=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=


@@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
@@ -123,20 +124,42 @@ func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func()
return newETCD3Check(c, timeout, stopCh)
}
// atomicLastError acts as a cache that atomically stores an error.
// The error is only updated if the supplied timestamp is more recent
// than the timestamp of the currently stored error.
type atomicLastError struct {
mu sync.RWMutex
err error
timestamp time.Time
}
func (a *atomicLastError) Store(err error, t time.Time) {
a.mu.Lock()
defer a.mu.Unlock()
if a.timestamp.IsZero() || a.timestamp.Before(t) {
a.err = err
a.timestamp = t
}
}
func (a *atomicLastError) Load() error {
a.mu.RLock()
defer a.mu.RUnlock()
return a.err
}
func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
lock := sync.Mutex{}
lock := sync.RWMutex{}
var client *clientv3.Client
clientErr := fmt.Errorf("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
newClient, err := newETCD3Client(c.Transport)
lock.Lock()
defer lock.Unlock()
// Ensure that the server is not already shutting down.
select {
case <-stopCh:
@@ -146,7 +169,6 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
return true, nil
default:
}
if err != nil {
clientErr = err
return false, nil
@@ -169,25 +191,37 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}
}()
// limit to one request per half of the configured timeout, with a maximum burst of one
// rate-limited requests receive the error of the last request that was sent (note: not necessarily the last response received)
limiter := rate.NewLimiter(rate.Every(timeout/2), 1)
// initial state is the clientErr
lastError := &atomicLastError{err: fmt.Errorf("etcd client connection not yet established")}
return func() error {
// Given that the client is closed on shutdown, we hold the lock for
// the entire duration of the healthcheck call to ensure that the
// client is not closed while the healthcheck is running.
// Given that healthchecks have a 2s timeout, blocking shutdown for an
// additional 2s in the worst case seems acceptable.
lock.Lock()
defer lock.Unlock()
lock.RLock()
defer lock.RUnlock()
if clientErr != nil {
return clientErr
}
if limiter.Allow() == false {
return lastError.Load()
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
now := time.Now()
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
if err == nil {
return nil
if err != nil {
err = fmt.Errorf("error getting data from etcd: %w", err)
}
return fmt.Errorf("error getting data from etcd: %w", err)
lastError.Store(err, now)
return err
}, nil
}
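
To make the timestamp guard concrete, a small usage sketch (illustrative only; it reuses the atomicLastError type added above and assumes the usual fmt and time imports): two probes finish out of order, and the stale failure does not clobber the newer success.

// Illustrative only: relies on the atomicLastError type defined above.
func exampleOutOfOrderStores() {
	cache := &atomicLastError{err: fmt.Errorf("etcd client connection not yet established")}

	t1 := time.Now()                     // timestamp taken when a slow probe was sent
	t2 := t1.Add(500 * time.Millisecond) // a later, faster probe

	cache.Store(nil, t2)                     // the newer probe succeeded
	cache.Store(fmt.Errorf("etcd down"), t1) // the older, slower probe fails late
	fmt.Println(cache.Load())                // prints <nil>: the stale failure is discarded
}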


@@ -0,0 +1,48 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"errors"
"fmt"
"testing"
"time"
)
func Test_atomicLastError(t *testing.T) {
aError := &atomicLastError{err: fmt.Errorf("initial error")}
// an entry stored without a timestamp is always updated
aError.Store(errors.New("updated error"), time.Time{})
err := aError.Load()
if err.Error() != "updated error" {
t.Fatalf("Expected: \"updated error\" got: %s", err.Error())
}
// update to current time
now := time.Now()
aError.Store(errors.New("now error"), now)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
// an older timestamp does not overwrite the stored error
past := now.Add(-5 * time.Second)
aError.Store(errors.New("past error"), past)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
}


@@ -19,6 +19,10 @@ package factory
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -105,8 +109,10 @@ func TestCreateHealthcheck(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
select {
@@ -121,13 +127,14 @@ func TestCreateHealthcheck(t *testing.T) {
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(tc.cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
time.Sleep(2 * time.Second)
<-ready
got := healthcheck()
if !errors.Is(got, tc.want) {
@@ -202,8 +209,10 @@ func TestCreateReadycheck(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
select {
@@ -218,12 +227,14 @@ func TestCreateReadycheck(t *testing.T) {
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateReadyCheck(tc.cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
time.Sleep(2 * time.Second)
<-ready
got := healthcheck()
@@ -233,3 +244,208 @@ func TestCreateReadycheck(t *testing.T) {
})
}
}
func TestRateLimitHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
tests := []struct {
name string
want error
}{
{
name: "etcd ok",
},
{
name: "etcd down",
want: errors.New("etcd down"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, tc.want
}
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request to obtain the state
err = healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
// run multiple requests in parallel; they should observe the same state as the first one
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
// check the counter once the requests have finished
wg.Wait()
if counter != 1 {
t.Errorf("healthcheck() called etcd %d times, expected only one call", counter)
}
// wait until the rate limiter allows new requests
time.Sleep(cfg.HealthcheckTimeout / 2)
// a new round of requests should increment the counter only once
// run multiple requests in parallel; they should observe the same state as the first one
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
wg.Wait()
if counter != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", counter)
}
})
}
}
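
For the 5 s HealthcheckTimeout used above, the limiter is rate.Every(HealthcheckTimeout/2) with a burst of one, i.e. one token every 2.5 s. The first healthcheck() consumes that token, so the 100 parallel calls that follow are all answered from the cache and etcd sees exactly one request; after sleeping HealthcheckTimeout/2 exactly one new token has accrued, so the second batch of 100 calls produces exactly one more etcd request, which is why the counter is expected to end at 2.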
func TestTimeTravelHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
ready := make(chan struct{})
signal := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
val := atomic.LoadUint64(&counter)
// the first request waits for a custom timeout to trigger an error.
// We don't use the context timeout because we want to check that
// the cached answer is not overridden, and since the rate limit is
// based on cfg.HealthcheckTimeout / 2, the timeout will race with
// the rate limiter to serve the new request from the cache or allow
// it to go through
if val == 1 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After((2 * cfg.HealthcheckTimeout) / 3):
return nil, fmt.Errorf("etcd down")
}
}
// subsequent requests will always work
return nil, nil
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request that fails after roughly two thirds of the healthcheck timeout
go func() {
err := healthcheck()
if !strings.Contains(err.Error(), "etcd down") {
t.Errorf("healthcheck() mismatch want %v got %v", fmt.Errorf("etcd down"), err)
}
close(signal)
}()
// wait until the rate limiter allows new requests
time.Sleep(cfg.HealthcheckTimeout / 2)
select {
case <-signal:
t.Errorf("first request should not return yet")
default:
}
// a new request should now succeed and increment the counter
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c := atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
// the cached result should be a success and must not be overridden by the late error
<-signal
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c = atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
}
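
The timing in TestTimeTravelHealthcheck works out as follows with the 5 s HealthcheckTimeout: the limiter refills one token every 2.5 s, the first probe is sent at t≈0 and only fails at t≈3.3 s (two thirds of the timeout), and the test sleeps 2.5 s before calling healthcheck() again. That second call therefore gets a fresh token, contacts etcd a second time, succeeds immediately, and caches nil with timestamp t≈2.5 s. When the first probe finally returns its error at t≈3.3 s, it carries the older timestamp t≈0, so atomicLastError discards it; the final healthcheck() call is rate limited (the next token only arrives at t=5 s) and serves the cached nil, leaving the etcd call counter at 2.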