Merge pull request #909 from Garrybest/pr_estimator_debug
add object reference information when calling gRPC
This commit is contained in:
commit
a53f673599
|
@ -6,6 +6,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||||
"github.com/karmada-io/karmada/pkg/estimator/pb"
|
"github.com/karmada-io/karmada/pkg/estimator/pb"
|
||||||
|
@ -34,8 +36,8 @@ func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by calling karmada-scheduler-estimator.
|
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by calling karmada-scheduler-estimator.
|
||||||
func (se *SchedulerEstimator) MaxAvailableReplicas(clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error) {
|
func (se *SchedulerEstimator) MaxAvailableReplicas(parentCtx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error) {
|
||||||
return getClusterReplicasConcurrently(clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
|
return getClusterReplicasConcurrently(parentCtx, clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
|
||||||
return se.maxAvailableReplicas(ctx, cluster, replicaRequirements.DeepCopy())
|
return se.maxAvailableReplicas(ctx, cluster, replicaRequirements.DeepCopy())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -66,11 +68,17 @@ func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster
|
||||||
return res.MaxReplicas, nil
|
return res.MaxReplicas, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getClusterReplicasConcurrently(clusters []*clusterv1alpha1.Cluster, timeout time.Duration, getClusterReplicas getClusterReplicasFunc) ([]workv1alpha2.TargetCluster, error) {
|
func getClusterReplicasConcurrently(parentCtx context.Context, clusters []*clusterv1alpha1.Cluster,
|
||||||
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
|
timeout time.Duration, getClusterReplicas getClusterReplicasFunc) ([]workv1alpha2.TargetCluster, error) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
// add object information into gRPC metadata
|
||||||
|
if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
|
||||||
|
parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), u)
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(parentCtx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errChan := make(chan error, len(clusters))
|
errChan := make(chan error, len(clusters))
|
||||||
for i := range clusters {
|
for i := range clusters {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
|
@ -21,7 +23,7 @@ func NewGeneralEstimator() *GeneralEstimator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by cluster ResourceSummary.
|
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by cluster ResourceSummary.
|
||||||
func (ge *GeneralEstimator) MaxAvailableReplicas(clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error) {
|
func (ge *GeneralEstimator) MaxAvailableReplicas(ctx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error) {
|
||||||
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
|
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
|
||||||
for i, cluster := range clusters {
|
for i, cluster := range clusters {
|
||||||
maxReplicas := ge.maxAvailableReplicas(cluster, replicaRequirements)
|
maxReplicas := ge.maxAvailableReplicas(cluster, replicaRequirements)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||||
)
|
)
|
||||||
|
@ -16,7 +18,7 @@ var (
|
||||||
|
|
||||||
// ReplicaEstimator is an estimator which estimates the maximum replicas that can be applied to the target cluster.
|
// ReplicaEstimator is an estimator which estimates the maximum replicas that can be applied to the target cluster.
|
||||||
type ReplicaEstimator interface {
|
type ReplicaEstimator interface {
|
||||||
MaxAvailableReplicas(clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error)
|
MaxAvailableReplicas(ctx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetReplicaEstimators returns all replica estimators.
|
// GetReplicaEstimators returns all replica estimators.
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/kr/pretty"
|
"github.com/kr/pretty"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
infov1 "k8s.io/client-go/informers/core/v1"
|
infov1 "k8s.io/client-go/informers/core/v1"
|
||||||
|
@ -20,6 +21,7 @@ import (
|
||||||
"github.com/karmada-io/karmada/pkg/estimator/pb"
|
"github.com/karmada-io/karmada/pkg/estimator/pb"
|
||||||
nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
|
nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
|
||||||
"github.com/karmada-io/karmada/pkg/estimator/server/replica"
|
"github.com/karmada-io/karmada/pkg/estimator/server/replica"
|
||||||
|
"github.com/karmada-io/karmada/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -132,7 +134,16 @@ func (es *AccurateSchedulerEstimatorServer) Start(ctx context.Context) error {
|
||||||
// MaxAvailableReplicas is the implementation of gRPC interface. It will return the
|
// MaxAvailableReplicas is the implementation of gRPC interface. It will return the
|
||||||
// max available replicas that a cluster could accommodate based on its requirements.
|
// max available replicas that a cluster could accommodate based on its requirements.
|
||||||
func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Context, request *pb.MaxAvailableReplicasRequest) (response *pb.MaxAvailableReplicasResponse, rerr error) {
|
func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Context, request *pb.MaxAvailableReplicasRequest) (response *pb.MaxAvailableReplicasResponse, rerr error) {
|
||||||
defer traceMaxAvailableReplicas(time.Now(), request)(&response, &rerr)
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
klog.Warningf("No metadata from context.")
|
||||||
|
}
|
||||||
|
var object string
|
||||||
|
if m := md.Get(string(util.ContextKeyObject)); len(m) != 0 {
|
||||||
|
object = m[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
defer traceMaxAvailableReplicas(object, time.Now(), request)(&response, &rerr)
|
||||||
|
|
||||||
if request.Cluster != es.clusterName {
|
if request.Cluster != es.clusterName {
|
||||||
return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
|
return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
|
||||||
|
@ -188,13 +199,13 @@ func (es *AccurateSchedulerEstimatorServer) waitForCacheSync(stopCh <-chan struc
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func traceMaxAvailableReplicas(start time.Time, request *pb.MaxAvailableReplicasRequest) func(response **pb.MaxAvailableReplicasResponse, err *error) {
|
func traceMaxAvailableReplicas(object string, start time.Time, request *pb.MaxAvailableReplicasRequest) func(response **pb.MaxAvailableReplicasResponse, err *error) {
|
||||||
klog.V(4).Infof("Begin calculating cluster available replicas, request: %s", pretty.Sprint(*request))
|
klog.V(4).Infof("Begin calculating cluster available replicas of resource(%s), request: %s", object, pretty.Sprint(*request))
|
||||||
return func(response **pb.MaxAvailableReplicasResponse, err *error) {
|
return func(response **pb.MaxAvailableReplicasResponse, err *error) {
|
||||||
if *err != nil {
|
if *err != nil {
|
||||||
klog.Errorf("Failed to calculate cluster available replicas: %v", *err)
|
klog.Errorf("Failed to calculate cluster available replicas: %v", *err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
klog.Infof("Finish calculating cluster available replicas, max replicas: %d, time elapsed: %s", (*response).MaxReplicas, time.Since(start))
|
klog.Infof("Finish calculating cluster available replicas of resource(%s), max replicas: %d, time elapsed: %s", object, (*response).MaxReplicas, time.Since(start))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -346,8 +346,10 @@ func (g *genericScheduler) calAvailableReplicas(clusters []*clusterv1alpha1.Clus
|
||||||
|
|
||||||
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
|
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
|
||||||
estimators := estimatorclient.GetReplicaEstimators()
|
estimators := estimatorclient.GetReplicaEstimators()
|
||||||
|
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
|
||||||
|
fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
|
||||||
for _, estimator := range estimators {
|
for _, estimator := range estimators {
|
||||||
res, err := estimator.MaxAvailableReplicas(clusters, spec.ReplicaRequirements)
|
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Max cluster available replicas error: %v", err)
|
klog.Errorf("Max cluster available replicas error: %v", err)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -117,3 +117,11 @@ const (
|
||||||
// NamespaceKarmadaSystem is the karmada system namespace.
|
// NamespaceKarmadaSystem is the karmada system namespace.
|
||||||
NamespaceKarmadaSystem = "karmada-system"
|
NamespaceKarmadaSystem = "karmada-system"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ContextKey is the key of context.
|
||||||
|
type ContextKey string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ContextKeyObject is the context value key of a resource.
|
||||||
|
ContextKeyObject ContextKey = "object"
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue