Merge pull request #4452 from airbnb/es--grpc-expander-plugin

Add gRPC expander plugin
This commit is contained in:
Kubernetes Prow Robot 2022-02-21 06:54:14 -08:00 committed by GitHub
commit 0123869b7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1691 additions and 5 deletions

View File

@ -71,6 +71,10 @@ type AutoscalingOptions struct {
EstimatorName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
ExpanderNames string
// GRPCExpanderCert is the location of the cert passed to the gRPC server for TLS when using the gRPC expander
GRPCExpanderCert string
// GRPCExpanderURL is the url of the gRPC server when using the gRPC expander
GRPCExpanderURL string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down

View File

@ -105,8 +105,8 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","),
opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace)
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider,
opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL)
if err != nil {
return err
}

View File

@ -24,7 +24,7 @@ import (
var (
// AvailableExpanders is a list of available expander options
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName}
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName, GRPCExpanderName}
// RandomExpanderName selects a node group at random
RandomExpanderName = "random"
// MostPodsExpanderName selects a node group that fits the most pods
@ -36,6 +36,8 @@ var (
PriceBasedExpanderName = "price"
// PriorityBasedExpanderName selects a node group based on a user-configured priorities assigned to group names
PriorityBasedExpanderName = "priority"
// GRPCExpanderName uses the gRPC client expander to call to an external gRPC server to select a node group for scale up
GRPCExpanderName = "grpc"
)
// Option describes an option to expand the cluster.

View File

@ -20,6 +20,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin"
"k8s.io/autoscaler/cluster-autoscaler/expander/mostpods"
"k8s.io/autoscaler/cluster-autoscaler/expander/price"
"k8s.io/autoscaler/cluster-autoscaler/expander/priority"
@ -27,14 +28,14 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander/waste"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_client "k8s.io/client-go/kubernetes"
)
// ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
// take in whole opts and access stuff here
func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
configNamespace string) (expander.Strategy, errors.AutoscalerError) {
configNamespace string, GRPCExpanderCert string, GRPCExpanderURL string) (expander.Strategy, errors.AutoscalerError) {
var filters []expander.Filter
seenExpanders := map[string]struct{}{}
strategySeen := false
@ -67,6 +68,8 @@ func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprov
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
case expander.GRPCExpanderName:
filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}

View File

@ -0,0 +1,41 @@
# gRPC Expander for Cluster Autoscaler
## Introduction
This expander functions as a gRPC client, and will pass expansion options to an external gRPC server.
The external server will use this information to make a decision on which Node Group to expand, and return an option to expand.
## Motivation
This expander gives users very fine grained control over which option they'd like to expand.
The gRPC server must be implemented by the user, but the logic can be developed out of band with Cluster Autoscaler.
There are a wide variety of use cases here. Some examples are as follows:
* A tiered weighted random strategy can be implemented, instead of a static priority ladder offered by the priority expander.
* A strategy to encapsulate business logic specific to a user but not all users of Cluster Autoscaler
* A strategy to take into account the dynamic fluctuating prices of the spot instance market
## Configuration options
As using this expander requires communication with another service, users must specify a few options as CLI arguments.
```yaml
--grpcExpanderUrl
```
URL of the gRPC Expander server, for CA to communicate with.
```yaml
--grpcExpanderCert
```
Location of the volume mounted certificate of the gRPC server if it is configured to communicate over TLS
## gRPC Expander Server Setup
The gRPC server can be set up in many ways, but a simple example is described below.
An example of a barebones gRPC Exapnder Server can be found in the `example` directory under `fake_grpc_server.go` file. This is meant to be copied elsewhere and deployed as a separate
service. Note that the `protos/expander.pb.go` generated protobuf code will also need to be copied and used to serialize/deserizle the Options passed from CA.
Communication between Cluster Autoscaler and the gRPC Server will occur over native kube-proxy. To use this, note the Service and Namespace the gRPC server is deployed in.
Deploy the gRPC Expander Server as a separate app, listening on a specifc port number.
Start Cluster Autoscaler with the `--grpcExapnderURl=SERVICE_NAME.NAMESPACE_NAME.svc.cluster.local:PORT_NUMBER` flag, as well as `--grpcExpanderCert` pointed at the location of the volume mounted certificate of the gRPC server.
## Details
The gRPC client currently transforms nodeInfo objects passed into the expander to v1.Node objects to save rpc call throughput. As such, the gRPC server will not have access to daemonsets and static pods running on each node.

View File

