feat: searcher returns multiple scheduler clusters (#1175)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-03-22 20:45:36 +08:00
parent 7446cf45fa
commit 9666b89c9a
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
5 changed files with 314 additions and 103 deletions

View File

@ -410,9 +410,9 @@ func (s *Server) ListSchedulers(ctx context.Context, req *manager.ListSchedulers
return nil, status.Error(codes.Unknown, err.Error())
}
// Search optimal scheduler cluster
// Search optimal scheduler clusters
log.Infof("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo)
schedulerCluster, err := s.searcher.FindSchedulerCluster(ctx, schedulerClusters, req)
schedulerClusters, err := s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req)
if err != nil {
log.Errorf("can not matching scheduler cluster %v", err)
return nil, status.Error(codes.NotFound, "scheduler cluster not found")
@ -420,11 +420,10 @@ func (s *Server) ListSchedulers(ctx context.Context, req *manager.ListSchedulers
log.Infof("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters))
schedulers := []model.Scheduler{}
if err := s.db.WithContext(ctx).Find(&schedulers, &model.Scheduler{
State: model.SchedulerStateActive,
SchedulerClusterID: schedulerCluster.ID,
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
for _, schedulerCluster := range schedulerClusters {
for _, scheduler := range schedulerCluster.Schedulers {
schedulers = append(schedulers, scheduler)
}
}
for _, scheduler := range schedulers {

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"github.com/mitchellh/mapstructure"
@ -45,14 +46,17 @@ const (
)
const (
// SecurityDomain affinity weight
securityDomainAffinityWeight float64 = 0.4
// IDC affinity weight
idcAffinityWeight float64 = 0.5
idcAffinityWeight float64 = 0.3
// NetTopology affinity weight
netTopologyAffinityWeight = 0.3
netTopologyAffinityWeight = 0.2
// Location affinity weight
locationAffinityWeight = 0.2
locationAffinityWeight = 0.1
)
const (
@ -68,6 +72,7 @@ const (
maxElementLen = 5
)
// Scheduler cluster scopes
type Scopes struct {
IDC string `mapstructure:"idc"`
Location string `mapstructure:"location"`
@ -75,7 +80,8 @@ type Scopes struct {
}
// Searcher picks scheduler clusters for the host described by a
// ListSchedulersRequest out of a list of candidate scheduler clusters.
type Searcher interface {
FindSchedulerCluster(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) (model.SchedulerCluster, error)
// FindSchedulerClusters finds the scheduler clusters that best match the evaluation
FindSchedulerClusters(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) ([]model.SchedulerCluster, error)
}
type searcher struct{}
@ -91,83 +97,113 @@ func New(pluginDir string) Searcher {
return s
}
func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, error) {
// FindSchedulerClusters finds the scheduler clusters that best match the evaluation
func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) ([]model.SchedulerCluster, error) {
conditions := client.HostInfo
if len(conditions) <= 0 {
return model.SchedulerCluster{}, errors.New("empty conditions")
return nil, errors.New("empty conditions")
}
if len(schedulerClusters) <= 0 {
return model.SchedulerCluster{}, errors.New("empty scheduler clusters")
return nil, errors.New("empty scheduler clusters")
}
// If there are security domain conditions, match clusters of the same security domain.
// If the security domain condition does not exist, it will match all scheduler security domains.
// Then use clusters sets to score according to scopes.
clusters := FilterSchedulerClusters(conditions, schedulerClusters)
if len(clusters) == 0 {
return nil, fmt.Errorf("security domain %s does not match any scheduler cluster", conditions[ConditionSecurityDomain])
}
// Order the clusters from the highest evaluation score to the lowest.
// SliceStable keeps the incoming order for clusters whose scores are equal,
// so the result does not depend on the sort algorithm's internals.
sort.SliceStable(
	clusters,
	func(i, j int) bool {
		var si, sj Scopes
		if err := mapstructure.Decode(clusters[i].Scopes, &si); err != nil {
			logger.Errorf("cluster %s decode scopes failed: %v", clusters[i].Name, err)
			return false
		}

		if err := mapstructure.Decode(clusters[j].Scopes, &sj); err != nil {
			// Report the cluster whose scopes actually failed to decode (j, not i).
			logger.Errorf("cluster %s decode scopes failed: %v", clusters[j].Name, err)
			return false
		}

		return Evaluate(conditions, si, clusters[i].SecurityGroup.SecurityRules) > Evaluate(conditions, sj, clusters[j].SecurityGroup.SecurityRules)
	},
)
return clusters, nil
}
// FilterSchedulerClusters filters the scheduler clusters that the dfdaemon can use
func FilterSchedulerClusters(conditions map[string]string, schedulerClusters []model.SchedulerCluster) []model.SchedulerCluster {
var clusters []model.SchedulerCluster
securityDomain := conditions[ConditionSecurityDomain]
if securityDomain == "" {
logger.Infof("dfdaemon %s %s have empty security domain", client.HostName, client.Ip)
}
for _, schedulerCluster := range schedulerClusters {
if len(schedulerCluster.Schedulers) > 0 {
if securityDomain == "" {
clusters = append(clusters, schedulerCluster)
} else {
for _, securityRule := range schedulerCluster.SecurityGroup.SecurityRules {
if strings.Compare(securityRule.Domain, securityDomain) == 0 {
clusters = append(clusters, schedulerCluster)
}
}
}
}
}
switch len(clusters) {
case 0:
// If the security domain does not match, there is no cluster available
return model.SchedulerCluster{}, fmt.Errorf("security domain %s does not match", securityDomain)
case 1:
// If only one cluster matches the security domain, return the cluster directly
return clusters[0], nil
default:
// If there are multiple clusters matching the security domain,
// select the schuelder cluster with a higher score
var maxScore float64 = 0
result := clusters[0]
for _, cluster := range clusters {
var scopes Scopes
if err := mapstructure.Decode(cluster.Scopes, &scopes); err != nil {
logger.Infof("cluster %s decode scopes failed: %v", cluster.Name, err)
// Scopes parse failed to skip this evaluation
// There are no active schedulers in the scheduler cluster
if len(schedulerCluster.Schedulers) == 0 {
continue
}
score := Evaluate(conditions, scopes)
if score > maxScore {
maxScore = score
result = cluster
// Dfdaemon security_domain does not exist, matching all scheduler clusters
if securityDomain == "" {
clusters = append(clusters, schedulerCluster)
continue
}
// Scheduler cluster is default, matching all dfdaemons
if schedulerCluster.IsDefault {
clusters = append(clusters, schedulerCluster)
continue
}
// Scheduler cluster SecurityRules does not exist, matching all dfdaemons
if len(schedulerCluster.SecurityGroup.SecurityRules) == 0 {
clusters = append(clusters, schedulerCluster)
continue
}
// If security_domain exists for dfdaemon and
// scheduler cluster SecurityRules also exists,
// then security_domain and SecurityRules are equal to match.
for _, securityRule := range schedulerCluster.SecurityGroup.SecurityRules {
	if securityRule.Domain == securityDomain {
		clusters = append(clusters, schedulerCluster)
		// One matching rule is enough; stop here so the cluster is not
		// appended again for every further rule with the same domain.
		break
	}
}
return result, nil
}
return clusters
}
// Evaluate the degree of matching between scheduler cluster and dfdaemon
func Evaluate(conditions map[string]string, scopes Scopes) float64 {
return idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
// Evaluate returns the weighted sum of four affinity scores computed from the
// dfdaemon conditions, the scheduler cluster scopes and its security rules:
// security domain, IDC, location and net topology.
func Evaluate(conditions map[string]string, scopes Scopes, securityRules []model.SecurityRule) float64 {
// Each term is an affinity score multiplied by its package-level weight constant.
return securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], securityRules) +
idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) +
netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionNetTopology], scopes.NetTopology)
}
// calculateSecurityDomainAffinityScore scores the security-domain affinity
// (documented range 0.0~1.0, larger is better).
// It returns maxScore only when the dfdaemon supplies a security domain and
// the scheduler cluster has at least one security rule; in every other case
// it returns minScore. The rule contents themselves are not inspected here.
func calculateSecurityDomainAffinityScore(securityDomain string, securityRules []model.SecurityRule) float64 {
	if securityDomain != "" && len(securityRules) > 0 {
		return maxScore
	}

	return minScore
}
// calculateIDCAffinityScore 0.0~1.0 larger and better
func calculateIDCAffinityScore(dst, src string) float64 {
if dst == "" || src == "" {
return minScore
}
if strings.Compare(dst, src) == 0 {
if dst == src {
return maxScore
}
@ -176,7 +212,7 @@ func calculateIDCAffinityScore(dst, src string) float64 {
// it gets the max score of idc.
srcElements := strings.Split(src, "|")
for _, srcElement := range srcElements {
if strings.Compare(dst, srcElement) == 0 {
if dst == srcElement {
return maxScore
}
}
@ -190,7 +226,7 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
return minScore
}
if strings.Compare(dst, src) == 0 {
if dst == src {
return maxScore
}
@ -206,7 +242,7 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
}
for i := 0; i < elementLen; i++ {
if strings.Compare(dstElements[i], srcElements[i]) != 0 {
if dstElements[i] != srcElements[i] {
break
}
score++

View File

@ -32,13 +32,13 @@ func TestSchedulerCluster(t *testing.T) {
name string
schedulerClusters []model.SchedulerCluster
conditions map[string]string
expect func(t *testing.T, data model.SchedulerCluster, err error)
expect func(t *testing.T, data []model.SchedulerCluster, err error)
}{
{
name: "conditions is empty",
schedulerClusters: []model.SchedulerCluster{{Name: "foo"}},
conditions: map[string]string{},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.EqualError(err, "empty conditions")
},
@ -47,7 +47,7 @@ func TestSchedulerCluster(t *testing.T) {
name: "scheduler clusters is empty",
schedulerClusters: []model.SchedulerCluster{},
conditions: map[string]string{"location": "foo"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.EqualError(err, "empty scheduler clusters")
},
@ -76,20 +76,20 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.EqualError(err, "security domain domain-1 does not match")
assert.EqualError(err, "security domain domain-1 does not match any scheduler cluster")
},
},
{
name: "match according to security_domain condition",
name: "scheduler clusters have default cluster",
schedulerClusters: []model.SchedulerCluster{
{
Name: "foo",
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-1",
Domain: "domain-2",
},
},
},
@ -102,13 +102,109 @@ func TestSchedulerCluster(t *testing.T) {
},
{
Name: "bar",
Schedulers: []model.Scheduler{
{
HostName: "bar",
State: "active",
},
},
IsDefault: true,
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "bar")
assert.Equal(len(data), 1)
},
},
{
name: "scheduler cluster SecurityRules does not exist",
schedulerClusters: []model.SchedulerCluster{
{
Name: "foo",
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "foo",
State: "active",
},
},
},
{
Name: "bar",
Schedulers: []model.Scheduler{
{
HostName: "bar",
State: "active",
},
},
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data[0].Name, "bar")
assert.Equal(len(data), 1)
},
},
{
name: "match according to security_domain condition",
schedulerClusters: []model.SchedulerCluster{
{
Name: "foo",
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "foo",
State: "active",
},
},
},
{
Name: "bar",
Schedulers: []model.Scheduler{
{
HostName: "bar",
State: "active",
},
},
},
{
Name: "baz",
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-1",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "baz",
State: "active",
},
},
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data[0].Name, "baz")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
@ -137,10 +233,10 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"location": "location-1"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
},
},
{
@ -172,10 +268,11 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"idc": "idc-2"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "bar")
assert.NoError(err)
assert.Equal(data[0].Name, "bar")
assert.Equal(data[1].Name, "foo")
assert.Equal(len(data), 2)
},
},
{
@ -204,10 +301,11 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"net_topology": "net-topology-1"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
@ -240,10 +338,11 @@ func TestSchedulerCluster(t *testing.T) {
"location": "location-1",
"idc": "idc-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
@ -282,10 +381,11 @@ func TestSchedulerCluster(t *testing.T) {
"security_domain": "domain-1",
"location": "location-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
@ -324,10 +424,11 @@ func TestSchedulerCluster(t *testing.T) {
"security_domain": "domain-1",
"idc": "idc-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
@ -357,7 +458,7 @@ func TestSchedulerCluster(t *testing.T) {
Name: "bar",
Scopes: map[string]interface{}{
"idc": "idc-2",
"location": "location-1",
"location": "location-1|location-2",
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
@ -373,16 +474,37 @@ func TestSchedulerCluster(t *testing.T) {
},
},
},
{
Name: "baz",
Scopes: map[string]interface{}{
"idc": "idc-2",
"location": "location-1",
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "baz",
State: "active",
},
},
},
},
conditions: map[string]string{
"security_domain": "domain-1",
"idc": "idc-1|idc-2",
"location": "location-1|location-2",
},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.NoError(err)
assert.Equal(data[0].Name, "bar")
assert.Equal(data[1].Name, "foo")
assert.Equal(len(data), 2)
},
},
{
@ -428,16 +550,70 @@ func TestSchedulerCluster(t *testing.T) {
},
},
},
{
Name: "baz",
Scopes: map[string]interface{}{
"idc": "idc-1",
"location": "location-1|location-2",
"net_topology": "net_topology-1",
},
Schedulers: []model.Scheduler{
{
HostName: "baz",
State: "active",
},
},
},
{
Name: "bax",
Scopes: map[string]interface{}{
"idc": "idc-1",
"location": "location-2",
"net_topology": "net_topology-1|net_topology-2",
},
Schedulers: []model.Scheduler{
{
HostName: "bax",
State: "active",
},
},
IsDefault: true,
},
{
Name: "bac",
Scopes: map[string]interface{}{
"idc": "idc-1",
"location": "location-2",
"net_topology": "net_topology-1|net_topology-2",
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "bac",
State: "active",
},
},
},
},
conditions: map[string]string{
"security_domain": "domain-1",
"idc": "idc-1|idc-2",
"location": "location-1|location-2",
"net_topology": "net_topology-1|net_topology-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "bar")
assert.NoError(err)
assert.Equal(data[0].Name, "bar")
assert.Equal(data[1].Name, "foo")
assert.Equal(data[2].Name, "baz")
assert.Equal(data[3].Name, "bax")
assert.Equal(len(data), 4)
},
},
}
@ -445,7 +621,7 @@ func TestSchedulerCluster(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
searcher := New(pluginDir)
clusters, ok := searcher.FindSchedulerCluster(context.Background(), tc.schedulerClusters, &manager.ListSchedulersRequest{
clusters, ok := searcher.FindSchedulerClusters(context.Background(), tc.schedulerClusters, &manager.ListSchedulersRequest{
HostName: "foo",
Ip: "127.0.0.1",
HostInfo: tc.conditions,

View File

@ -33,13 +33,13 @@ func main() {
os.Exit(1)
}
cluster, err := s.FindSchedulerCluster(context.Background(), []model.SchedulerCluster{}, &manager.ListSchedulersRequest{})
clusters, err := s.FindSchedulerClusters(context.Background(), []model.SchedulerCluster{}, &manager.ListSchedulersRequest{})
if err != nil {
fmt.Println("scheduler cluster not found")
os.Exit(1)
}
if cluster.Name != "foo" {
if clusters[0].Name != "foo" {
fmt.Println("scheduler cluster name wrong")
os.Exit(1)
}

View File

@ -25,8 +25,8 @@ import (
type searcher struct{}
func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, error) {
return model.SchedulerCluster{Name: "foo"}, nil
// FindSchedulerClusters is the plugin's stub implementation: it ignores all of
// its arguments and always returns a single scheduler cluster named "foo"
// with a nil error.
func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) ([]model.SchedulerCluster, error) {
	found := []model.SchedulerCluster{
		{Name: "foo"},
	}

	return found, nil
}
func DragonflyPluginInit(option map[string]string) (interface{}, map[string]string, error) {