feat: add Kubernetes-NodePort network plugin (#138)

Signed-off-by: ChrisLiu <chrisliu1995@163.com>
This commit is contained in:
ChrisLiu 2024-04-26 11:22:45 +08:00 committed by GitHub
parent 56d9071c01
commit d547f61323
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 442 additions and 0 deletions

View File

@ -0,0 +1,259 @@
/*
Copyright 2024 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetes
import (
"context"
gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1"
"github.com/openkruise/kruise-game/cloudprovider"
cperrors "github.com/openkruise/kruise-game/cloudprovider/errors"
"github.com/openkruise/kruise-game/cloudprovider/utils"
"github.com/openkruise/kruise-game/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)
const (
NodePortNetwork = "Kubernetes-NodePort"
PortProtocolsConfigName = "PortProtocols"
)
type NodePortPlugin struct {
}
func (n *NodePortPlugin) Name() string {
return NodePortNetwork
}
func (n *NodePortPlugin) Alias() string {
return ""
}
func (n *NodePortPlugin) Init(client client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error {
return nil
}
func (n *NodePortPlugin) OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
return pod, nil
}
func (n *NodePortPlugin) OnPodUpdated(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
networkManager := utils.NewNetworkManager(pod, client)
networkStatus, _ := networkManager.GetNetworkStatus()
networkConfig := networkManager.GetNetworkConfig()
npc, err := parseNodePortConfig(networkConfig)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error())
}
if networkStatus == nil {
pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{
CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady,
}, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
// get svc
svc := &corev1.Service{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
}, svc)
if err != nil {
if errors.IsNotFound(err) {
return pod, cperrors.ToPluginError(client.Create(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}
// update svc
if util.GetHash(npc) != svc.GetAnnotations()[ServiceHashKey] {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.InternalError, err.Error())
}
return pod, cperrors.ToPluginError(client.Update(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}
// allow not ready containers
if util.IsAllowNotReadyContainers(networkManager.GetNetworkConfig()) {
toUpDateSvc, err := utils.AllowNotReadyContainers(client, ctx, pod, svc, false)
if err != nil {
return pod, err
}
if toUpDateSvc {
err := client.Update(ctx, svc)
if err != nil {
return pod, cperrors.ToPluginError(err, cperrors.ApiCallError)
}
}
}
// network ready
node := &corev1.Node{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.Spec.NodeName,
}, node)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}
if pod.Status.PodIP == "" {
// Pod IP not exist, Network NotReady
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
for _, port := range svc.Spec.Ports {
instrIPort := port.TargetPort
if port.NodePort == 0 {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
instrEPort := intstr.FromInt(int(port.NodePort))
internalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: pod.Status.PodIP,
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrIPort,
Protocol: port.Protocol,
},
},
}
externalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: getAddress(node),
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrEPort,
Protocol: port.Protocol,
},
},
}
internalAddresses = append(internalAddresses, internalAddress)
externalAddresses = append(externalAddresses, externalAddress)
}
networkStatus.InternalAddresses = internalAddresses
networkStatus.ExternalAddresses = externalAddresses
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
func (n *NodePortPlugin) OnPodDeleted(client client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError {
return nil
}
func init() {
kubernetesProvider.registerPlugin(&NodePortPlugin{})
}
type nodePortConfig struct {
ports []int
protocols []corev1.Protocol
isFixed bool
}
func parseNodePortConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nodePortConfig, error) {
var ports []int
var protocols []corev1.Protocol
isFixed := false
for _, c := range conf {
switch c.Name {
case PortProtocolsConfigName:
ports, protocols = parsePortProtocols(c.Value)
case FixedKey:
var err error
isFixed, err = strconv.ParseBool(c.Value)
if err != nil {
return nil, err
}
}
}
return &nodePortConfig{
ports: ports,
protocols: protocols,
isFixed: isFixed,
}, nil
}
func parsePortProtocols(value string) ([]int, []corev1.Protocol) {
ports := make([]int, 0)
protocols := make([]corev1.Protocol, 0)
for _, pp := range strings.Split(value, ",") {
ppSlice := strings.Split(pp, "/")
port, err := strconv.Atoi(ppSlice[0])
if err != nil {
continue
}
ports = append(ports, port)
if len(ppSlice) != 2 {
protocols = append(protocols, corev1.ProtocolTCP)
} else {
protocols = append(protocols, corev1.Protocol(ppSlice[1]))
}
}
return ports, protocols
}
func consNodePortSvc(npc *nodePortConfig, pod *corev1.Pod, c client.Client, ctx context.Context) *corev1.Service {
svcPorts := make([]corev1.ServicePort, 0)
for i := 0; i < len(npc.ports); i++ {
svcPorts = append(svcPorts, corev1.ServicePort{
Name: strconv.Itoa(npc.ports[i]),
Port: int32(npc.ports[i]),
Protocol: npc.protocols[i],
TargetPort: intstr.FromInt(npc.ports[i]),
})
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
Annotations: map[string]string{
ServiceHashKey: util.GetHash(npc),
},
OwnerReferences: consOwnerReference(c, ctx, pod, npc.isFixed),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
SvcSelectorKey: pod.GetName(),
},
Ports: svcPorts,
},
}
return svc
}

View File

@ -0,0 +1,183 @@
package kubernetes
import (
gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1"
"github.com/openkruise/kruise-game/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
"reflect"
"testing"
)
func TestParseNPConfig(t *testing.T) {
tests := []struct {
conf []gamekruiseiov1alpha1.NetworkConfParams
podNetConfig *nodePortConfig
}{
{
conf: []gamekruiseiov1alpha1.NetworkConfParams{
{
Name: PortProtocolsConfigName,
Value: "80",
},
},
podNetConfig: &nodePortConfig{
ports: []int{80},
protocols: []corev1.Protocol{corev1.ProtocolTCP},
},
},
{
conf: []gamekruiseiov1alpha1.NetworkConfParams{
{
Name: PortProtocolsConfigName,
Value: "8021/UDP",
},
},
podNetConfig: &nodePortConfig{
ports: []int{8021},
protocols: []corev1.Protocol{corev1.ProtocolUDP},
},
},
}
for _, test := range tests {
podNetConfig, _ := parseNodePortConfig(test.conf)
if !reflect.DeepEqual(podNetConfig, test.podNetConfig) {
t.Errorf("expect podNetConfig: %v, but actual: %v", test.podNetConfig, podNetConfig)
}
}
}
func TestConsNPSvc(t *testing.T) {
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
Namespace: "ns",
UID: "bff0afd6-bb30-4641-8607-8329547324eb",
},
}
// case 0
npcCase0 := &nodePortConfig{
ports: []int{
80,
8080,
},
protocols: []corev1.Protocol{
corev1.ProtocolTCP,
corev1.ProtocolTCP,
},
isFixed: false,
}
svcCase0 := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
Namespace: "ns",
Annotations: map[string]string{
ServiceHashKey: util.GetHash(npcCase0),
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: "pod-3",
UID: "bff0afd6-bb30-4641-8607-8329547324eb",
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.BoolPtr(true),
},
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
SvcSelectorKey: "pod-3",
},
Ports: []corev1.ServicePort{
{
Name: "80",
Port: int32(80),
TargetPort: intstr.FromInt(80),
Protocol: corev1.ProtocolTCP,
},
{
Name: "8080",
Port: int32(8080),
TargetPort: intstr.FromInt(8080),
Protocol: corev1.ProtocolTCP,
},
},
},
}
// case 1
npcCase1 := &nodePortConfig{
ports: []int{
8021,
},
protocols: []corev1.Protocol{
corev1.ProtocolUDP,
},
isFixed: false,
}
svcCase1 := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
Namespace: "ns",
Annotations: map[string]string{
ServiceHashKey: util.GetHash(npcCase1),
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: "pod-3",
UID: "bff0afd6-bb30-4641-8607-8329547324eb",
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.BoolPtr(true),
},
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
SvcSelectorKey: "pod-3",
},
Ports: []corev1.ServicePort{
{
Name: "8021",
Port: int32(8021),
TargetPort: intstr.FromInt(8021),
Protocol: corev1.ProtocolUDP,
},
},
},
}
tests := []struct {
npc *nodePortConfig
svc *corev1.Service
}{
{
npc: npcCase0,
svc: svcCase0,
},
{
npc: npcCase1,
svc: svcCase1,
},
}
for i, test := range tests {
actual := consNodePortSvc(test.npc, pod, nil, nil)
if !reflect.DeepEqual(actual, test.svc) {
t.Errorf("case %d: expect service: %v , but actual: %v", i, test.svc, actual)
}
}
}