@ -0,0 +1,104 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package example
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
)
// This code is meant to be used as starter code, deployed as a separate app, not in Cluster Autoscaler.
// This serves as the gRPC Expander Server counterpart to the client which lives in this repo
// main.go of said application should simply pass in paths to (optional)cert, (optional)private key, and port, and call Serve to start listening
// copy the protos/expander.pb.go to your other application's repo, so it has access to the protobuf definitions
// Serve should be called by the main() function in main.go of the Expander Server repo to start serving
func Serve(certPath string, keyPath string, port uint) {
var grpcServer *grpc.Server
// If credentials are passed in, use them
if certPath != "" && keyPath != "" {
log.Printf("Using certFile: %v and keyFile: %v", certPath, keyPath)
tlsCredentials, err := credentials.NewServerTLSFromFile(certPath, keyPath)
if err != nil {
log.Fatal("cannot load TLS credentials: ", err)
}
grpcServer = grpc.NewServer(grpc.Creds(tlsCredentials))
} else {
grpcServer = grpc.NewServer()
}
netListener := getNetListener(port)
expanderServerImpl := NewExpanderServerImpl()
protos.RegisterExpanderServer(grpcServer, expanderServerImpl)
// start the server
log.Println("Starting server on port ", port)
if err := grpcServer.Serve(netListener); err != nil {
log.Fatalf("failed to serve: %s", err)
}
}
func getNetListener(port uint) net.Listener {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
panic(fmt.Sprintf("failed to listen: %v", err))
}
return lis
}
// ExpanderServerImpl is an implementation of Expander Server from proto definition
type ExpanderServerImpl struct{}
// NewExpanderServerImpl is this Expander's implementation of the server
func NewExpanderServerImpl() *ExpanderServerImpl {
return &ExpanderServerImpl{}
}
// BestOptions method filters out the best options of all options passed from the gRPC Client in CA, according to the defined strategy.
func (ServerImpl *ExpanderServerImpl) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
opts := req.GetOptions()
log.Printf("Received BestOption Request with %v options", len(opts))
// This strategy simply chooses the Option with the longest NodeGroupID name, but can be replaced with any arbitrary logic
longest := 0
var choice *protos.Option
for _, opt := range opts {
log.Println(opt.NodeGroupId)
if len(opt.NodeGroupId) > longest {
choice = opt
}
}
log.Print("returned bestOptions with option: ", choice.NodeGroupId)
// Return just one option for now
return &protos.BestOptionsResponse{
Options: []*protos.Option{choice},
}, nil
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package example
import "flag"
func main() {
certPath := flag.String("cert-path", "", "Path to cert file for gRPC Expander Server")
keyPath := flag.String("key-path", "", "Path to private key for gRPC Expander Server")
port := flag.Uint("port", 7000, "Port number for server to listen on")
flag.Parse()
Serve(*certPath, *keyPath, *port)
}

View File

@ -0,0 +1,143 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpcplugin
import (
"context"
"log"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const gRPCTimeout = 5 * time.Second
type grpcclientstrategy struct {
grpcClient protos.ExpanderClient
}
// NewFilter returns an expansion filter that creates a gRPC client, and calls out to a gRPC server
func NewFilter(expanderCert string, expanderUrl string) expander.Filter {
client := createGRPCClient(expanderCert, expanderUrl)
if client == nil {
return &grpcclientstrategy{grpcClient: nil}
}
return &grpcclientstrategy{grpcClient: client}
}
func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderClient {
var dialOpt grpc.DialOption
if expanderCert == "" {
log.Fatalf("GRPC Expander Cert not specified, insecure connections not allowed")
return nil
}
creds, err := credentials.NewClientTLSFromFile(expanderCert, "")
if err != nil {
log.Fatalf("Failed to create TLS credentials %v", err)
return nil
}
dialOpt = grpc.WithTransportCredentials(creds)
klog.V(2).Infof("Dialing: %s with dialopt: %v", expanderUrl, dialOpt)
conn, err := grpc.Dial(expanderUrl, dialOpt)
if err != nil {
log.Fatalf("Fail to dial server: %v", err)
return nil
}
return protos.NewExpanderClient(conn)
}
func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
if g.grpcClient == nil {
klog.Errorf("Incorrect gRPC client config, filtering no options")
return expansionOptions
}
// Transform inputs to gRPC inputs
grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(expansionOptions)
grpcNodeMap := populateNodeInfoForGRPC(nodeInfo)
// call gRPC server to get BestOption
klog.V(2).Infof("GPRC call of best options to server with %v options", len(nodeGroupIDOptionMap))
ctx, cancel := context.WithTimeout(context.Background(), gRPCTimeout)
defer cancel()
bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
if err != nil {
klog.V(4).Info("GRPC call timed out, no options filtered")
return expansionOptions
}
if bestOptionsResponse == nil || bestOptionsResponse.Options == nil {
klog.V(4).Info("GRPC returned nil bestOptions, no options filtered")
return expansionOptions
}
// Transform back options slice
options := transformAndSanitizeOptionsFromGRPC(bestOptionsResponse.Options, nodeGroupIDOptionMap)
if options == nil {
klog.V(4).Info("Unable to sanitize GPRC returned bestOptions, no options filtered")
return expansionOptions
}
return options
}
// populateOptionsForGRPC creates a map of nodegroup ID and options, as well as a slice of Options objects for the gRPC call
func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Option, map[string]expander.Option) {
grpcOptionsSlice := []*protos.Option{}
nodeGroupIDOptionMap := make(map[string]expander.Option)
for _, option := range expansionOptions {
nodeGroupIDOptionMap[option.NodeGroup.Id()] = option
grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods))
}
return grpcOptionsSlice, nodeGroupIDOptionMap
}
// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc
func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node {
grpcNodeInfoMap := make(map[string]*v1.Node)
for nodeId, nodeInfo := range nodeInfos {
grpcNodeInfoMap[nodeId] = nodeInfo.Node()
}
return grpcNodeInfoMap
}
func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option {
var options []expander.Option
for _, option := range bestOptionsResponseOptions {
if option == nil {
klog.Errorf("GRPC server returned nil Option")
continue
}
if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
options = append(options, nodeGroupIDOptionMap[option.NodeGroupId])
} else {
klog.Errorf("GRPC server returned invalid nodeGroup ID: ", option.NodeGroupId)
continue
}
}
return options
}
func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
}

View File

@ -0,0 +1,276 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpcplugin
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/autoscaler/cluster-autoscaler/expander/mocks"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/expander"
_ "github.com/golang/mock/mockgen/model"
)
var (
nodes = []*v1.Node{
BuildTestNode("n1", 1000, 1000),
BuildTestNode("n2", 1000, 1000),
BuildTestNode("n3", 1000, 1000),
BuildTestNode("n4", 1000, 1000),
}
eoT2Micro = expander.Option{
Debug: "t2.micro",
NodeGroup: test.NewTestNodeGroup("my-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil),
}
eoT2Large = expander.Option{
Debug: "t2.large",
NodeGroup: test.NewTestNodeGroup("my-asg.t2.large", 10, 1, 1, true, false, "t2.large", nil, nil),
}
eoT3Large = expander.Option{
Debug: "t3.large",
NodeGroup: test.NewTestNodeGroup("my-asg.t3.large", 10, 1, 1, true, false, "t3.large", nil, nil),
}
eoM44XLarge = expander.Option{
Debug: "m4.4xlarge",
NodeGroup: test.NewTestNodeGroup("my-asg.m4.4xlarge", 10, 1, 1, true, false, "m4.4xlarge", nil, nil),
}
options = []expander.Option{eoT2Micro, eoT2Large, eoT3Large, eoM44XLarge}
grpcEoT2Micro = protos.Option{
NodeGroupId: eoT2Micro.NodeGroup.Id(),
NodeCount: int32(eoT2Micro.NodeCount),
Debug: eoT2Micro.Debug,
Pod: eoT2Micro.Pods,
}
grpcEoT2Large = protos.Option{
NodeGroupId: eoT2Large.NodeGroup.Id(),
NodeCount: int32(eoT2Large.NodeCount),
Debug: eoT2Large.Debug,
Pod: eoT2Large.Pods,
}
grpcEoT3Large = protos.Option{
NodeGroupId: eoT3Large.NodeGroup.Id(),
NodeCount: int32(eoT3Large.NodeCount),
Debug: eoT3Large.Debug,
Pod: eoT3Large.Pods,
}
grpcEoM44XLarge = protos.Option{
NodeGroupId: eoM44XLarge.NodeGroup.Id(),
NodeCount: int32(eoM44XLarge.NodeCount),
Debug: eoM44XLarge.Debug,
Pod: eoM44XLarge.Pods,
}
)
func TestPopulateOptionsForGrpc(t *testing.T) {
testCases := []struct {
desc string
opts []expander.Option
expectedOpts []*protos.Option
expectedMap map[string]expander.Option
}{
{
desc: "empty options",
opts: []expander.Option{},
expectedOpts: []*protos.Option{},
expectedMap: map[string]expander.Option{},
},
{
desc: "one option",
opts: []expander.Option{eoT2Micro},
expectedOpts: []*protos.Option{&grpcEoT2Micro},
expectedMap: map[string]expander.Option{eoT2Micro.NodeGroup.Id(): eoT2Micro},
},
{
desc: "many options",
opts: options,
expectedOpts: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
expectedMap: map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
},
},
}
for _, tc := range testCases {
grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts)
assert.Equal(t, tc.expectedOpts, grpcOptionsSlice)
assert.Equal(t, tc.expectedMap, nodeGroupIDOptionMap)
}
}
func makeFakeNodeInfos() map[string]*schedulerframework.NodeInfo {
nodeInfos := make(map[string]*schedulerframework.NodeInfo)
for i, opt := range options {
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(nodes[i])
nodeInfos[opt.NodeGroup.Id()] = nodeInfo
}
return nodeInfos
}
func TestPopulateNodeInfoForGRPC(t *testing.T) {
nodeInfos := makeFakeNodeInfos()
grpcNodeInfoMap := populateNodeInfoForGRPC(nodeInfos)
expectedGrpcNodeInfoMap := make(map[string]*v1.Node)
for i, opt := range options {
expectedGrpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i]
}
assert.Equal(t, expectedGrpcNodeInfoMap, grpcNodeInfoMap)
}
func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge}
nodeGroupIDOptionMap := map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
}
expectedOptions := []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge}
ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap)
assert.Equal(t, expectedOptions, ret)
}
func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge}
nodeGroupIDOptionMap := map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
}
ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap)
assert.Equal(t, []expander.Option{eoT2Micro, eoT3Large}, ret)
}
func TestBestOptionsValid(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockExpanderClient(ctrl)
g := &grpcclientstrategy{mockClient}
nodeInfos := makeFakeNodeInfos()
grpcNodeInfoMap := make(map[string]*v1.Node)
for i, opt := range options {
grpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i]
}
expectedBestOptionsReq := &protos.BestOptionsRequest{
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
NodeMap: grpcNodeInfoMap,
}
mockClient.EXPECT().BestOptions(
gomock.Any(), gomock.Eq(expectedBestOptionsReq),
).Return(&protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT3Large}}, nil)
resp := g.BestOptions(options, nodeInfos)
assert.Equal(t, resp, []expander.Option{eoT3Large})
}
// All test cases should error, and no options should be filtered
func TestBestOptionsErrors(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockExpanderClient(ctrl)
g := grpcclientstrategy{mockClient}
badProtosOption := protos.Option{
NodeGroupId: "badID",
NodeCount: int32(eoM44XLarge.NodeCount),
Debug: eoM44XLarge.Debug,
Pod: eoM44XLarge.Pods,
}
testCases := []struct {
desc string
client grpcclientstrategy
nodeInfo map[string]*schedulerframework.NodeInfo
mockResponse protos.BestOptionsResponse
errResponse error
}{
{
desc: "Bad gRPC client config",
client: grpcclientstrategy{nil},
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{},
errResponse: nil,
},
{
desc: "gRPC error response",
client: g,
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{},
errResponse: errors.New("timeout error"),
},
{
desc: "bad bestOptions response",
client: g,
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{},
errResponse: nil,
},
{
desc: "bad bestOptions response, options nil",
client: g,
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{Options: nil},
errResponse: nil,
},
{
desc: "bad bestOptions response, options invalid - nil",
client: g,
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, nil, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}},
errResponse: nil,
},
{
desc: "bad bestOptions response, options invalid - nonExistent nodeID",
client: g,
nodeInfo: makeFakeNodeInfos(),
mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, &badProtosOption, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}},
errResponse: nil,
},
}
for _, tc := range testCases {
grpcNodeInfoMap := populateNodeInfoForGRPC(tc.nodeInfo)
mockClient.EXPECT().BestOptions(
gomock.Any(), gomock.Eq(
&protos.BestOptionsRequest{
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
NodeMap: grpcNodeInfoMap,
})).Return(&tc.mockResponse, tc.errResponse)
resp := g.BestOptions(options, tc.nodeInfo)
assert.Equal(t, resp, options)
}
}

