add a new generic filter goaway
Kubernetes-commit: 81f46b64a35f3af096d50620dfcc78b003de8263
This commit is contained in:
		
							parent
							
								
									31094a2234
								
							
						
					
					
						commit
						e5c6ec44de
					
				|  | @ -197,6 +197,12 @@ type Config struct { | |||
| 	// Predicate which is true for paths of long-running http requests
 | ||||
| 	LongRunningFunc apirequest.LongRunningRequestCheck | ||||
| 
 | ||||
| 	// GoawayChance is the probability that send a GOAWAY to HTTP/2 clients. When client received
 | ||||
| 	// GOAWAY, the in-flight requests will not be affected and new requests will use
 | ||||
| 	// a new TCP connection to triggering re-balancing to another server behind the load balance.
 | ||||
| 	// Default to 0, means never send GOAWAY. Max is 0.02 to prevent break the apiserver.
 | ||||
| 	GoawayChance float64 | ||||
| 
 | ||||
| 	// MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled.
 | ||||
| 	// This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags.
 | ||||
| 	// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
 | ||||
|  | @ -671,6 +677,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | |||
| 	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout) | ||||
| 	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup) | ||||
| 	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) | ||||
| 	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 { | ||||
| 		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance) | ||||
| 	} | ||||
| 	handler = genericfilters.WithPanicRecovery(handler) | ||||
| 	return handler | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,88 @@ | |||
| /* | ||||
| Copyright 2020 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package filters | ||||
| 
 | ||||
| import ( | ||||
| 	"math/rand" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| ) | ||||
| 
 | ||||
| // GoawayDecider decides if server should send a GOAWAY
 | ||||
| type GoawayDecider interface { | ||||
| 	Goaway(r *http.Request) bool | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	// randPool used to get a rand.Rand and generate a random number thread-safely,
 | ||||
| 	// which improve the performance of using rand.Rand with a locker
 | ||||
| 	randPool = &sync.Pool{ | ||||
| 		New: func() interface{} { | ||||
| 			return rand.New(rand.NewSource(rand.Int63())) | ||||
| 		}, | ||||
| 	} | ||||
| ) | ||||
| 
 | ||||
| // WithProbabilisticGoaway returns an http.Handler that send GOAWAY probabilistically
 | ||||
| // according to the given chance for HTTP2 requests. After client receive GOAWAY,
 | ||||
| // the in-flight long-running requests will not be influenced, and the new requests
 | ||||
| // will use a new TCP connection to re-balancing to another server behind the load balance.
 | ||||
| func WithProbabilisticGoaway(inner http.Handler, chance float64) http.Handler { | ||||
| 	return &goaway{ | ||||
| 		handler: inner, | ||||
| 		decider: &probabilisticGoawayDecider{ | ||||
| 			chance: chance, | ||||
| 			next: func() float64 { | ||||
| 				rnd := randPool.Get().(*rand.Rand) | ||||
| 				ret := rnd.Float64() | ||||
| 				randPool.Put(rnd) | ||||
| 				return ret | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // goaway send a GOAWAY to client according to decider for HTTP2 requests
 | ||||
| type goaway struct { | ||||
| 	handler http.Handler | ||||
| 	decider GoawayDecider | ||||
| } | ||||
| 
 | ||||
| // ServeHTTP implement HTTP handler
 | ||||
| func (h *goaway) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	if r.Proto == "HTTP/2.0" && h.decider.Goaway(r) { | ||||
| 		// Send a GOAWAY and tear down the TCP connection when idle.
 | ||||
| 		w.Header().Set("Connection", "close") | ||||
| 	} | ||||
| 
 | ||||
| 	h.handler.ServeHTTP(w, r) | ||||
| } | ||||
| 
 | ||||
| // probabilisticGoawayDecider send GOAWAY probabilistically according to chance
 | ||||
| type probabilisticGoawayDecider struct { | ||||
| 	chance float64 | ||||
| 	next   func() float64 | ||||
| } | ||||
| 
 | ||||
| // Goaway implement GoawayDecider
 | ||||
| func (p *probabilisticGoawayDecider) Goaway(r *http.Request) bool { | ||||
| 	if p.next() < p.chance { | ||||
| 		return true | ||||
| 	} | ||||
| 
 | ||||
| 	return false | ||||
| } | ||||
|  | @ -0,0 +1,309 @@ | |||
| /* | ||||
| Copyright 2020 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package filters | ||||
| 
 | ||||
| import ( | ||||
| 	"crypto/tls" | ||||
| 	"io" | ||||
| 	"math/rand" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"golang.org/x/net/http2" | ||||
| ) | ||||
| 
 | ||||
| func TestProbabilisticGoawayDecider(t *testing.T) { | ||||
| 	cases := []struct { | ||||
| 		name         string | ||||
| 		chance       float64 | ||||
| 		nextFn       func(chance float64) func() float64 | ||||
| 		expectGOAWAY bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:   "always not GOAWAY", | ||||
| 			chance: 0, | ||||
| 			nextFn: func(chance float64) func() float64 { | ||||
| 				return rand.Float64 | ||||
| 			}, | ||||
| 			expectGOAWAY: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "always GOAWAY", | ||||
| 			chance: 1, | ||||
| 			nextFn: func(chance float64) func() float64 { | ||||
| 				return rand.Float64 | ||||
| 			}, | ||||
| 			expectGOAWAY: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "hit GOAWAY", | ||||
| 			chance: rand.Float64() + 0.01, | ||||
| 			nextFn: func(chance float64) func() float64 { | ||||
| 				return func() float64 { | ||||
| 					return chance - 0.001 | ||||
| 				} | ||||
| 			}, | ||||
| 			expectGOAWAY: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "does not hit GOAWAY", | ||||
| 			chance: rand.Float64() + 0.01, | ||||
| 			nextFn: func(chance float64) func() float64 { | ||||
| 				return func() float64 { | ||||
| 					return chance + 0.001 | ||||
| 				} | ||||
| 			}, | ||||
| 			expectGOAWAY: false, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, tc := range cases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			d := probabilisticGoawayDecider{chance: tc.chance, next: tc.nextFn(tc.chance)} | ||||
| 			result := d.Goaway(nil) | ||||
| 			if result != tc.expectGOAWAY { | ||||
| 				t.Errorf("expect GOAWAY: %v, got: %v", tc.expectGOAWAY, result) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // TestClientReceivedGOAWAY tests the in-flight watch requests will not be affected and new requests use a
 | ||||
| // connection after client received GOAWAY, and server response watch request with GOAWAY will not break client
 | ||||
| // watching body read.
 | ||||
| func TestClientReceivedGOAWAY(t *testing.T) { | ||||
| 	const ( | ||||
| 		urlNormal          = "/normal" | ||||
| 		urlWatch           = "/watch" | ||||
| 		urlGoaway          = "/goaway" | ||||
| 		urlWatchWithGoaway = "/watch-with-goaway" | ||||
| 	) | ||||
| 
 | ||||
| 	const ( | ||||
| 		// indicate the bytes watch request will be sent
 | ||||
| 		// used to check if watch request was broke by GOAWAY
 | ||||
| 		watchExpectSendBytes = 5 | ||||
| 	) | ||||
| 
 | ||||
| 	watchHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		timer := time.NewTicker(time.Second) | ||||
| 
 | ||||
| 		w.Header().Set("Transfer-Encoding", "chunked") | ||||
| 		w.WriteHeader(200) | ||||
| 
 | ||||
| 		flusher, _ := w.(http.Flusher) | ||||
| 		flusher.Flush() | ||||
| 
 | ||||
| 		count := 0 | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-timer.C: | ||||
| 				n, err := w.Write([]byte("w")) | ||||
| 				if err != nil { | ||||
| 					return | ||||
| 				} | ||||
| 				flusher.Flush() | ||||
| 				count += n | ||||
| 				if count == watchExpectSendBytes { | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	}) | ||||
| 
 | ||||
| 	mux := http.NewServeMux() | ||||
| 	mux.Handle(urlNormal, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.WriteHeader(http.StatusOK) | ||||
| 		w.Write([]byte("hello")) | ||||
| 		return | ||||
| 	}), 0)) | ||||
| 	mux.Handle(urlWatch, WithProbabilisticGoaway(watchHandler, 0)) | ||||
| 	mux.Handle(urlGoaway, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.WriteHeader(http.StatusOK) | ||||
| 		w.Write([]byte("hello")) | ||||
| 		return | ||||
| 	}), 1)) | ||||
| 	mux.Handle(urlWatchWithGoaway, WithProbabilisticGoaway(watchHandler, 1)) | ||||
| 
 | ||||
| 	s := httptest.NewUnstartedServer(mux) | ||||
| 
 | ||||
| 	http2Options := &http2.Server{} | ||||
| 
 | ||||
| 	if err := http2.ConfigureServer(s.Config, http2Options); err != nil { | ||||
| 		t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	s.TLS = s.Config.TLSConfig | ||||
| 	s.StartTLS() | ||||
| 	defer s.Close() | ||||
| 
 | ||||
| 	tlsConfig := &tls.Config{ | ||||
| 		InsecureSkipVerify: true, | ||||
| 		NextProtos:         []string{http2.NextProtoTLS}, | ||||
| 	} | ||||
| 
 | ||||
| 	cases := []struct { | ||||
| 		name string | ||||
| 		reqs []string | ||||
| 		// expectConnections always equals to GOAWAY requests(urlGoaway or urlWatchWithGoaway) + 1
 | ||||
| 		expectConnections int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:              "all normal requests use only one connection", | ||||
| 			reqs:              []string{urlNormal, urlNormal, urlNormal}, | ||||
| 			expectConnections: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:              "got GOAWAY after set-up watch", | ||||
| 			reqs:              []string{urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal}, | ||||
| 			expectConnections: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:              "got GOAWAY after set-up watch, and set-up a new watch", | ||||
| 			reqs:              []string{urlNormal, urlWatch, urlGoaway, urlWatch, urlNormal, urlNormal}, | ||||
| 			expectConnections: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:              "got 2 GOAWAY after set-up watch", | ||||
| 			reqs:              []string{urlNormal, urlWatch, urlGoaway, urlGoaway, urlNormal, urlNormal}, | ||||
| 			expectConnections: 3, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:              "combine with watch-with-goaway", | ||||
| 			reqs:              []string{urlNormal, urlWatchWithGoaway, urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal}, | ||||
| 			expectConnections: 3, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, tc := range cases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			// localAddr indicates how many TCP connection set up
 | ||||
| 			localAddr := make([]string, 0) | ||||
| 
 | ||||
| 			// init HTTP2 client
 | ||||
| 			client := http.Client{ | ||||
| 				Transport: &http2.Transport{ | ||||
| 					TLSClientConfig: tlsConfig, | ||||
| 					DialTLS: func(network, addr string, cfg *tls.Config) (conn net.Conn, err error) { | ||||
| 						conn, err = tls.Dial(network, addr, cfg) | ||||
| 						if err != nil { | ||||
| 							t.Fatalf("unexpect connection err: %v", err) | ||||
| 						} | ||||
| 						localAddr = append(localAddr, conn.LocalAddr().String()) | ||||
| 						return | ||||
| 					}, | ||||
| 				}, | ||||
| 			} | ||||
| 
 | ||||
| 			watchChs := make([]chan int, 0) | ||||
| 			for _, url := range tc.reqs { | ||||
| 				req, err := http.NewRequest(http.MethodGet, s.URL+url, nil) | ||||
| 				if err != nil { | ||||
| 					t.Fatalf("unexpect new request error: %v", err) | ||||
| 				} | ||||
| 				resp, err := client.Do(req) | ||||
| 				if err != nil { | ||||
| 					t.Fatalf("failed request test server, err: %v", err) | ||||
| 				} | ||||
| 
 | ||||
| 				// encounter watch bytes received, does not expect to be broken
 | ||||
| 				if url == urlWatch || url == urlWatchWithGoaway { | ||||
| 					ch := make(chan int) | ||||
| 					watchChs = append(watchChs, ch) | ||||
| 					go func() { | ||||
| 						count := 0 | ||||
| 						for { | ||||
| 							buffer := make([]byte, 1) | ||||
| 							n, err := resp.Body.Read(buffer) | ||||
| 							if err != nil { | ||||
| 								// urlWatch will receive io.EOF,
 | ||||
| 								// urlWatchWithGoaway will receive http2.GoAwayError
 | ||||
| 								if err != io.EOF { | ||||
| 									if _, ok := err.(http2.GoAwayError); !ok { | ||||
| 										t.Errorf("watch received not EOF err: %v", err) | ||||
| 									} | ||||
| 								} | ||||
| 								ch <- count | ||||
| 								return | ||||
| 							} | ||||
| 							count += n | ||||
| 						} | ||||
| 					}() | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			// check TCP connection count
 | ||||
| 			if tc.expectConnections != len(localAddr) { | ||||
| 				t.Fatalf("expect TCP connection: %d, actual: %d", tc.expectConnections, len(localAddr)) | ||||
| 			} | ||||
| 
 | ||||
| 			// check if watch request is broken by GOAWAY response
 | ||||
| 			watchTimeout := time.NewTimer(time.Second * 10) | ||||
| 			for _, watchCh := range watchChs { | ||||
| 				select { | ||||
| 				case n := <-watchCh: | ||||
| 					if n != watchExpectSendBytes { | ||||
| 						t.Fatalf("in-flight watch was broken by GOAWAY response, expect go bytes: %d, actual got: %d", watchExpectSendBytes, n) | ||||
| 					} | ||||
| 				case <-watchTimeout.C: | ||||
| 					t.Error("watch receive timeout") | ||||
| 				} | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestHTTP1Requests(t *testing.T) { | ||||
| 	s := httptest.NewUnstartedServer(WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.WriteHeader(http.StatusOK) | ||||
| 		w.Write([]byte("hello")) | ||||
| 		return | ||||
| 	}), 1)) | ||||
| 
 | ||||
| 	http2Options := &http2.Server{} | ||||
| 
 | ||||
| 	if err := http2.ConfigureServer(s.Config, http2Options); err != nil { | ||||
| 		t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	s.TLS = s.Config.TLSConfig | ||||
| 	s.StartTLS() | ||||
| 	defer s.Close() | ||||
| 
 | ||||
| 	tlsConfig := &tls.Config{ | ||||
| 		InsecureSkipVerify: true, | ||||
| 		NextProtos:         []string{"http/1.1"}, | ||||
| 	} | ||||
| 
 | ||||
| 	client := http.Client{ | ||||
| 		Transport: &http.Transport{ | ||||
| 			TLSClientConfig: tlsConfig, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := client.Get(s.URL) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to request the server, err: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if v := resp.Header.Get("Connection"); v != "" { | ||||
| 		t.Errorf("expect response HTTP header Connection to be empty, but got: %s", v) | ||||
| 	} | ||||
| } | ||||
|  | @ -38,6 +38,7 @@ type ServerRunOptions struct { | |||
| 	MaxRequestsInFlight         int | ||||
| 	MaxMutatingRequestsInFlight int | ||||
| 	RequestTimeout              time.Duration | ||||
| 	GoawayChance                float64 | ||||
| 	LivezGracePeriod            time.Duration | ||||
| 	MinRequestTimeout           int | ||||
| 	ShutdownDelayDuration       time.Duration | ||||
|  | @ -76,6 +77,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { | |||
| 	c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight | ||||
| 	c.LivezGracePeriod = s.LivezGracePeriod | ||||
| 	c.RequestTimeout = s.RequestTimeout | ||||
| 	c.GoawayChance = s.GoawayChance | ||||
| 	c.MinRequestTimeout = s.MinRequestTimeout | ||||
| 	c.ShutdownDelayDuration = s.ShutdownDelayDuration | ||||
| 	c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes | ||||
|  | @ -125,6 +127,10 @@ func (s *ServerRunOptions) Validate() []error { | |||
| 		errors = append(errors, fmt.Errorf("--request-timeout can not be negative value")) | ||||
| 	} | ||||
| 
 | ||||
| 	if s.GoawayChance < 0 || s.GoawayChance > 0.02 { | ||||
| 		errors = append(errors, fmt.Errorf("--goaway-chance can not be less than 0 or greater than 0.02")) | ||||
| 	} | ||||
| 
 | ||||
| 	if s.MinRequestTimeout < 0 { | ||||
| 		errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value")) | ||||
| 	} | ||||
|  | @ -182,6 +188,12 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { | |||
| 		"it out. This is the default request timeout for requests but may be overridden by flags such as "+ | ||||
| 		"--min-request-timeout for specific types of requests.") | ||||
| 
 | ||||
| 	fs.Float64Var(&s.GoawayChance, "goaway-chance", s.GoawayChance, ""+ | ||||
| 		"To prevent HTTP/2 clients from getting stuck on a single apiserver, randomly close a connection (GOAWAY). "+ | ||||
| 		"The client's other in-flight requests won't be affected, and the client will reconnect, likely landing on a different apiserver after going through the load balancer again. "+ | ||||
| 		"This argument sets the fraction of requests that will be sent a GOAWAY. Clusters with single apiservers, or which don't use a load balancer, should NOT enable this. "+ | ||||
| 		"Min is 0 (off), Max is .02 (1/50 requests); .001 (1/1000) is a recommended starting point.") | ||||
| 
 | ||||
| 	fs.DurationVar(&s.LivezGracePeriod, "livez-grace-period", s.LivezGracePeriod, ""+ | ||||
| 		"This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+ | ||||
| 		"and become live. From apiserver's start time to when this amount of time has elapsed, /livez will assume "+ | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue