Move etcdtest to testing
Kubernetes-commit: 28fa6de62933bfb5d5b7c36db60873a8c8d85dea
This commit is contained in:
parent
01c4fe2028
commit
c8fac58c15
|
@ -1,17 +0,0 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcdtest // import "k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcdtest
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
|
||||
func PathPrefix() string {
|
||||
pref := os.Getenv("ETCD_PREFIX")
|
||||
if pref == "" {
|
||||
pref = "registry"
|
||||
}
|
||||
return path.Join("/", pref)
|
||||
}
|
||||
|
||||
// Adds the ETCD_PREFIX to the provided key
|
||||
func AddPrefix(in string) string {
|
||||
return path.Join(PathPrefix(), in)
|
||||
}
|
|
@ -0,0 +1,303 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
|
||||
"context"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
// The following are lumped etcd2 test server params
|
||||
// TODO: Deprecate in a post 1.5 release
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client etcd.Client
|
||||
|
||||
CertificatesDir string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
CAFile string
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
hss []*httptest.Server
|
||||
|
||||
// The following are lumped etcd3 test server params
|
||||
v3Cluster *integration.ClusterV3
|
||||
V3Client *clientv3.Client
|
||||
}
|
||||
|
||||
// newLocalListener opens a port localhost using any port
|
||||
func newLocalListener(t *testing.T) net.Listener {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// newSecuredLocalListener opens a port localhost using any port
|
||||
// with SSL enable
|
||||
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
||||
var l net.Listener
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
tlscfg, err := tlsInfo.ServerConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected serverConfig error: %v", err)
|
||||
}
|
||||
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
tr, err := transport.NewTransport(tlsInfo, time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tr
|
||||
}
|
||||
|
||||
// configureTestCluster will set the params to start an etcd server
|
||||
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
|
||||
var err error
|
||||
m := &EtcdTestServer{}
|
||||
|
||||
pln := newLocalListener(t)
|
||||
m.PeerListeners = []net.Listener{pln}
|
||||
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Allow test launches to control where etcd data goes, for space or performance reasons
|
||||
baseDir := os.Getenv("TEST_ETCD_DIR")
|
||||
if len(baseDir) == 0 {
|
||||
baseDir = os.TempDir()
|
||||
}
|
||||
|
||||
if https {
|
||||
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
||||
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
||||
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
||||
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
cln := newLocalListener(t)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
m.AuthToken = "simple"
|
||||
m.Name = name
|
||||
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
|
||||
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.InitialClusterToken = "TestEtcd"
|
||||
m.NewCluster = true
|
||||
m.ForceNewCluster = false
|
||||
m.ElectionTicks = 10
|
||||
m.TickMs = uint(10)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// launch will attempt to start the etcd server
|
||||
func (m *EtcdTestServer) launch(t *testing.T) error {
|
||||
var err error
|
||||
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
|
||||
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
||||
}
|
||||
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
|
||||
m.s.Start()
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
|
||||
for _, ln := range m.PeerListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: m.raftHandler},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
for _, ln := range m.ClientListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForEtcd wait until etcd is propagated correctly
|
||||
func (m *EtcdTestServer) waitUntilUp() error {
|
||||
membersAPI := etcd.NewMembersAPI(m.Client)
|
||||
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
|
||||
members, err := membersAPI.List(context.TODO())
|
||||
if err != nil {
|
||||
klog.Errorf("Error when getting etcd cluster members")
|
||||
continue
|
||||
}
|
||||
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timeout on waiting for etcd cluster")
|
||||
}
|
||||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
if m.v3Cluster != nil {
|
||||
m.v3Cluster.Terminate(t)
|
||||
} else {
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||
// have overwritten transport counting opened connections (probably by
|
||||
// overwriting Dial function) and termination function waiting for all
|
||||
// connections to be closed and stopping accepting new ones.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
hs.Close()
|
||||
}
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m.CertificatesDir) > 0 {
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
server := configureTestCluster(t, "foo", true)
|
||||
err := server.launch(t)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start etcd server error=%v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||
}
|
||||
server.Client, err = etcd.New(cfg)
|
||||
if err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
|
||||
return nil
|
||||
}
|
||||
if err := server.waitUntilUp(); err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
|
||||
return nil
|
||||
}
|
||||
return server
|
||||
}
|
||||
|
||||
// NewEtcd3TestClientServer creates a new client and server for testing
|
||||
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
||||
server := &EtcdTestServer{
|
||||
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
|
||||
}
|
||||
server.V3Client = server.v3Cluster.RandClient()
|
||||
config := &storagebackend.Config{
|
||||
Type: "etcd3",
|
||||
Prefix: PathPrefix(),
|
||||
Transport: storagebackend.TransportConfig{
|
||||
ServerList: server.V3Client.Endpoints(),
|
||||
},
|
||||
Paging: true,
|
||||
}
|
||||
return server, config
|
||||
}
|
|
@ -17,288 +17,20 @@ limitations under the License.
|
|||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
|
||||
"context"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
// The following are lumped etcd2 test server params
|
||||
// TODO: Deprecate in a post 1.5 release
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client etcd.Client
|
||||
|
||||
CertificatesDir string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
CAFile string
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
hss []*httptest.Server
|
||||
|
||||
// The following are lumped etcd3 test server params
|
||||
v3Cluster *integration.ClusterV3
|
||||
V3Client *clientv3.Client
|
||||
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
|
||||
func PathPrefix() string {
|
||||
pref := os.Getenv("ETCD_PREFIX")
|
||||
if pref == "" {
|
||||
pref = "registry"
|
||||
}
|
||||
return path.Join("/", pref)
|
||||
}
|
||||
|
||||
// newLocalListener opens a port localhost using any port
|
||||
func newLocalListener(t *testing.T) net.Listener {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// newSecuredLocalListener opens a port localhost using any port
|
||||
// with SSL enable
|
||||
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
||||
var l net.Listener
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
tlscfg, err := tlsInfo.ServerConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected serverConfig error: %v", err)
|
||||
}
|
||||
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
tr, err := transport.NewTransport(tlsInfo, time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tr
|
||||
}
|
||||
|
||||
// configureTestCluster will set the params to start an etcd server
|
||||
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
|
||||
var err error
|
||||
m := &EtcdTestServer{}
|
||||
|
||||
pln := newLocalListener(t)
|
||||
m.PeerListeners = []net.Listener{pln}
|
||||
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Allow test launches to control where etcd data goes, for space or performance reasons
|
||||
baseDir := os.Getenv("TEST_ETCD_DIR")
|
||||
if len(baseDir) == 0 {
|
||||
baseDir = os.TempDir()
|
||||
}
|
||||
|
||||
if https {
|
||||
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
||||
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
||||
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
||||
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
cln := newLocalListener(t)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
m.AuthToken = "simple"
|
||||
m.Name = name
|
||||
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
|
||||
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.InitialClusterToken = "TestEtcd"
|
||||
m.NewCluster = true
|
||||
m.ForceNewCluster = false
|
||||
m.ElectionTicks = 10
|
||||
m.TickMs = uint(10)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// launch will attempt to start the etcd server
|
||||
func (m *EtcdTestServer) launch(t *testing.T) error {
|
||||
var err error
|
||||
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
|
||||
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
||||
}
|
||||
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
|
||||
m.s.Start()
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
|
||||
for _, ln := range m.PeerListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: m.raftHandler},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
for _, ln := range m.ClientListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForEtcd wait until etcd is propagated correctly
|
||||
func (m *EtcdTestServer) waitUntilUp() error {
|
||||
membersAPI := etcd.NewMembersAPI(m.Client)
|
||||
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
|
||||
members, err := membersAPI.List(context.TODO())
|
||||
if err != nil {
|
||||
klog.Errorf("Error when getting etcd cluster members")
|
||||
continue
|
||||
}
|
||||
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timeout on waiting for etcd cluster")
|
||||
}
|
||||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
if m.v3Cluster != nil {
|
||||
m.v3Cluster.Terminate(t)
|
||||
} else {
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||
// have overwritten transport counting opened connections (probably by
|
||||
// overwriting Dial function) and termination function waiting for all
|
||||
// connections to be closed and stopping accepting new ones.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
hs.Close()
|
||||
}
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m.CertificatesDir) > 0 {
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
server := configureTestCluster(t, "foo", true)
|
||||
err := server.launch(t)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start etcd server error=%v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||
}
|
||||
server.Client, err = etcd.New(cfg)
|
||||
if err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
|
||||
return nil
|
||||
}
|
||||
if err := server.waitUntilUp(); err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
|
||||
return nil
|
||||
}
|
||||
return server
|
||||
}
|
||||
|
||||
// NewEtcd3TestClientServer creates a new client and server for testing
|
||||
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
||||
server := &EtcdTestServer{
|
||||
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
|
||||
}
|
||||
server.V3Client = server.v3Cluster.RandClient()
|
||||
config := &storagebackend.Config{
|
||||
Type: "etcd3",
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
Transport: storagebackend.TransportConfig{
|
||||
ServerList: server.V3Client.Endpoints(),
|
||||
},
|
||||
Paging: true,
|
||||
}
|
||||
return server, config
|
||||
// Adds the ETCD_PREFIX to the provided key
|
||||
func AddPrefix(in string) string {
|
||||
return path.Join(PathPrefix(), in)
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import (
|
|||
"k8s.io/apiserver/pkg/storage"
|
||||
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
|
||||
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
|
||||
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
|
@ -148,7 +147,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
|
|||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -179,7 +178,7 @@ func TestGet(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetToList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -235,7 +234,7 @@ func TestGetToList(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -316,7 +315,7 @@ func TestList(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestInfiniteList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -370,7 +369,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
|
|||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
// Inject one list error to make sure we test the relist case.
|
||||
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
||||
defer server.Terminate(t)
|
||||
|
@ -447,7 +446,7 @@ func TestWatch(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWatcherTimeout(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -489,7 +488,7 @@ func TestWatcherTimeout(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFiltering(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -551,7 +550,7 @@ func TestFiltering(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStartingResourceVersion(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -599,7 +598,7 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEmptyWatchEventCache(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
|
||||
// add a few objects
|
||||
|
@ -663,7 +662,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRandomWatchDeliver(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -789,7 +788,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
|
|||
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
@ -851,7 +850,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
|||
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
|
Loading…
Reference in New Issue