mirror of https://github.com/grpc/grpc-go.git
grpclb: move restartableListener to testutils (#4919)
This commit is contained in:
parent
d6aca733b3
commit
6d465fe912
|
|
@ -369,7 +369,7 @@ func startBackendsAndRemoteLoadBalancer(numberOfBackends int, customUserAgent st
|
|||
beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
|
||||
bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
beListeners = append(beListeners, newRestartableListener(beLis))
|
||||
beListeners = append(beListeners, testutils.NewRestartableListener(beLis))
|
||||
}
|
||||
backends := startBackends(beServerName, false, beListeners...)
|
||||
|
||||
|
|
@ -379,7 +379,7 @@ func startBackendsAndRemoteLoadBalancer(numberOfBackends int, customUserAgent st
|
|||
err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
|
||||
return
|
||||
}
|
||||
lbLis = newRestartableListener(lbLis)
|
||||
lbLis = testutils.NewRestartableListener(lbLis)
|
||||
lbCreds := &serverNameCheckCreds{
|
||||
sn: lbServerName,
|
||||
}
|
||||
|
|
@ -846,8 +846,8 @@ func (s) TestFallback(t *testing.T) {
|
|||
}
|
||||
|
||||
// Close backend and remote balancer connections, should use fallback.
|
||||
tss.beListeners[0].(*restartableListener).stopPreviousConns()
|
||||
tss.lbListener.(*restartableListener).stopPreviousConns()
|
||||
tss.beListeners[0].(*testutils.RestartableListener).Stop()
|
||||
tss.lbListener.(*testutils.RestartableListener).Stop()
|
||||
|
||||
var fallbackUsed bool
|
||||
for i := 0; i < 2000; i++ {
|
||||
|
|
@ -871,8 +871,8 @@ func (s) TestFallback(t *testing.T) {
|
|||
}
|
||||
|
||||
// Restart backend and remote balancer, should not use fallback backend.
|
||||
tss.beListeners[0].(*restartableListener).restart()
|
||||
tss.lbListener.(*restartableListener).restart()
|
||||
tss.beListeners[0].(*testutils.RestartableListener).Restart()
|
||||
tss.lbListener.(*testutils.RestartableListener).Restart()
|
||||
tss.ls.sls <- sl
|
||||
|
||||
var backendUsed2 bool
|
||||
|
|
|
|||
|
|
@ -1,86 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpclb
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type tempError struct{}
|
||||
|
||||
func (*tempError) Error() string {
|
||||
return "grpclb test temporary error"
|
||||
}
|
||||
func (*tempError) Temporary() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type restartableListener struct {
|
||||
net.Listener
|
||||
addr string
|
||||
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
conns []net.Conn
|
||||
}
|
||||
|
||||
func newRestartableListener(l net.Listener) *restartableListener {
|
||||
return &restartableListener{
|
||||
Listener: l,
|
||||
addr: l.Addr().String(),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *restartableListener) Accept() (net.Conn, error) {
|
||||
conn, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if l.closed {
|
||||
conn.Close()
|
||||
return nil, &tempError{}
|
||||
}
|
||||
l.conns = append(l.conns, conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (l *restartableListener) Close() error {
|
||||
return l.Listener.Close()
|
||||
}
|
||||
|
||||
func (l *restartableListener) stopPreviousConns() {
|
||||
l.mu.Lock()
|
||||
l.closed = true
|
||||
tmp := l.conns
|
||||
l.conns = nil
|
||||
l.mu.Unlock()
|
||||
for _, conn := range tmp {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (l *restartableListener) restart() {
|
||||
l.mu.Lock()
|
||||
l.closed = false
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package testutils
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type tempError struct{}
|
||||
|
||||
func (*tempError) Error() string {
|
||||
return "restartable listener temporary error"
|
||||
}
|
||||
func (*tempError) Temporary() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// RestartableListener wraps a net.Listener and supports stopping and restarting
|
||||
// the latter.
|
||||
type RestartableListener struct {
|
||||
lis net.Listener
|
||||
|
||||
mu sync.Mutex
|
||||
stopped bool
|
||||
conns []net.Conn
|
||||
}
|
||||
|
||||
// NewRestartableListener returns a new RestartableListener wrapping l.
|
||||
func NewRestartableListener(l net.Listener) *RestartableListener {
|
||||
return &RestartableListener{lis: l}
|
||||
}
|
||||
|
||||
// Accept waits for and returns the next connection to the listener.
|
||||
//
|
||||
// If the listener is currently not accepting new connections, because `Stop`
|
||||
// was called on it, the connection is immediately closed after accepting
|
||||
// without any bytes being sent on it.
|
||||
func (l *RestartableListener) Accept() (net.Conn, error) {
|
||||
conn, err := l.lis.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if l.stopped {
|
||||
conn.Close()
|
||||
return nil, &tempError{}
|
||||
}
|
||||
l.conns = append(l.conns, conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Close closes the listener.
|
||||
func (l *RestartableListener) Close() error {
|
||||
return l.lis.Close()
|
||||
}
|
||||
|
||||
// Addr returns the listener's network address.
|
||||
func (l *RestartableListener) Addr() net.Addr {
|
||||
return l.lis.Addr()
|
||||
}
|
||||
|
||||
// Stop closes existing connections on the listener and prevents new connections
|
||||
// from being accepted.
|
||||
func (l *RestartableListener) Stop() {
|
||||
l.mu.Lock()
|
||||
l.stopped = true
|
||||
tmp := l.conns
|
||||
l.conns = nil
|
||||
l.mu.Unlock()
|
||||
for _, conn := range tmp {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Restart gets a previously stopped listener to start accepting connections.
|
||||
func (l *RestartableListener) Restart() {
|
||||
l.mu.Lock()
|
||||
l.stopped = false
|
||||
l.mu.Unlock()
|
||||
}
|
||||
Loading…
Reference in New Issue