View File

@ -0,0 +1,440 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protos
import (
context "context"
reflect "reflect"
sync "sync"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
v1 "k8s.io/api/core/v1"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type BestOptionsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"`
// key is node id from options
NodeMap map[string]*v1.Node `protobuf:"bytes,2,rep,name=nodeMap,proto3" json:"nodeMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *BestOptionsRequest) Reset() {
*x = BestOptionsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BestOptionsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BestOptionsRequest) ProtoMessage() {}
func (x *BestOptionsRequest) ProtoReflect() protoreflect.Message {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BestOptionsRequest.ProtoReflect.Descriptor instead.
func (*BestOptionsRequest) Descriptor() ([]byte, []int) {
return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{0}
}
func (x *BestOptionsRequest) GetOptions() []*Option {
if x != nil {
return x.Options
}
return nil
}
func (x *BestOptionsRequest) GetNodeMap() map[string]*v1.Node {
if x != nil {
return x.NodeMap
}
return nil
}
type BestOptionsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"`
}
func (x *BestOptionsResponse) Reset() {
*x = BestOptionsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BestOptionsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BestOptionsResponse) ProtoMessage() {}
func (x *BestOptionsResponse) ProtoReflect() protoreflect.Message {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BestOptionsResponse.ProtoReflect.Descriptor instead.
func (*BestOptionsResponse) Descriptor() ([]byte, []int) {
return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{1}
}
func (x *BestOptionsResponse) GetOptions() []*Option {
if x != nil {
return x.Options
}
return nil
}
type Option struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// only need the ID of node to uniquely identify the nodeGroup, used in the nodeInfo map.
NodeGroupId string `protobuf:"bytes,1,opt,name=nodeGroupId,proto3" json:"nodeGroupId,omitempty"`
NodeCount int32 `protobuf:"varint,2,opt,name=nodeCount,proto3" json:"nodeCount,omitempty"`
Debug string `protobuf:"bytes,3,opt,name=debug,proto3" json:"debug,omitempty"`
Pod []*v1.Pod `protobuf:"bytes,4,rep,name=pod,proto3" json:"pod,omitempty"`
}
func (x *Option) Reset() {
*x = Option{}
if protoimpl.UnsafeEnabled {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Option) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Option) ProtoMessage() {}
func (x *Option) ProtoReflect() protoreflect.Message {
mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Option.ProtoReflect.Descriptor instead.
func (*Option) Descriptor() ([]byte, []int) {
return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{2}
}
func (x *Option) GetNodeGroupId() string {
if x != nil {
return x.NodeGroupId
}
return ""
}
func (x *Option) GetNodeCount() int32 {
if x != nil {
return x.NodeCount
}
return 0
}
func (x *Option) GetDebug() string {
if x != nil {
return x.Debug
}
return ""
}
func (x *Option) GetPod() []*v1.Pod {
if x != nil {
return x.Pod
}
return nil
}
var File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto protoreflect.FileDescriptor
var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = []byte{
0x0a, 0x3c, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63,
0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67, 0x72,
0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f,
0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a,
0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x1a, 0x22, 0x6b, 0x38, 0x73, 0x2e,
0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x67,
0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdf,
0x01, 0x0a, 0x12, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75,
0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x18, 0x02,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69,
0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72,
0x79, 0x52, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x1a, 0x54, 0x0a, 0x0c, 0x4e, 0x6f,
0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6b, 0x38,
0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31,
0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x22, 0x43, 0x0a, 0x13, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70,
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x89, 0x01, 0x0a, 0x06, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70,
0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74,
0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x12, 0x29, 0x0a, 0x03, 0x70, 0x6f, 0x64, 0x18, 0x04, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69,
0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x64, 0x52, 0x03, 0x70, 0x6f,
0x64, 0x32, 0x5c, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x50, 0x0a,
0x0b, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x2e, 0x67,
0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67,
0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42,
0x2f, 0x5a, 0x2d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73,
0x63, 0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce sync.Once
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc
)
func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP() []byte {
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce.Do(func() {
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = protoimpl.X.CompressGZIP(file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData)
})
return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData
}
var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = []interface{}{
(*BestOptionsRequest)(nil), // 0: grpcplugin.BestOptionsRequest
(*BestOptionsResponse)(nil), // 1: grpcplugin.BestOptionsResponse
(*Option)(nil), // 2: grpcplugin.Option
nil, // 3: grpcplugin.BestOptionsRequest.NodeMapEntry
(*v1.Pod)(nil), // 4: k8s.io.api.core.v1.Pod
(*v1.Node)(nil), // 5: k8s.io.api.core.v1.Node
}
var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = []int32{
2, // 0: grpcplugin.BestOptionsRequest.options:type_name -> grpcplugin.Option
3, // 1: grpcplugin.BestOptionsRequest.nodeMap:type_name -> grpcplugin.BestOptionsRequest.NodeMapEntry
2, // 2: grpcplugin.BestOptionsResponse.options:type_name -> grpcplugin.Option
4, // 3: grpcplugin.Option.pod:type_name -> k8s.io.api.core.v1.Pod
5, // 4: grpcplugin.BestOptionsRequest.NodeMapEntry.value:type_name -> k8s.io.api.core.v1.Node
0, // 5: grpcplugin.Expander.BestOptions:input_type -> grpcplugin.BestOptionsRequest
1, // 6: grpcplugin.Expander.BestOptions:output_type -> grpcplugin.BestOptionsResponse
6, // [6:7] is the sub-list for method output_type
5, // [5:6] is the sub-list for method input_type
5, // [5:5] is the sub-list for extension type_name
5, // [5:5] is the sub-list for extension extendee
0, // [0:5] is the sub-list for field type_name
}
func init() { file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() }
func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() {
if File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BestOptionsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BestOptionsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Option); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes,
DependencyIndexes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs,
MessageInfos: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes,
}.Build()
File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto = out.File
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = nil
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = nil
file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// ExpanderClient is the client API for Expander service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ExpanderClient interface {
BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error)
}
type expanderClient struct {
cc grpc.ClientConnInterface
}
func NewExpanderClient(cc grpc.ClientConnInterface) ExpanderClient {
return &expanderClient{cc}
}
func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) {
out := new(BestOptionsResponse)
err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ExpanderServer is the server API for Expander service.
type ExpanderServer interface {
BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error)
}
// UnimplementedExpanderServer can be embedded to have forward compatible implementations.
type UnimplementedExpanderServer struct {
}
func (*UnimplementedExpanderServer) BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method BestOptions not implemented")
}
func RegisterExpanderServer(s *grpc.Server, srv ExpanderServer) {
s.RegisterService(&_Expander_serviceDesc, srv)
}
func _Expander_BestOptions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BestOptionsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ExpanderServer).BestOptions(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpcplugin.Expander/BestOptions",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ExpanderServer).BestOptions(ctx, req.(*BestOptionsRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Expander_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpcplugin.Expander",
HandlerType: (*ExpanderServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "BestOptions",
Handler: _Expander_BestOptions_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "cluster-autoscaler/expander/grpcplugin/protos/expander.proto",
}

View File

@ -0,0 +1,30 @@
syntax = "proto3";
package grpcplugin;
import "k8s.io/api/core/v1/generated.proto";
option go_package = "cluster-autoscaler/expander/grpcplugin/protos";
// Interface for Expander
service Expander {
rpc BestOptions (BestOptionsRequest)
returns (BestOptionsResponse) {}
}
message BestOptionsRequest {
repeated Option options = 1;
// key is node id from options
map<string, k8s.io.api.core.v1.Node> nodeMap = 2;
}
message BestOptionsResponse {
repeated Option options = 1;
}
message Option {
// only need the ID of node to uniquely identify the nodeGroup, used in the nodeInfo map.
string nodeGroupId = 1;
int32 nodeCount = 2;
string debug = 3;
repeated k8s.io.api.core.v1.Pod pod = 4;
}

View File

@ -0,0 +1,107 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
grpc "google.golang.org/grpc"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
)
// MockExpanderClient is a mock of ExpanderClient interface.
type MockExpanderClient struct {
ctrl *gomock.Controller
recorder *MockExpanderClientMockRecorder
}
// MockExpanderClientMockRecorder is the mock recorder for MockExpanderClient.
type MockExpanderClientMockRecorder struct {
mock *MockExpanderClient
}
// NewMockExpanderClient creates a new mock instance.
func NewMockExpanderClient(ctrl *gomock.Controller) *MockExpanderClient {
mock := &MockExpanderClient{ctrl: ctrl}
mock.recorder = &MockExpanderClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockExpanderClient) EXPECT() *MockExpanderClientMockRecorder {
return m.recorder
}
// BestOptions mocks base method.
func (m *MockExpanderClient) BestOptions(ctx context.Context, in *protos.BestOptionsRequest, opts ...grpc.CallOption) (*protos.BestOptionsResponse, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "BestOptions", varargs...)
ret0, _ := ret[0].(*protos.BestOptionsResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// BestOptions indicates an expected call of BestOptions.
func (mr *MockExpanderClientMockRecorder) BestOptions(ctx, in interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderClient)(nil).BestOptions), varargs...)
}
// MockExpanderServer is a mock of ExpanderServer interface.
type MockExpanderServer struct {
ctrl *gomock.Controller
recorder *MockExpanderServerMockRecorder
}
// MockExpanderServerMockRecorder is the mock recorder for MockExpanderServer.
type MockExpanderServerMockRecorder struct {
mock *MockExpanderServer
}
// NewMockExpanderServer creates a new mock instance.
func NewMockExpanderServer(ctrl *gomock.Controller) *MockExpanderServer {
mock := &MockExpanderServer{ctrl: ctrl}
mock.recorder = &MockExpanderServerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockExpanderServer) EXPECT() *MockExpanderServerMockRecorder {
return m.recorder
}
// BestOptions mocks base method.
func (m *MockExpanderServer) BestOptions(arg0 context.Context, arg1 *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BestOptions", arg0, arg1)
ret0, _ := ret[0].(*protos.BestOptionsResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// BestOptions indicates an expected call of BestOptions.
func (mr *MockExpanderServerMockRecorder) BestOptions(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderServer)(nil).BestOptions), arg0, arg1)
}

