From e80bcb7e66283967ad05845a2244409174e33eec Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Thu, 14 May 2020 20:11:00 +0200 Subject: [PATCH] Functions common to HA tests (#1259) * Functions common to HA tests * Fix format errors * More robust GetEndpoints function * Do not use listOptions in getting endpoints * re-use more existing code * Fix format * Rename GetLeader to WaitForNewLeader * Fix fmt * Simplify WaitForPodDeleted * Remove span from WaitForChangedEndpoints --- test/ha/ha.go | 45 +++++++++++++++++++++++++++++++++++++++ test/kube_checks.go | 52 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 test/ha/ha.go diff --git a/test/ha/ha.go b/test/ha/ha.go new file mode 100644 index 000000000..26326358f --- /dev/null +++ b/test/ha/ha.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ha + +import ( + "context" + "fmt" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/pkg/test" + "knative.dev/pkg/test/logging" +) + +// WaitForNewLeader waits until the holder of the given lease is different from the previousLeader. +func WaitForNewLeader(client *test.KubeClient, lease, namespace, previousLeader string) (string, error) { + span := logging.GetEmitableSpan(context.Background(), "WaitForNewLeader/"+lease) + defer span.End() + var leader string + err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + lease, err := client.Kube.CoordinationV1().Leases(namespace).Get(lease, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("error getting lease %s: %w", lease, err) + } + leader = strings.Split(*lease.Spec.HolderIdentity, "_")[0] + return leader != previousLeader, nil + }) + return leader, err +} diff --git a/test/kube_checks.go b/test/kube_checks.go index 35bcbd201..61a259ac9 100644 --- a/test/kube_checks.go +++ b/test/kube_checks.go @@ -25,8 +25,10 @@ import ( "strings" "time" + "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" k8styped "k8s.io/client-go/kubernetes/typed/core/v1" @@ -93,6 +95,19 @@ func WaitForPodState(client *KubeClient, inState func(p *corev1.Pod) (bool, erro }) } +// WaitForPodDeleted waits for the given pod to disappear from the given namespace. +func WaitForPodDeleted(client *KubeClient, name, namespace string) error { + if err := WaitForPodState(client, func(p *corev1.Pod) (bool, error) { + // Always return false. We're oly interested in the error which indicates pod deletion or timeout. + return false, nil + }, name, namespace); err != nil { + if !apierrs.IsNotFound(err) { + return err + } + } + return nil +} + // WaitForServiceHasAtLeastOneEndpoint polls the status of the specified Service // from client every interval until number of service endpoints = numOfEndpoints func WaitForServiceEndpoints(client *KubeClient, svcName string, svcNamespace string, numOfEndpoints int) error { @@ -121,6 +136,29 @@ func countEndpointsNum(e *corev1.Endpoints) int { return num } +// GetEndpointAddresses returns addresses of endpoints for the given service. +func GetEndpointAddresses(client *KubeClient, svcName, svcNamespace string) ([]string, error) { + endpoints, err := client.Kube.CoreV1().Endpoints(svcNamespace).Get(svcName, metav1.GetOptions{}) + if err != nil || countEndpointsNum(endpoints) == 0 { + return nil, fmt.Errorf("no endpoints or error: %w", err) + } + var hosts []string + for _, sub := range endpoints.Subsets { + for _, addr := range sub.Addresses { + hosts = append(hosts, addr.IP) + } + } + return hosts, nil +} + +// WaitForChangedEndpoints waits until the endpoints for the given service differ from origEndpoints. +func WaitForChangedEndpoints(client *KubeClient, svcName, svcNamespace string, origEndpoints []string) error { + return wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) { + newEndpoints, err := GetEndpointAddresses(client, svcName, svcNamespace) + return !cmp.Equal(origEndpoints, newEndpoints), err + }) +} + // GetConfigMap gets the configmaps for a given namespace func GetConfigMap(client *KubeClient, namespace string) k8styped.ConfigMapInterface { return client.Kube.CoreV1().ConfigMaps(namespace) @@ -176,3 +214,17 @@ func PodsRunning(podList *corev1.PodList) (bool, error) { func PodRunning(pod *corev1.Pod) bool { return pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded } + +// WaitForDeploymentScale waits until the given deployment has the expected scale. +func WaitForDeploymentScale(client *KubeClient, name, namespace string, scale int) error { + return WaitForDeploymentState( + client, + name, + func(d *appsv1.Deployment) (bool, error) { + return d.Status.ReadyReplicas == int32(scale), nil + }, + "DeploymentIsScaled", + namespace, + time.Minute, + ) +}