/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package rke2
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
"github.com/rancher/cluster-api-provider-rke2/pkg/etcd"
|
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
rbacv1 "k8s.io/api/rbac/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
etcdfake "github.com/rancher/cluster-api-provider-rke2/pkg/etcd/fake"
|
|
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
|
|
"sigs.k8s.io/cluster-api/util/patch"
|
|
)
|
|
|
|
// This Ordered suite exercises the annotation-based "safe etcd member removal"
// handshake on a Workload:
//  1. RemoveEtcdMemberForMachine marks the machine's node with
//     EtcdNodeRemoveAnnotation ("true").
//  2. IsEtcdMemberSafelyRemovedForMachine reports true only once
//     EtcdNodeRemovedNodeNameAnnotation has been written back on the node
//     (by the node-side agent in a real cluster; simulated here).
// Ordered is required: the second It reuses the Workload and patch helper
// built in the first.
var _ = Describe("ETCD safe member removal", Ordered, func() {
	var (
		err         error
		ns          *corev1.Namespace
		node        *corev1.Node
		nodeName    = "test-node"
		machine     *clusterv1.Machine
		w           *Workload     // workload-cluster accessor under test
		patchHelper *patch.Helper // patch helper shared across the ordered specs
	)

	// BeforeAll creates a namespace, a control-plane node, and a Machine whose
	// NodeRef points at that node — the minimal object graph the removal
	// helpers expect to find.
	BeforeAll(func() {
		ns, err = testEnv.CreateNamespace(ctx, "ns")
		Expect(err).ToNot(HaveOccurred())
		node = &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: nodeName,
				// The control-plane role label marks this node as an etcd member.
				Labels: map[string]string{labelNodeRoleControlPlane: "true"},
			},
		}
		machine = &clusterv1.Machine{
			ObjectMeta: metav1.ObjectMeta{
				Name:      nodeName,
				Namespace: ns.Name,
			},
			Spec: clusterv1.MachineSpec{
				ClusterName: "foo",
				// Stub infra ref; required by Machine validation, never resolved here.
				InfrastructureRef: corev1.ObjectReference{
					Kind:       "Pod",
					APIVersion: "v1",
					Name:       "stub",
					Namespace:  ns.Name,
				},
			},
		}

		machineStatus := clusterv1.MachineStatus{
			NodeRef: &corev1.ObjectReference{
				Kind:       "Node",
				APIVersion: "v1",
				Name:       nodeName,
			},
		}
		Expect(testEnv.Create(ctx, node)).Should(Succeed())
		Expect(testEnv.Create(ctx, machine)).Should(Succeed())
		// Status is a subresource: it must be set after Create via a separate
		// status update, otherwise the NodeRef would be dropped.
		machine.Status = machineStatus
		Expect(testEnv.Status().Update(ctx, machine)).Should(Succeed())
	})
	AfterAll(func() {
		testEnv.Cleanup(ctx, node, machine, ns)
	})
	It("Should mark the node for etcd member removal", func() {
		patchHelper, err = patch.NewHelper(node, testEnv.GetClient())
		Expect(err).ShouldNot(HaveOccurred())
		// Hand the Workload a pre-populated node map and patch helper so it
		// does not need to discover the node itself.
		w = &Workload{
			Client:           testEnv.GetClient(),
			Nodes:            map[string]*corev1.Node{node.Name: node},
			nodePatchHelpers: map[string]*patch.Helper{node.Name: patchHelper},
		}
		Expect(w.RemoveEtcdMemberForMachine(ctx, machine)).Should(Succeed())

		// Re-read the node and verify the removal marker was persisted.
		Expect(testEnv.Get(ctx, client.ObjectKeyFromObject(node), node)).Should(Succeed())
		Expect(node.GetAnnotations()).ShouldNot(BeNil(), "Node should have annotations")
		value, found := node.GetAnnotations()[EtcdNodeRemoveAnnotation]
		Expect(found).Should(BeTrue(), "remove annotation must be present")
		Expect(value).Should(Equal("true"))
	})
	It("Should wait for removed node name annotation", func() {
		// Before the agent acknowledges, removal must not be reported as done.
		safelyRemoved, err := w.IsEtcdMemberSafelyRemovedForMachine(ctx, machine)
		Expect(err).ShouldNot(HaveOccurred())
		Expect(safelyRemoved).Should(BeFalse(), "etcd member not removed yet")

		//Explicitly test the value can contain anything, not necessarily the node name
		node.Annotations[EtcdNodeRemovedNodeNameAnnotation] = "foo"
		Expect(patchHelper.Patch(ctx, node)).Should(Succeed())

		safelyRemoved, err = w.IsEtcdMemberSafelyRemovedForMachine(ctx, machine)
		Expect(err).ShouldNot(HaveOccurred())
		Expect(safelyRemoved).Should(BeTrue(), "etcd member is removed")
	})
})
|
|
|
|
// TestForwardEtcdLeadership covers Workload.ForwardEtcdLeadership using the
// fakeClient and fakeEtcdClientGenerator doubles defined below. It verifies:
//   - error handling (nil machine/candidate, missing NodeRefs, node-list
//     failures, etcd client creation failures, member-list failures);
//   - the no-op path when the machine being drained is not the etcd leader;
//   - the actual leadership move, including its failure modes.
func TestForwardEtcdLeadership(t *testing.T) {
	t.Run("handles errors correctly", func(t *testing.T) {
		tests := []struct {
			name                string
			machine             *clusterv1.Machine
			leaderCandidate     *clusterv1.Machine
			etcdClientGenerator etcd.ClientFor
			k8sClient           client.Client
			expectErr           bool
		}{
			{
				// A nil machine is tolerated: nothing to forward from.
				name:      "does nothing if the machine is nil",
				machine:   nil,
				expectErr: false,
			},
			{
				// No NodeRef means the machine has no node (yet) — also a no-op.
				name: "does nothing if machine's NodeRef is nil",
				machine: defaultMachine(func(m *clusterv1.Machine) {
					m.Status.NodeRef = nil
				}),
				expectErr: false,
			},
			{
				name:            "returns an error if the leader candidate is nil",
				machine:         defaultMachine(),
				leaderCandidate: nil,
				expectErr:       true,
			},
			{
				name:    "returns an error if the leader candidate's noderef is nil",
				machine: defaultMachine(),
				leaderCandidate: defaultMachine(func(m *clusterv1.Machine) {
					m.Status.NodeRef = nil
				}),
				expectErr: true,
			},
			{
				name:                "returns an error if it can't retrieve the list of control plane nodes",
				machine:             defaultMachine(),
				leaderCandidate:     defaultMachine(),
				etcdClientGenerator: &fakeEtcdClientGenerator{},
				k8sClient:           &fakeClient{listErr: errors.New("failed to list nodes")},
				expectErr:           true,
			},
			{
				name:                "returns an error if it can't create an etcd client",
				machine:             defaultMachine(),
				leaderCandidate:     defaultMachine(),
				k8sClient:           &fakeClient{},
				etcdClientGenerator: &fakeEtcdClientGenerator{forLeaderErr: errors.New("no etcdClient")},
				expectErr:           true,
			},
			{
				name:            "returns error if it fails to get etcd members",
				machine:         defaultMachine(),
				leaderCandidate: defaultMachine(),
				k8sClient:       &fakeClient{},
				etcdClientGenerator: &fakeEtcdClientGenerator{
					forLeaderClient: &etcd.Client{
						// Every etcd call on this client fails with ErrorResponse.
						EtcdClient: &etcdfake.FakeEtcdClient{
							ErrorResponse: errors.New("cannot get etcd members"),
						},
					},
				},
				expectErr: true,
			},
		}
		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				g := NewWithT(t)
				w := &Workload{
					Client:              tt.k8sClient,
					etcdClientGenerator: tt.etcdClientGenerator,
				}
				err := w.ForwardEtcdLeadership(ctx, tt.machine, tt.leaderCandidate)
				if tt.expectErr {
					g.Expect(err).To(HaveOccurred())
					return
				}
				g.Expect(err).ToNot(HaveOccurred())
			})
		}
	})

	t.Run("does nothing if the machine is not the leader", func(t *testing.T) {
		g := NewWithT(t)
		// The machine's member has ID 101, but the reported leader is ID 555,
		// so no leadership move should be attempted.
		fakeEtcdClient := &etcdfake.FakeEtcdClient{
			MemberListResponse: &clientv3.MemberListResponse{
				Members: []*pb.Member{
					{Name: "machine-node", ID: uint64(101)},
				},
			},
			AlarmResponse: &clientv3.AlarmResponse{
				Alarms: []*pb.AlarmMember{},
			},
		}
		etcdClientGenerator := &fakeEtcdClientGenerator{
			forLeaderClient: &etcd.Client{
				EtcdClient: fakeEtcdClient,
				LeaderID:   555,
			},
		}

		w := &Workload{
			Client: &fakeClient{list: &corev1.NodeList{
				Items: []corev1.Node{nodeNamed("leader-node")},
			}},
			etcdClientGenerator: etcdClientGenerator,
		}
		err := w.ForwardEtcdLeadership(ctx, defaultMachine(), defaultMachine())
		g.Expect(err).ToNot(HaveOccurred())
		// MovedLeader stays 0: MoveLeader was never invoked on the fake.
		g.Expect(fakeEtcdClient.MovedLeader).To(BeEquivalentTo(0))
	})

	t.Run("move etcd leader", func(t *testing.T) {
		tests := []struct {
			name               string
			leaderCandidate    *clusterv1.Machine
			etcdMoveErr        error
			expectedMoveLeader uint64 // etcd member ID MoveLeader is expected to receive
			expectErr          bool
		}{
			{
				name: "it moves the etcd leadership to the leader candidate",
				leaderCandidate: defaultMachine(func(m *clusterv1.Machine) {
					m.Status.NodeRef.Name = "candidate-node"
				}),
				// 12345 is candidate-node's member ID in the member list below.
				expectedMoveLeader: 12345,
			},
			{
				name: "returns error if failed to move to the leader candidate",
				leaderCandidate: defaultMachine(func(m *clusterv1.Machine) {
					m.Status.NodeRef.Name = "candidate-node"
				}),
				etcdMoveErr: errors.New("move err"),
				expectErr:   true,
			},
			{
				// The candidate's node name does not match any etcd member.
				name: "returns error if the leader candidate doesn't exist in etcd",
				leaderCandidate: defaultMachine(func(m *clusterv1.Machine) {
					m.Status.NodeRef.Name = "some other node"
				}),
				expectErr: true,
			},
		}

		currentLeader := defaultMachine(func(m *clusterv1.Machine) {
			m.Status.NodeRef.Name = "current-leader"
		})
		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				g := NewWithT(t)
				fakeEtcdClient := &etcdfake.FakeEtcdClient{
					ErrorResponse: tt.etcdMoveErr,
					MemberListResponse: &clientv3.MemberListResponse{
						Members: []*pb.Member{
							{Name: currentLeader.Status.NodeRef.Name, ID: uint64(101)},
							{Name: "other-node", ID: uint64(1034)},
							{Name: "candidate-node", ID: uint64(12345)},
						},
					},
					AlarmResponse: &clientv3.AlarmResponse{
						Alarms: []*pb.AlarmMember{},
					},
				}

				etcdClientGenerator := &fakeEtcdClientGenerator{
					forLeaderClient: &etcd.Client{
						EtcdClient: fakeEtcdClient,
						// this etcdClient belongs to the machine-node
						LeaderID: 101,
					},
				}

				w := &Workload{
					etcdClientGenerator: etcdClientGenerator,
					Client: &fakeClient{list: &corev1.NodeList{
						Items: []corev1.Node{nodeNamed("leader-node"), nodeNamed("other-node"), nodeNamed("candidate-node")},
					}},
				}
				err := w.ForwardEtcdLeadership(ctx, currentLeader, tt.leaderCandidate)
				if tt.expectErr {
					g.Expect(err).To(HaveOccurred())
					return
				}
				g.Expect(err).ToNot(HaveOccurred())
				g.Expect(fakeEtcdClient.MovedLeader).To(BeEquivalentTo(tt.expectedMoveLeader))
			})
		}
	})
}
|
|
|
|
// fakeEtcdClientGenerator is a test double for etcd.ClientFor. Each field
// pre-programs the result of the corresponding generator method so tests can
// inject specific clients or failures.
type fakeEtcdClientGenerator struct {
	forNodesClient     *etcd.Client                         // returned by ForFirstAvailableNode
	forNodesClientFunc func([]string) (*etcd.Client, error) // optional hook; when set, overrides forNodesClient/forNodesErr
	forLeaderClient    *etcd.Client                         // returned by ForLeader
	forNodesErr        error                                // error returned by ForFirstAvailableNode
	forLeaderErr       error                                // error returned by ForLeader
}
|
|
|
|
func (c *fakeEtcdClientGenerator) ForFirstAvailableNode(_ context.Context, n []string) (*etcd.Client, error) {
|
|
if c.forNodesClientFunc != nil {
|
|
return c.forNodesClientFunc(n)
|
|
}
|
|
return c.forNodesClient, c.forNodesErr
|
|
}
|
|
|
|
// ForLeader returns the pre-programmed leader client and error, ignoring the
// node list entirely.
func (c *fakeEtcdClientGenerator) ForLeader(_ context.Context, _ []string) (*etcd.Client, error) {
	return c.forLeaderClient, c.forLeaderErr
}
|
|
|
|
// fakeClient is a test double for controller-runtime's client.Client. The
// embedded interface makes it satisfy client.Client; only the methods
// overridden below are usable — calling any other method hits the nil
// embedded value.
type fakeClient struct {
	client.Client
	// list is the canned object list served by List; one of
	// *clusterv1.MachineList, *corev1.NodeList, or *corev1.PodList.
	list interface{}

	createErr    error                  // forced error for Create
	get          map[string]interface{} // objects served by Get, keyed by ObjectKey.String()
	getCalled    bool                   // set to true when Get is invoked
	updateCalled bool                   // set to true when Update is invoked
	getErr       error                  // forced error for Get
	patchErr     error                  // forced error for Patch
	updateErr    error                  // forced error for Update
	listErr      error                  // forced error for List
}
|
|
|
|
func (f *fakeClient) Get(_ context.Context, key client.ObjectKey, obj client.Object, _ ...client.GetOption) error {
|
|
f.getCalled = true
|
|
if f.getErr != nil {
|
|
return f.getErr
|
|
}
|
|
item := f.get[key.String()]
|
|
switch l := item.(type) {
|
|
case *corev1.Pod:
|
|
l.DeepCopyInto(obj.(*corev1.Pod))
|
|
case *rbacv1.RoleBinding:
|
|
l.DeepCopyInto(obj.(*rbacv1.RoleBinding))
|
|
case *rbacv1.Role:
|
|
l.DeepCopyInto(obj.(*rbacv1.Role))
|
|
case *appsv1.DaemonSet:
|
|
l.DeepCopyInto(obj.(*appsv1.DaemonSet))
|
|
case *corev1.ConfigMap:
|
|
l.DeepCopyInto(obj.(*corev1.ConfigMap))
|
|
case *corev1.Secret:
|
|
l.DeepCopyInto(obj.(*corev1.Secret))
|
|
case nil:
|
|
return apierrors.NewNotFound(schema.GroupResource{}, key.Name)
|
|
default:
|
|
return fmt.Errorf("unknown type: %s", l)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeClient) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
|
|
if f.listErr != nil {
|
|
return f.listErr
|
|
}
|
|
switch l := f.list.(type) {
|
|
case *clusterv1.MachineList:
|
|
l.DeepCopyInto(list.(*clusterv1.MachineList))
|
|
case *corev1.NodeList:
|
|
l.DeepCopyInto(list.(*corev1.NodeList))
|
|
case *corev1.PodList:
|
|
l.DeepCopyInto(list.(*corev1.PodList))
|
|
default:
|
|
return fmt.Errorf("unknown type: %s", l)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeClient) Create(_ context.Context, _ client.Object, _ ...client.CreateOption) error {
|
|
if f.createErr != nil {
|
|
return f.createErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeClient) Patch(_ context.Context, _ client.Object, _ client.Patch, _ ...client.PatchOption) error {
|
|
if f.patchErr != nil {
|
|
return f.patchErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeClient) Update(_ context.Context, _ client.Object, _ ...client.UpdateOption) error {
|
|
f.updateCalled = true
|
|
if f.updateErr != nil {
|
|
return f.updateErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func defaultMachine(transforms ...func(m *clusterv1.Machine)) *clusterv1.Machine {
|
|
m := &clusterv1.Machine{
|
|
Status: clusterv1.MachineStatus{
|
|
NodeRef: &corev1.ObjectReference{
|
|
Name: "machine-node",
|
|
},
|
|
},
|
|
}
|
|
for _, t := range transforms {
|
|
t(m)
|
|
}
|
|
return m
|
|
}
|
|
|
|
func nodeNamed(name string) corev1.Node {
|
|
node := corev1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
},
|
|
}
|
|
return node
|
|
}
|