View File

@ -26,6 +26,8 @@ require (
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
google.golang.org/api v0.46.0
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
gopkg.in/gcfg.v1 v1.2.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.24.0-alpha.2

View File

@ -155,6 +155,9 @@ var (
expanderFlag = flag.String("expander", expander.RandomExpanderName, "Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]. Specifying multiple values separated by commas will call the expanders in succession until there is only one option remaining. Ties still existing after this process are broken randomly.")
grpcExpanderCert = flag.String("grpc-expander-cert", "", "Path to cert used by gRPC server over TLS")
grpcExpanderURL = flag.String("grpc-expander-url", "", "URL to reach gRPC expander server.")
ignoreDaemonSetsUtilization = flag.Bool("ignore-daemonsets-utilization", false,
"Should CA ignore DaemonSet pods when calculating resource utilization for scaling down")
ignoreMirrorPodsUtilization = flag.Bool("ignore-mirror-pods-utilization", false,
@ -221,6 +224,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ScaleUpFromZero: *scaleUpFromZero,
EstimatorName: *estimatorFlag,
ExpanderNames: *expanderFlag,
GRPCExpanderCert: *grpcExpanderCert,
GRPCExpanderURL: *grpcExpanderURL,
IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization,
IgnoreMirrorPodsUtilization: *ignoreMirrorPodsUtilization,
MaxBulkSoftTaintCount: *maxBulkSoftTaintCount,

View File

@ -0,0 +1,495 @@
// Copyright 2012 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package model contains the data model necessary for generating mock implementations.
package model
import (
"encoding/gob"
"fmt"
"io"
"reflect"
"strings"
)
// pkgPath is the importable path for package model
const pkgPath = "github.com/golang/mock/mockgen/model"
// Package is a Go package. It may be a subset.
type Package struct {
Name string
PkgPath string
Interfaces []*Interface
DotImports []string
}
// Print writes the package name and its exported interfaces.
func (pkg *Package) Print(w io.Writer) {
_, _ = fmt.Fprintf(w, "package %s\n", pkg.Name)
for _, intf := range pkg.Interfaces {
intf.Print(w)
}
}
// Imports returns the imports needed by the Package as a set of import paths.
func (pkg *Package) Imports() map[string]bool {
im := make(map[string]bool)
for _, intf := range pkg.Interfaces {
intf.addImports(im)
}
return im
}
// Interface is a Go interface.
type Interface struct {
Name string
Methods []*Method
}
// Print writes the interface name and its methods.
func (intf *Interface) Print(w io.Writer) {
_, _ = fmt.Fprintf(w, "interface %s\n", intf.Name)
for _, m := range intf.Methods {
m.Print(w)
}
}
func (intf *Interface) addImports(im map[string]bool) {
for _, m := range intf.Methods {
m.addImports(im)
}
}
// AddMethod adds a new method, de-duplicating by method name.
func (intf *Interface) AddMethod(m *Method) {
for _, me := range intf.Methods {
if me.Name == m.Name {
return
}
}
intf.Methods = append(intf.Methods, m)
}
// Method is a single method of an interface.
type Method struct {
Name string
In, Out []*Parameter
Variadic *Parameter // may be nil
}
// Print writes the method name and its signature.
func (m *Method) Print(w io.Writer) {
_, _ = fmt.Fprintf(w, " - method %s\n", m.Name)
if len(m.In) > 0 {
_, _ = fmt.Fprintf(w, " in:\n")
for _, p := range m.In {
p.Print(w)
}
}
if m.Variadic != nil {
_, _ = fmt.Fprintf(w, " ...:\n")
m.Variadic.Print(w)
}
if len(m.Out) > 0 {
_, _ = fmt.Fprintf(w, " out:\n")
for _, p := range m.Out {
p.Print(w)
}
}
}
func (m *Method) addImports(im map[string]bool) {
for _, p := range m.In {
p.Type.addImports(im)
}
if m.Variadic != nil {
m.Variadic.Type.addImports(im)
}
for _, p := range m.Out {
p.Type.addImports(im)
}
}
// Parameter is an argument or return parameter of a method.
type Parameter struct {
Name string // may be empty
Type Type
}
// Print writes a method parameter.
func (p *Parameter) Print(w io.Writer) {
n := p.Name
if n == "" {
n = `""`
}
_, _ = fmt.Fprintf(w, " - %v: %v\n", n, p.Type.String(nil, ""))
}
// Type is a Go type.
type Type interface {
String(pm map[string]string, pkgOverride string) string
addImports(im map[string]bool)
}
func init() {
gob.Register(&ArrayType{})
gob.Register(&ChanType{})
gob.Register(&FuncType{})
gob.Register(&MapType{})
gob.Register(&NamedType{})
gob.Register(&PointerType{})
// Call gob.RegisterName to make sure it has the consistent name registered
// for both gob decoder and encoder.
//
// For a non-pointer type, gob.Register will try to get package full path by
// calling rt.PkgPath() for a name to register. If your project has vendor
// directory, it is possible that PkgPath will get a path like this:
// ../../../vendor/github.com/golang/mock/mockgen/model
gob.RegisterName(pkgPath+".PredeclaredType", PredeclaredType(""))
}
// ArrayType is an array or slice type.
type ArrayType struct {
Len int // -1 for slices, >= 0 for arrays
Type Type
}
func (at *ArrayType) String(pm map[string]string, pkgOverride string) string {
s := "[]"
if at.Len > -1 {
s = fmt.Sprintf("[%d]", at.Len)
}
return s + at.Type.String(pm, pkgOverride)
}
func (at *ArrayType) addImports(im map[string]bool) { at.Type.addImports(im) }
// ChanType is a channel type.
type ChanType struct {
Dir ChanDir // 0, 1 or 2
Type Type
}
func (ct *ChanType) String(pm map[string]string, pkgOverride string) string {
s := ct.Type.String(pm, pkgOverride)
if ct.Dir == RecvDir {
return "<-chan " + s
}
if ct.Dir == SendDir {
return "chan<- " + s
}
return "chan " + s
}
func (ct *ChanType) addImports(im map[string]bool) { ct.Type.addImports(im) }
// ChanDir is a channel direction.
type ChanDir int
// Constants for channel directions.
const (
RecvDir ChanDir = 1
SendDir ChanDir = 2
)
// FuncType is a function type.
type FuncType struct {
In, Out []*Parameter
Variadic *Parameter // may be nil
}
func (ft *FuncType) String(pm map[string]string, pkgOverride string) string {
args := make([]string, len(ft.In))
for i, p := range ft.In {
args[i] = p.Type.String(pm, pkgOverride)
}
if ft.Variadic != nil {
args = append(args, "..."+ft.Variadic.Type.String(pm, pkgOverride))
}
rets := make([]string, len(ft.Out))
for i, p := range ft.Out {
rets[i] = p.Type.String(pm, pkgOverride)
}
retString := strings.Join(rets, ", ")
if nOut := len(ft.Out); nOut == 1 {
retString = " " + retString
} else if nOut > 1 {
retString = " (" + retString + ")"
}
return "func(" + strings.Join(args, ", ") + ")" + retString
}
func (ft *FuncType) addImports(im map[string]bool) {
for _, p := range ft.In {
p.Type.addImports(im)
}
if ft.Variadic != nil {
ft.Variadic.Type.addImports(im)
}
for _, p := range ft.Out {
p.Type.addImports(im)
}
}
// MapType is a map type.
type MapType struct {
Key, Value Type
}
func (mt *MapType) String(pm map[string]string, pkgOverride string) string {
return "map[" + mt.Key.String(pm, pkgOverride) + "]" + mt.Value.String(pm, pkgOverride)
}
func (mt *MapType) addImports(im map[string]bool) {
mt.Key.addImports(im)
mt.Value.addImports(im)
}
// NamedType is an exported type in a package.
type NamedType struct {
Package string // may be empty
Type string
}
func (nt *NamedType) String(pm map[string]string, pkgOverride string) string {
if pkgOverride == nt.Package {
return nt.Type
}
prefix := pm[nt.Package]
if prefix != "" {
return prefix + "." + nt.Type
}
return nt.Type
}
func (nt *NamedType) addImports(im map[string]bool) {
if nt.Package != "" {
im[nt.Package] = true
}
}
// PointerType is a pointer to another type.
type PointerType struct {
Type Type
}
func (pt *PointerType) String(pm map[string]string, pkgOverride string) string {
return "*" + pt.Type.String(pm, pkgOverride)
}
func (pt *PointerType) addImports(im map[string]bool) { pt.Type.addImports(im) }
// PredeclaredType is a predeclared type such as "int".
type PredeclaredType string
func (pt PredeclaredType) String(map[string]string, string) string { return string(pt) }
func (pt PredeclaredType) addImports(map[string]bool) {}
// The following code is intended to be called by the program generated by ../reflect.go.
// InterfaceFromInterfaceType returns a pointer to an interface for the
// given reflection interface type.
func InterfaceFromInterfaceType(it reflect.Type) (*Interface, error) {
if it.Kind() != reflect.Interface {
return nil, fmt.Errorf("%v is not an interface", it)
}
intf := &Interface{}
for i := 0; i < it.NumMethod(); i++ {
mt := it.Method(i)
// TODO: need to skip unexported methods? or just raise an error?
m := &Method{
Name: mt.Name,
}
var err error
m.In, m.Variadic, m.Out, err = funcArgsFromType(mt.Type)
if err != nil {
return nil, err
}
intf.AddMethod(m)
}
return intf, nil
}
// t's Kind must be a reflect.Func.
func funcArgsFromType(t reflect.Type) (in []*Parameter, variadic *Parameter, out []*Parameter, err error) {
nin := t.NumIn()
if t.IsVariadic() {
nin--
}
var p *Parameter
for i := 0; i < nin; i++ {
p, err = parameterFromType(t.In(i))
if err != nil {
return
}
in = append(in, p)
}
if t.IsVariadic() {
p, err = parameterFromType(t.In(nin).Elem())
if err != nil {
return
}
variadic = p
}
for i := 0; i < t.NumOut(); i++ {
p, err = parameterFromType(t.Out(i))
if err != nil {
return
}
out = append(out, p)
}
return
}
func parameterFromType(t reflect.Type) (*Parameter, error) {
tt, err := typeFromType(t)
if err != nil {
return nil, err
}
return &Parameter{Type: tt}, nil
}
var errorType = reflect.TypeOf((*error)(nil)).Elem()
var byteType = reflect.TypeOf(byte(0))
func typeFromType(t reflect.Type) (Type, error) {
// Hack workaround for https://golang.org/issue/3853.
// This explicit check should not be necessary.
if t == byteType {
return PredeclaredType("byte"), nil
}
if imp := t.PkgPath(); imp != "" {
return &NamedType{
Package: impPath(imp),
Type: t.Name(),
}, nil
}
// only unnamed or predeclared types after here
// Lots of types have element types. Let's do the parsing and error checking for all of them.
var elemType Type
switch t.Kind() {
case reflect.Array, reflect.Chan, reflect.Map, reflect.Ptr, reflect.Slice:
var err error
elemType, err = typeFromType(t.Elem())
if err != nil {
return nil, err
}
}
switch t.Kind() {
case reflect.Array:
return &ArrayType{
Len: t.Len(),
Type: elemType,
}, nil
case reflect.Bool, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr,
reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128, reflect.String:
return PredeclaredType(t.Kind().String()), nil
case reflect.Chan:
var dir ChanDir
switch t.ChanDir() {
case reflect.RecvDir:
dir = RecvDir
case reflect.SendDir:
dir = SendDir
}
return &ChanType{
Dir: dir,
Type: elemType,
}, nil
case reflect.Func:
in, variadic, out, err := funcArgsFromType(t)
if err != nil {
return nil, err
}
return &FuncType{
In: in,
Out: out,
Variadic: variadic,
}, nil
case reflect.Interface:
// Two special interfaces.
if t.NumMethod() == 0 {
return PredeclaredType("interface{}"), nil
}
if t == errorType {
return PredeclaredType("error"), nil
}
case reflect.Map:
kt, err := typeFromType(t.Key())
if err != nil {
return nil, err
}
return &MapType{
Key: kt,
Value: elemType,
}, nil
case reflect.Ptr:
return &PointerType{
Type: elemType,
}, nil
case reflect.Slice:
return &ArrayType{
Len: -1,
Type: elemType,
}, nil
case reflect.Struct:
if t.NumField() == 0 {
return PredeclaredType("struct{}"), nil
}
}
// TODO: Struct, UnsafePointer
return nil, fmt.Errorf("can't yet turn %v (%v) into a model.Type", t, t.Kind())
}
// impPath sanitizes the package path returned by `PkgPath` method of a reflect Type so that
// it is importable. PkgPath might return a path that includes "vendor". These paths do not
// compile, so we need to remove everything up to and including "/vendor/".
// See https://github.com/golang/go/issues/12019.
func impPath(imp string) string {
if strings.HasPrefix(imp, "vendor/") {
imp = "/" + imp
}
if i := strings.LastIndex(imp, "/vendor/"); i != -1 {
imp = imp[i+len("/vendor/"):]
}
return imp
}
// ErrorInterface represent built-in error interface.
var ErrorInterface = Interface{
Name: "error",
Methods: []*Method{
{
Name: "Error",
Out: []*Parameter{
{
Name: "",
Type: PredeclaredType("string"),
},
},
},
},
}

View File

@ -239,6 +239,7 @@ github.com/golang/groupcache/lru
# github.com/golang/mock v1.6.0
## explicit
github.com/golang/mock/gomock
github.com/golang/mock/mockgen/model
# github.com/golang/protobuf v1.5.2
github.com/golang/protobuf/descriptor
github.com/golang/protobuf/jsonpb
@ -741,6 +742,7 @@ google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.40.0
## explicit
google.golang.org/grpc
google.golang.org/grpc/attributes
google.golang.org/grpc/backoff
@ -789,6 +791,7 @@ google.golang.org/grpc/stats
google.golang.org/grpc/status
google.golang.org/grpc/tap
# google.golang.org/protobuf v1.27.1
## explicit
google.golang.org/protobuf/encoding/protojson
google.golang.org/protobuf/encoding/prototext
google.golang.org/protobuf/encoding/protowire

View File

@ -36,6 +36,7 @@ excluded_packages=(
'cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3'
'cluster-autoscaler/cloudprovider/ionoscloud/ionos-cloud-sdk-go'
'cluster-autoscaler/cloudprovider/hetzner/hcloud-go'
'cluster-autoscaler/expander/grpcplugin/protos'
)
FIND_PACKAGES='go list ./... '