feat: optimize implementation of sync peers (#3677)

Signed-off-by: Gaius <gaius.qi@gmail.com>

parent d0e41b538c
commit aa78396155
@@ -31,19 +31,19 @@ jobs:
       include:
         - module: manager
           image: manager
-          image-tag: v2.1.60
+          image-tag: v2.1.65
           chart-name: manager
         - module: scheduler
           image: scheduler
-          image-tag: v2.1.60
+          image-tag: v2.1.65
           chart-name: scheduler
         - module: client
           image: client
-          image-tag: v0.1.115
+          image-tag: v0.1.119
           chart-name: client
         - module: seed-client
           image: client
-          image-tag: v0.1.115
+          image-tag: v0.1.119
           chart-name: seed-client

     steps:
@@ -4202,28 +4202,34 @@ const docTemplate = `{
             ],
             "properties": {
                 "args": {
+                    "description": "Args is the arguments of the job.",
                     "type": "object",
                     "additionalProperties": {}
                 },
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "scheduler_cluster_ids": {
+                    "description": "SchedulerClusterIDs is the scheduler cluster ids of the job.",
                     "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "seed_peer_cluster_ids": {
+                    "description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.",
                     "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "type": {
+                    "description": "Type is the type of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
@@ -4682,6 +4688,11 @@ const docTemplate = `{
                     "type": "integer",
                     "maximum": 1000,
                     "minimum": 10
+                },
+                "job_rate_limit": {
+                    "type": "integer",
+                    "maximum": 1000000,
+                    "minimum": 1
                 }
             }
         },
@@ -4881,9 +4892,11 @@ const docTemplate = `{
             "type": "object",
             "properties": {
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
@@ -4196,28 +4196,34 @@
             ],
             "properties": {
                 "args": {
+                    "description": "Args is the arguments of the job.",
                     "type": "object",
                     "additionalProperties": {}
                 },
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "scheduler_cluster_ids": {
+                    "description": "SchedulerClusterIDs is the scheduler cluster ids of the job.",
                     "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "seed_peer_cluster_ids": {
+                    "description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.",
                     "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "type": {
+                    "description": "Type is the type of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
@@ -4676,6 +4682,11 @@
                     "type": "integer",
                     "maximum": 1000,
                     "minimum": 10
+                },
+                "job_rate_limit": {
+                    "type": "integer",
+                    "maximum": 1000000,
+                    "minimum": 1
                 }
             }
         },
@@ -4875,9 +4886,11 @@
             "type": "object",
             "properties": {
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
@@ -461,20 +461,26 @@ definitions:
     properties:
       args:
         additionalProperties: {}
+        description: Args is the arguments of the job.
         type: object
       bio:
+        description: BIO is the description of the job.
        type: string
       scheduler_cluster_ids:
+        description: SchedulerClusterIDs is the scheduler cluster ids of the job.
         items:
           type: integer
         type: array
       seed_peer_cluster_ids:
+        description: SeedPeerClusterIDs is the seed peer cluster ids of the job.
         items:
           type: integer
         type: array
       type:
+        description: Type is the type of the job.
         type: string
       user_id:
+        description: UserID is the user id of the job.
         type: integer
     required:
     - type
@@ -792,6 +798,10 @@ definitions:
       maximum: 1000
       minimum: 10
       type: integer
+    job_rate_limit:
+      maximum: 1000000
+      minimum: 1
+      type: integer
     type: object
   d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes:
     properties:
@@ -925,8 +935,10 @@ definitions:
   d7y_io_dragonfly_v2_manager_types.UpdateJobRequest:
     properties:
       bio:
+        description: BIO is the description of the job.
         type: string
       user_id:
+        description: UserID is the user id of the job.
         type: integer
     type: object
   d7y_io_dragonfly_v2_manager_types.UpdateOauthRequest:
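For reference, a request body that matches the CreateJobRequest schema documented above might look like the sketch below. The "sync_peers" type string and the concrete field values are illustrative assumptions, not taken from this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical payload using the documented CreateJobRequest fields
	// (bio, type, user_id, scheduler_cluster_ids); values are examples only.
	payload := map[string]any{
		"type":                  "sync_peers",
		"bio":                   "resync peer metadata for cluster 1",
		"user_id":               1,
		"scheduler_cluster_ids": []uint{1},
	}

	b, err := json.MarshalIndent(payload, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```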
@@ -1 +1 @@
-Subproject commit ee21989120b16db4e8456e7d163f9ba06aad98c9
+Subproject commit 9fd770ffab34893da27ab40c6febe1125d6c077c
go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.23.0

 require (
-    d7y.io/api/v2 v2.0.175
+    d7y.io/api/v2 v2.0.177
     github.com/MysteriousPotato/go-lockable v1.0.0
     github.com/RichardKnop/machinery v1.10.8
     github.com/Showmax/go-fqdn v1.0.0
go.sum
@@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
 cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
-d7y.io/api/v2 v2.0.175 h1:yE1FeYnBEK/geHmDJbqXB0pUXtPBtqk9E7xijIVh0AA=
-d7y.io/api/v2 v2.0.175/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
+d7y.io/api/v2 v2.0.177 h1:iC+Jm4n7lKs3N1JIO25XOdtELbKSlis85LEoGbYlp98=
+d7y.io/api/v2 v2.0.177/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
@@ -339,7 +339,7 @@ type SyncPeersConfig struct {
     // Timeout is the timeout for syncing peers information from the single scheduler.
     Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`

-    // BatchSize is the batch size when operating gorm.
+    // BatchSize is the batch size when operating gorm database.
     BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
 }

@@ -641,6 +641,10 @@ func (cfg *Config) Validate() error {
         return errors.New("syncPeers requires parameter timeout")
     }

+    if cfg.Job.SyncPeers.BatchSize == 0 {
+        return errors.New("syncPeers requires parameter batchSize")
+    }
+
     if cfg.ObjectStorage.Enable {
         if cfg.ObjectStorage.Name == "" {
             return errors.New("objectStorage requires parameter name")
@@ -188,8 +188,9 @@ func TestConfig_Load(t *testing.T) {
             },
         },
         SyncPeers: SyncPeersConfig{
             Interval: 13 * time.Hour,
             Timeout:  2 * time.Minute,
+            BatchSize: 50,
         },
     },
     ObjectStorage: ObjectStorageConfig{
@@ -809,6 +810,21 @@ func TestConfig_Validate(t *testing.T) {
                 assert.EqualError(err, "syncPeers requires parameter timeout")
             },
         },
+        {
+            name:   "syncPeers requires parameter batchSize",
+            config: New(),
+            mock: func(cfg *Config) {
+                cfg.Auth.JWT = mockJWTConfig
+                cfg.Database.Type = DatabaseTypeMysql
+                cfg.Database.Mysql = mockMysqlConfig
+                cfg.Database.Redis = mockRedisConfig
+                cfg.Job.SyncPeers.BatchSize = 0
+            },
+            expect: func(t *testing.T, err error) {
+                assert := assert.New(t)
+                assert.EqualError(err, "syncPeers requires parameter batchSize")
+            },
+        },
         {
             name:   "objectStorage requires parameter name",
             config: New(),
@@ -108,7 +108,8 @@ const (
     // DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster.
     DefaultClusterJobRateLimit = 10

-    // DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler.
+    // DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler and
+    // operating on the database.
     DefaultJobSyncPeersBatchSize = 500
 )

@@ -79,6 +79,7 @@ job:
   syncPeers:
     interval: 13h
     timeout: 2m
+    batchSize: 50

 objectStorage:
   enable: true
@@ -70,13 +70,15 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
             return
         }

-        job, err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json)
-        if err != nil {
+        // CreateSyncPeersJob is a sync operation, so don't need to return the job id,
+        // and not record the job information in the database. If return success, need to
+        // query the peers table to get the latest data.
+        if err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json); err != nil {
             ctx.Error(err) // nolint: errcheck
             return
         }

-        ctx.JSON(http.StatusOK, job)
+        ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
     case job.GetTaskJob:
         var json types.CreateGetTaskJobRequest
         if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
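With this handler change, triggering a sync peers job becomes a blocking call that returns plain "OK" rather than a persisted job record. A rough sketch of calling it over HTTP follows; the endpoint path, port, and the "sync_peers" type string are assumptions for illustration, not taken from this diff.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumed manager address, job endpoint, and job type string.
	url := "http://manager.example.com:8080/api/v1/jobs"
	body := bytes.NewBufferString(`{"type": "sync_peers"}`)

	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The request only returns after the sync finishes (or fails), so the
	// peers table can be queried immediately afterwards for fresh data.
	fmt.Println(resp.Status)
}
```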
@@ -30,7 +30,7 @@ import (
 )

 // DefaultTaskPollingInterval is the default interval for polling task.
-const DefaultTaskPollingInterval = 5 * time.Second
+const DefaultTaskPollingInterval = 10 * time.Second

 // tracer is a global tracer for job.
 var tracer = otel.Tracer("manager")
@@ -13,7 +13,7 @@ import (
     context "context"
     reflect "reflect"

-    job "d7y.io/dragonfly/v2/manager/job"
+    models "d7y.io/dragonfly/v2/manager/models"
     gomock "go.uber.org/mock/gomock"
 )

@@ -41,18 +41,18 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder {
     return m.recorder
 }

-// Run mocks base method.
-func (m *MockSyncPeers) Run(arg0 context.Context, arg1 job.SyncPeersArgs) error {
+// CreateSyncPeers mocks base method.
+func (m *MockSyncPeers) CreateSyncPeers(arg0 context.Context, arg1 []models.Scheduler) error {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "Run", arg0, arg1)
+    ret := m.ctrl.Call(m, "CreateSyncPeers", arg0, arg1)
     ret0, _ := ret[0].(error)
     return ret0
 }

-// Run indicates an expected call of Run.
-func (mr *MockSyncPeersMockRecorder) Run(arg0, arg1 any) *gomock.Call {
+// CreateSyncPeers indicates an expected call of CreateSyncPeers.
+func (mr *MockSyncPeersMockRecorder) CreateSyncPeers(arg0, arg1 any) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0, arg1)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSyncPeers", reflect.TypeOf((*MockSyncPeers)(nil).CreateSyncPeers), arg0, arg1)
 }

 // Serve mocks base method.
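A minimal sketch of how the regenerated mock could be exercised in a test. Only the method names come from the diff above; the import path of the generated mocks package is an assumption here.

```go
package job_test

import (
	"context"
	"testing"

	"go.uber.org/mock/gomock"

	"d7y.io/dragonfly/v2/manager/models"
	mocks "d7y.io/dragonfly/v2/manager/job/mocks" // assumed location of the generated mock
)

func TestSyncPeersMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	m := mocks.NewMockSyncPeers(ctrl)
	// Expect exactly one CreateSyncPeers call with any context and scheduler list.
	m.EXPECT().CreateSyncPeers(gomock.Any(), gomock.Any()).Return(nil).Times(1)

	if err := m.CreateSyncPeers(context.Background(), []models.Scheduler{}); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
```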
@@ -20,6 +20,7 @@ package job

 import (
     "context"
+    "errors"
     "fmt"
     "sync"
     "time"
@@ -28,22 +29,22 @@ import (
     "github.com/google/uuid"
     "go.opentelemetry.io/otel/trace"
     "gorm.io/gorm"
-    "gorm.io/gorm/clause"

     logger "d7y.io/dragonfly/v2/internal/dflog"
     internaljob "d7y.io/dragonfly/v2/internal/job"
     "d7y.io/dragonfly/v2/manager/config"
     "d7y.io/dragonfly/v2/manager/models"
-    "d7y.io/dragonfly/v2/pkg/container/slice"
     "d7y.io/dragonfly/v2/pkg/idgen"
-    "d7y.io/dragonfly/v2/pkg/types"
+    pkgtypes "d7y.io/dragonfly/v2/pkg/types"
     resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
 )

 // SyncPeers is an interface for sync peers.
 type SyncPeers interface {
-    // Run execute action to sync peers, which is async.
-    Run(context.Context, SyncPeersArgs) error
+    // CreateSyncPeers creates sync peers job, and merge the sync peer results with the data
+    // in the peer table in the database. It is a synchronous operation, and it will returns
+    // an error if the sync peers job is failed.
+    CreateSyncPeers(context.Context, []models.Scheduler) error

     // Serve started sync peers server.
     Serve()
@@ -57,115 +58,32 @@ type syncPeers struct {
     config *config.Config
     job    *internaljob.Job
     db     *gorm.DB
+    mu     *sync.Mutex
     done   chan struct{}
-
-    syncLocker sync.Mutex
-    workChan   chan SyncPeersArgs
-}
-
-type SyncPeersArgs struct {
-    CandidateSchedulerClusters []models.SchedulerCluster
-    TaskID                     string
 }

 // newSyncPeers returns a new SyncPeers.
 func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) {
     return &syncPeers{
         config: cfg,
-        db:     gdb,
-        job:    job,
-        done:   make(chan struct{}),
-        workChan:   make(chan SyncPeersArgs, 10),
-        syncLocker: sync.Mutex{},
+        job:    job,
+        db:     gdb,
+        mu:     &sync.Mutex{},
+        done:   make(chan struct{}),
     }, nil
 }

-// Run start to sync peers.
-func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error {
-    if len(args.CandidateSchedulerClusters) == 0 {
-        if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil {
-            return fmt.Errorf("failed to get candidate scheduler clusters: %v", err)
-        }
-    }
-
-    s.workChan <- args
-    return nil
-}
-
-// Serve started sync peers server.
-func (s *syncPeers) Serve() {
-    ticker := time.NewTicker(s.config.Job.SyncPeers.Interval)
-    defer ticker.Stop()
-    for {
-        select {
-        case <-ticker.C:
-            logger.Debugf("start to sync peers periodically")
-            if err := s.syncPeers(context.Background(), nil); err != nil {
-                logger.Errorf("sync peers failed periodically: %v", err)
-            }
-        case args := <-s.workChan:
-            logger.Debugf("start to sync peers for request")
-            err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters)
-            if err != nil {
-                logger.Errorf("sync peers failed for request: %v", err)
-            }
-
-            if args.TaskID != "" {
-                job := models.Job{}
-                state := machineryv1tasks.StateFailure
-                if err == nil {
-                    state = machineryv1tasks.StateSuccess
-                }
-                if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{
-                    State: state,
-                }).Error; updateErr != nil {
-                    logger.Errorf("update sync peers job result failed for request: %v", updateErr)
-                }
-            }
-        case <-s.done:
-            return
-        }
-    }
-}
-
-// Stop sync peers server.
-func (s *syncPeers) Stop() {
-    close(s.done)
-}
-
-// syncPeers is the real working function in synchronous mode.
-func (s *syncPeers) syncPeers(ctx context.Context, candidateSchedulerClusters []models.SchedulerCluster) error {
-    if !s.syncLocker.TryLock() {
-        return fmt.Errorf("another sync peers is already running")
-    }
-    defer s.syncLocker.Unlock()
-
-    if len(candidateSchedulerClusters) == 0 {
-        if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {
-            return err
-        }
-    }
-
-    // Find all of the schedulers that has active scheduler cluster.
-    var candidateSchedulers []models.Scheduler
-    for _, candidateSchedulerCluster := range candidateSchedulerClusters {
-        var scheduler models.Scheduler
-        if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
-            SchedulerClusterID: candidateSchedulerCluster.ID,
-            State:              models.SchedulerStateActive,
-        }).Error; err != nil {
-            continue
-        }
-
-        logger.Infof("sync peers find candidate scheduler cluster %s", candidateSchedulerCluster.Name)
-        candidateSchedulers = append(candidateSchedulers, scheduler)
-    }
-    logger.Infof("sync peers find candidate schedulers count is %d", len(candidateSchedulers))
-
-    // Send sync peer requests to all available schedulers,
-    // and merge the sync peer results with the data in
-    // the peer table in the database.
-    for _, scheduler := range candidateSchedulers {
+// CreateSyncPeers creates sync peers job.
+func (s *syncPeers) CreateSyncPeers(ctx context.Context, schedulers []models.Scheduler) error {
+    // Avoid running multiple sync peers jobs at the same time.
+    if !s.mu.TryLock() {
+        return errors.New("sync peers job is running")
+    }
+    defer s.mu.Unlock()
+
+    // Send sync peer requests to all available schedulers, and merge the sync peer results
+    // with the data in the peer table in the database.
+    for _, scheduler := range schedulers {
         log := logger.WithScheduler(scheduler.Hostname, scheduler.IP, uint64(scheduler.SchedulerClusterID))

         // Send sync peer request to scheduler.
@@ -179,9 +97,57 @@ func (s *syncPeers) syncPeers(ctx context.Context, candidateSchedulerClusters []
         // Merge sync peer results with the data in the peer table.
         s.mergePeers(ctx, scheduler, results, log)
     }

     return nil
 }

+// Serve started sync peers server.
+func (s *syncPeers) Serve() {
+    tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
+    defer tick.Stop()
+
+    for {
+        select {
+        case <-tick.C:
+            ctx, cancel := context.WithTimeout(context.Background(), s.config.Job.SyncPeers.Timeout)
+            defer cancel()
+
+            // Find all of the scheduler clusters that has active schedulers.
+            var schedulerClusters []models.SchedulerCluster
+            if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
+                logger.Errorf("sync peers find scheduler clusters failed: %v", err)
+            }
+
+            // Find all of the schedulers that has active scheduler cluster.
+            var schedulers []models.Scheduler
+            for _, schedulerCluster := range schedulerClusters {
+                var scheduler models.Scheduler
+                if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+                    SchedulerClusterID: schedulerCluster.ID,
+                    State:              models.SchedulerStateActive,
+                }).Error; err != nil {
+                    continue
+                }
+
+                logger.Infof("sync peers find scheduler cluster %s", schedulerCluster.Name)
+                schedulers = append(schedulers, scheduler)
+            }
+            logger.Infof("sync peers find schedulers count is %d", len(schedulers))
+
+            if err := s.CreateSyncPeers(ctx, schedulers); err != nil {
+                logger.Errorf("sync peers failed: %v", err)
+            }
+        case <-s.done:
+            return
+        }
+    }
+}
+
+// Stop sync peers server.
+func (s *syncPeers) Stop() {
+    close(s.done)
+}
+
 // createSyncPeers creates sync peers.
 func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) {
     var span trace.Span
@@ -205,7 +171,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
     logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
     asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
     if err != nil {
-        logger.Errorf("create sync peers in queue %v failed: %v", queue, err)
+        logger.Errorf("create sync peers in queue %v failed", queue, err)
         return nil, err
     }

@@ -226,111 +192,92 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu

 // Merge sync peer results with the data in the peer table.
 func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) {
-    // Fetch existing peers from the database
-    var existingPeers []models.Peer
-    var count int64
-    if err := s.db.Model(&models.Peer{}).
-        Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
-        Count(&count).
-        Error; err != nil {
-        log.Error("failed to count existing peers: ", err)
+    // Convert sync peer results from slice to map.
+    syncPeers := make(map[string]*resource.Host, len(results))
+    for _, result := range results {
+        syncPeers[result.ID] = result
+    }
+
+    oldPeers := make([]*models.Peer, 0, s.config.Job.SyncPeers.BatchSize)
+    if err := s.db.WithContext(ctx).Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).FindInBatches(&oldPeers, s.config.Job.SyncPeers.BatchSize, func(tx *gorm.DB, batch int) error {
+        peers := make([]*models.Peer, 0, s.config.Job.SyncPeers.BatchSize)
+        for _, oldPeer := range oldPeers {
+            // If the peer exists in the sync peer results, update the peer data in the database with
+            // the sync peer results and delete the sync peer from the sync peers map.
+            isSeedPeer := pkgtypes.ParseHostType(oldPeer.Type) != pkgtypes.HostTypeNormal
+            id := idgen.HostIDV2(oldPeer.IP, oldPeer.Hostname, isSeedPeer)
+            if syncPeer, ok := syncPeers[id]; ok {
+                peers = append(peers, &models.Peer{
+                    Hostname:           syncPeer.Hostname,
+                    Type:               syncPeer.Type.Name(),
+                    IDC:                syncPeer.Network.IDC,
+                    Location:           syncPeer.Network.Location,
+                    IP:                 syncPeer.IP,
+                    Port:               syncPeer.Port,
+                    DownloadPort:       syncPeer.DownloadPort,
+                    ObjectStoragePort:  syncPeer.ObjectStoragePort,
+                    State:              models.PeerStateActive,
+                    OS:                 syncPeer.OS,
+                    Platform:           syncPeer.Platform,
+                    PlatformFamily:     syncPeer.PlatformFamily,
+                    PlatformVersion:    syncPeer.PlatformVersion,
+                    KernelVersion:      syncPeer.KernelVersion,
+                    GitVersion:         syncPeer.Build.GitVersion,
+                    GitCommit:          syncPeer.Build.GitCommit,
+                    BuildPlatform:      syncPeer.Build.Platform,
+                    SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
+                })
+
+                // Delete the sync peer from the sync peers map.
+                delete(syncPeers, id)
+            } else {
+                // If the peer does not exist in the sync peer results, delete the peer in the database.
+                if err := tx.Unscoped().Delete(&models.Peer{}, oldPeer.ID).Error; err != nil {
+                    log.Error(err)
+                }
+            }
+        }
+
+        // Avoid save empty slice.
+        if len(peers) > 0 {
+            tx.Save(&peers)
+        }
+
+        return nil
+    }).Error; err != nil {
+        log.Error(err)
         return
     }

-    log.Infof("total peers count: %d", count)
-
-    pageSize := s.config.Job.SyncPeers.BatchSize
-    totalPages := (count + int64(pageSize-1)) / int64(pageSize)
-
-    for page := 1; page <= int(totalPages); page++ {
-        var batchPeers []models.Peer
-        if err := s.db.Preload("SchedulerCluster").
-            Scopes(models.Paginate(page, pageSize)).
-            Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
-            Find(&batchPeers).
-            Error; err != nil {
-            log.Error("Failed to fetch peers in batch: ", err)
-            return
-        }
-
-        existingPeers = append(existingPeers, batchPeers...)
+    // Insert the sync peers that do not exist in the database into the peer table.
+    peers := make([]*models.Peer, 0, len(syncPeers))
+    for _, syncPeer := range syncPeers {
+        peers = append(peers, &models.Peer{
+            Hostname:           syncPeer.Hostname,
+            Type:               syncPeer.Type.Name(),
+            IDC:                syncPeer.Network.IDC,
+            Location:           syncPeer.Network.Location,
+            IP:                 syncPeer.IP,
+            Port:               syncPeer.Port,
+            DownloadPort:       syncPeer.DownloadPort,
+            ObjectStoragePort:  syncPeer.ObjectStoragePort,
+            State:              models.PeerStateActive,
+            OS:                 syncPeer.OS,
+            Platform:           syncPeer.Platform,
+            PlatformFamily:     syncPeer.PlatformFamily,
+            PlatformVersion:    syncPeer.PlatformVersion,
+            KernelVersion:      syncPeer.KernelVersion,
+            GitVersion:         syncPeer.Build.GitVersion,
+            GitCommit:          syncPeer.Build.GitCommit,
+            BuildPlatform:      syncPeer.Build.Platform,
+            SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
+        })
     }

-    // Calculate differences using diffPeers function
-    toUpsert, toDelete := diffPeers(existingPeers, results)
-
-    // Perform batch upsert
-    if len(toUpsert) > 0 {
-        // Construct the upsert query
-        if err := s.db.WithContext(ctx).
-            Clauses(clause.OnConflict{
-                Columns:   []clause.Column{{Name: "id"}},
-                UpdateAll: true,
-            }).
-            CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize).
-            Error; err != nil {
-            log.Error(err)
-        }
-    }
-
-    // Perform batch delete
-    if len(toDelete) > 0 {
-        if err := s.db.WithContext(ctx).
-            Delete(&toDelete).
-            Error; err != nil {
+    // Avoid save empty slice.
+    if len(peers) > 0 {
+        if err := s.db.WithContext(ctx).CreateInBatches(peers, len(peers)).Error; err != nil {
             log.Error(err)
         }
     }
 }
-
-func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) {
-    // Convert current peers to a map for quick lookup
-    currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
-        return item.ID
-    })
-
-    // Convert existing peers to a map for quick lookup
-    existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
-        return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal)
-    })
-
-    // Calculate differences
-    for id, currentPeer := range currentPeersMap {
-        if _, ok := existingPeersMap[id]; ok {
-            // Remove from existingPeersMap to mark it as processed
-            delete(existingPeersMap, id)
-        }
-        // Add all current peers to upsert list
-        toUpsert = append(toUpsert, convertToModelPeer(*currentPeer))
-    }
-
-    // Peers left in existingPeersMap are to be deleted
-    toDelete = slice.Values(existingPeersMap)
-
-    return toUpsert, toDelete
-}
-
-// Helper function to convert resource.Host to models.Peer
-func convertToModelPeer(peer resource.Host) models.Peer {
-    return models.Peer{
-        Hostname:           peer.Hostname,
-        Type:               peer.Type.Name(),
-        IDC:                peer.Network.IDC,
-        Location:           peer.Network.Location,
-        IP:                 peer.IP,
-        Port:               peer.Port,
-        DownloadPort:       peer.DownloadPort,
-        ObjectStoragePort:  peer.ObjectStoragePort,
-        State:              models.PeerStateActive,
-        OS:                 peer.OS,
-        Platform:           peer.Platform,
-        PlatformFamily:     peer.PlatformFamily,
-        PlatformVersion:    peer.PlatformVersion,
-        KernelVersion:      peer.KernelVersion,
-        GitVersion:         peer.Build.GitVersion,
-        GitCommit:          peer.Build.GitCommit,
-        BuildPlatform:      peer.Build.Platform,
-        SchedulerClusterID: uint(peer.SchedulerClusterID),
-    }
-}
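The rewritten mergePeers walks the existing peer rows with gorm's FindInBatches instead of counting rows and paginating by hand, then diffing in memory. A self-contained sketch of that batching pattern, using an in-memory SQLite database and a simplified Peer model (both assumptions for illustration only):

```go
package main

import (
	"fmt"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

type Peer struct {
	ID       uint
	Hostname string
	State    string
}

func main() {
	db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	if err := db.AutoMigrate(&Peer{}); err != nil {
		panic(err)
	}

	const batchSize = 50
	var batch []*Peer
	// FindInBatches loads batchSize rows at a time and hands each batch to the
	// callback, so the whole table never has to be held in memory at once.
	result := db.Model(&Peer{}).FindInBatches(&batch, batchSize, func(tx *gorm.DB, n int) error {
		for _, p := range batch {
			// Decide per row whether to update or delete, mirroring the merge above.
			p.State = "active"
		}
		// Persist the updated batch inside the same transaction.
		return tx.Save(&batch).Error
	})
	if result.Error != nil {
		panic(result.Error)
	}
	fmt.Println("processed rows:", result.RowsAffected)
}
```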
@@ -1,118 +0,0 @@
-/*
- * Copyright 2024 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package job
-
-import (
-    "sort"
-    "testing"
-
-    "github.com/stretchr/testify/assert"
-
-    "d7y.io/dragonfly/v2/manager/models"
-    "d7y.io/dragonfly/v2/pkg/idgen"
-    "d7y.io/dragonfly/v2/pkg/types"
-    resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
-)
-
-func Test_diffPeers(t *testing.T) {
-    type args struct {
-        existingPeers []models.Peer
-        currentPeers  []*resource.Host
-    }
-    tests := []struct {
-        name         string
-        args         args
-        wantToUpsert []models.Peer
-        wantToDelete []models.Peer
-    }{
-        {
-            name: "append",
-            args: args{
-                existingPeers: []models.Peer{
-                    // delete for not existing
-                    generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed),
-                    // delete for original HostTypeNormal
-                    generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal),
-                    // delete for type changed
-                    generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal),
-                    // update for port changed
-                    generateModePeer("127.0.0.1", "foo1", 80, 443, types.HostTypeSuperSeed),
-                    // update for type changed
-                    generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeStrongSeed),
-                },
-                currentPeers: []*resource.Host{
-                    resource.NewHost(
-                        idgen.HostIDV2("127.0.0.1", "foo1", true),
-                        "127.0.0.1",
-                        "foo1",
-                        80,
-                        80,
-                        types.HostTypeSuperSeed),
-                    resource.NewHost(
-                        idgen.HostIDV2("127.0.0.2", "foo2", true),
-                        "127.0.0.2",
-                        "foo2",
-                        80,
-                        80,
-                        types.HostTypeSuperSeed),
-                    resource.NewHost(
-                        idgen.HostIDV2("127.0.0.3", "foo3", true),
-                        "127.0.0.3",
-                        "foo3",
-                        80,
-                        80,
-                        types.HostTypeSuperSeed), // append only
-                },
-            },
-            wantToUpsert: []models.Peer{
-                generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed),
-                generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed),
-                generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed),
-            },
-            wantToDelete: []models.Peer{
-                generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal),
-                generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal),
-                generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed),
-            },
-        },
-    }
-    for _, tt := range tests {
-        t.Run(tt.name, func(t *testing.T) {
-            gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers)
-            // sort the result to compare
-            sort.Slice(gotToUpdate, func(i, j int) bool {
-                return gotToUpdate[i].IP < gotToUpdate[j].IP
-            })
-            sort.Slice(gotToDelete, func(i, j int) bool {
-                return gotToDelete[i].IP < gotToDelete[j].IP
-            })
-            assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
-            assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
-        })
-    }
-}
-
-func generateModePeer(ip, hostname string, port, downloadPort int32, typ types.HostType) models.Peer {
-    return models.Peer{
-        Hostname:     hostname,
-        Type:         typ.Name(),
-        IP:           ip,
-        Port:         port,
-        State:        models.PeerStateActive,
-        DownloadPort: downloadPort,
-    }
-}
@@ -22,11 +22,9 @@ import (
     "fmt"

     machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
-    "github.com/google/uuid"

     logger "d7y.io/dragonfly/v2/internal/dflog"
     internaljob "d7y.io/dragonfly/v2/internal/job"
-    "d7y.io/dragonfly/v2/manager/job"
     "d7y.io/dragonfly/v2/manager/metrics"
     "d7y.io/dragonfly/v2/manager/models"
     "d7y.io/dragonfly/v2/manager/types"
@@ -36,47 +34,13 @@ import (
     "d7y.io/dragonfly/v2/pkg/structure"
 )

-func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
-    args, err := structure.StructToMap(json)
+func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
+    schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs)
     if err != nil {
-        return nil, err
+        return err
     }

-    candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil)
-    if err != nil {
-        return nil, err
-    }
-
-    var candidateClusters []models.SchedulerCluster
-    for _, scheduler := range candidateSchedulers {
-        candidateClusters = append(candidateClusters, scheduler.SchedulerCluster)
-    }
-
-    taskID := fmt.Sprintf("manager_%v", uuid.New().String())
-
-    if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{
-        CandidateSchedulerClusters: candidateClusters,
-        TaskID:                     taskID,
-    }); err != nil {
-        return nil, err
-    }
-
-    // job here is a local one controlled by the manager self.
-    job := models.Job{
-        TaskID:            taskID,
-        BIO:               json.BIO,
-        Args:              args,
-        Type:              json.Type,
-        State:             machineryv1tasks.StateStarted,
-        UserID:            json.UserID,
-        SchedulerClusters: candidateClusters,
-    }
-
-    if err = s.db.WithContext(ctx).Create(&job).Error; err != nil {
-        return nil, err
-    }
-
-    return &job, nil
+    return s.job.SyncPeers.CreateSyncPeers(ctx, schedulers)
 }

 func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
@@ -101,7 +65,7 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
         return nil, err
     }

-    candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat})
+    candidateSchedulers, err := s.findAllCandidateSchedulersInClusters(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat})
     if err != nil {
         return nil, err
     }
@@ -144,7 +108,7 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask
         return nil, err
     }

-    schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs)
+    schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs)
     if err != nil {
         return nil, err
     }
@@ -191,7 +155,7 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
         return nil, err
     }

-    schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs)
+    schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs)
     if err != nil {
         return nil, err
     }
@@ -224,7 +188,54 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
     return &job, nil
 }

-func (s *service) findSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
+func (s *service) findSchedulerInClusters(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
+    var activeSchedulers []models.Scheduler
+    if len(schedulerClusterIDs) != 0 {
+        // Find the scheduler clusters by request.
+        for _, schedulerClusterID := range schedulerClusterIDs {
+            schedulerCluster := models.SchedulerCluster{}
+            if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
+                return nil, fmt.Errorf("scheduler cluster id %d: %w", schedulerClusterID, err)
+            }
+
+            scheduler := models.Scheduler{}
+            if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+                SchedulerClusterID: schedulerCluster.ID,
+                State:              models.SchedulerStateActive,
+            }).Error; err != nil {
+                return nil, fmt.Errorf("scheduler cluster id %d: %w", schedulerClusterID, err)
+            }
+
+            activeSchedulers = append(activeSchedulers, scheduler)
+        }
+    } else {
+        // Find all of the scheduler clusters that has active scheduler.
+        var schedulerClusters []models.SchedulerCluster
+        if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
+            return nil, err
+        }
+
+        for _, schedulerCluster := range schedulerClusters {
+            scheduler := models.Scheduler{}
+            if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+                SchedulerClusterID: schedulerCluster.ID,
+                State:              models.SchedulerStateActive,
+            }).Error; err != nil {
+                continue
+            }
+
+            activeSchedulers = append(activeSchedulers, scheduler)
+        }
+    }
+
+    if len(activeSchedulers) == 0 {
+        return nil, errors.New("active schedulers not found")
+    }
+
+    return activeSchedulers, nil
+}
+
+func (s *service) findAllSchedulersInClusters(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
     var activeSchedulers []models.Scheduler
     if len(schedulerClusterIDs) != 0 {
         // Find the scheduler clusters by request.
@@ -271,7 +282,7 @@ func (s *service) findSchedulers(ctx context.Context, schedulerClusterIDs []uint
     return activeSchedulers, nil
 }

-func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint, features []string) ([]models.Scheduler, error) {
+func (s *service) findAllCandidateSchedulersInClusters(ctx context.Context, schedulerClusterIDs []uint, features []string) ([]models.Scheduler, error) {
     var candidateSchedulers []models.Scheduler
     if len(schedulerClusterIDs) != 0 {
         // Find the scheduler clusters by request.
@@ -341,12 +341,11 @@ func (mr *MockServiceMockRecorder) CreateSeedPeerCluster(arg0, arg1 any) *gomock
 }

 // CreateSyncPeersJob mocks base method.
-func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
+func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
     m.ctrl.T.Helper()
     ret := m.ctrl.Call(m, "CreateSyncPeersJob", ctx, json)
-    ret0, _ := ret[0].(*models.Job)
-    ret1, _ := ret[1].(error)
-    return ret0, ret1
+    ret0, _ := ret[0].(error)
+    return ret0
 }

 // CreateSyncPeersJob indicates an expected call of CreateSyncPeersJob.
@@ -115,7 +115,7 @@ type Service interface {
     GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error)

     CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error)
-    CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error)
+    CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
     CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
     CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
     DestroyJob(context.Context, uint) error
@@ -35,37 +35,70 @@ const (
 )

 type CreateJobRequest struct {
-    BIO                 string         `json:"bio" binding:"omitempty"`
-    Type                string         `json:"type" binding:"required"`
-    Args                map[string]any `json:"args" binding:"omitempty"`
-    UserID              uint           `json:"user_id" binding:"omitempty"`
-    SeedPeerClusterIDs  []uint         `json:"seed_peer_cluster_ids" binding:"omitempty"`
-    SchedulerClusterIDs []uint         `json:"scheduler_cluster_ids" binding:"omitempty"`
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // Type is the type of the job.
+    Type string `json:"type" binding:"required"`
+
+    // Args is the arguments of the job.
+    Args map[string]any `json:"args" binding:"omitempty"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
+
+    // SeedPeerClusterIDs is the seed peer cluster ids of the job.
+    SeedPeerClusterIDs []uint `json:"seed_peer_cluster_ids" binding:"omitempty"`
+
+    // SchedulerClusterIDs is the scheduler cluster ids of the job.
+    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type UpdateJobRequest struct {
-    BIO    string `json:"bio" binding:"omitempty"`
-    UserID uint   `json:"user_id" binding:"omitempty"`
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
 }

 type JobParams struct {
+    // Type is the type of the job.
     ID uint `uri:"id" binding:"required"`
 }

 type GetJobsQuery struct {
-    Type    string `form:"type" binding:"omitempty"`
-    State   string `form:"state" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
-    UserID  uint   `form:"user_id" binding:"omitempty"`
-    Page    int    `form:"page" binding:"omitempty,gte=1"`
-    PerPage int    `form:"per_page" binding:"omitempty,gte=1,lte=10000000"`
+    // Type is the type of the job.
+    Type string `form:"type" binding:"omitempty"`
+
+    // State is the state of the job.
+    State string `form:"state" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
+
+    // UserID is the user id of the job.
+    UserID uint `form:"user_id" binding:"omitempty"`
+
+    // Page is the page number of the job list.
+    Page int `form:"page" binding:"omitempty,gte=1"`
+
+    // PerPage is the item count per page of the job list.
+    PerPage int `form:"per_page" binding:"omitempty,gte=1,lte=10000000"`
 }

 type CreatePreheatJobRequest struct {
-    BIO                 string      `json:"bio" binding:"omitempty"`
-    Type                string      `json:"type" binding:"required"`
-    Args                PreheatArgs `json:"args" binding:"omitempty"`
-    UserID              uint        `json:"user_id" binding:"omitempty"`
-    SchedulerClusterIDs []uint      `json:"scheduler_cluster_ids" binding:"omitempty"`
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // Type is the preheating type, support image and file.
+    Type string `json:"type" binding:"required"`
+
+    // Args is the arguments of the preheating job.
+    Args PreheatArgs `json:"args" binding:"omitempty"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
+
+    // SchedulerClusterIDs is the scheduler cluster ids of the job.
+    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type PreheatArgs struct {
@@ -104,19 +137,34 @@ type PreheatArgs struct {
 }

 type CreateSyncPeersJobRequest struct {
-    BIO                 string        `json:"bio" binding:"omitempty"`
-    Type                string        `json:"type" binding:"required"`
-    UserID              uint          `json:"user_id" binding:"omitempty"`
-    SchedulerClusterIDs []uint        `json:"scheduler_cluster_ids" binding:"omitempty"`
-    Timeout             time.Duration `json:"timeout" binding:"omitempty"`
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // Type is the type of the job.
+    Type string `json:"type" binding:"required"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
+
+    // SeedPeerClusterIDs is the seed peer cluster ids of the job.
+    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type CreateGetTaskJobRequest struct {
-    BIO                 string      `json:"bio" binding:"omitempty"`
-    Type                string      `json:"type" binding:"required"`
-    Args                GetTaskArgs `json:"args" binding:"omitempty"`
-    UserID              uint        `json:"user_id" binding:"omitempty"`
-    SchedulerClusterIDs []uint      `json:"scheduler_cluster_ids" binding:"omitempty"`
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // Type is the type of the job.
+    Type string `json:"type" binding:"required"`
+
+    // Args is the arguments of the job.
+    Args GetTaskArgs `json:"args" binding:"omitempty"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
+
+    // SchedulerClusterIDs is the scheduler cluster ids of the job.
+    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type GetTaskArgs struct {
@ -137,11 +185,20 @@ type GetTaskArgs struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateDeleteTaskJobRequest struct {
|
type CreateDeleteTaskJobRequest struct {
|
||||||
BIO string `json:"bio" binding:"omitempty"`
|
// BIO is the description of the job.
|
||||||
Type string `json:"type" binding:"required"`
|
BIO string `json:"bio" binding:"omitempty"`
|
||||||
Args DeleteTaskArgs `json:"args" binding:"omitempty"`
|
|
||||||
UserID uint `json:"user_id" binding:"omitempty"`
|
// Type is the type of the job.
|
||||||
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
|
Type string `json:"type" binding:"required"`
|
||||||
|
|
||||||
|
// Args is the arguments of the job.
|
||||||
|
Args DeleteTaskArgs `json:"args" binding:"omitempty"`
|
||||||
|
|
||||||
|
// UserID is the user id of the job.
|
||||||
|
UserID uint `json:"user_id" binding:"omitempty"`
|
||||||
|
|
||||||
|
// SchedulerClusterIDs is the scheduler cluster ids of the job.
|
||||||
|
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DeleteTaskArgs struct {
|
type DeleteTaskArgs struct {
|
||||||
|
|
|
||||||
|
|
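The query struct at the top of this file's diff carries form and binding tags. Assuming a gin-style handler (which these tags suggest, but which this diff does not show), here is a hedged sketch of how such a struct is bound and validated; the struct name, route, and port are illustrative, not taken from the commit.

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// jobsQuery mirrors the field set and tags shown in the hunk above; the real
// type lives in the manager's types package under a different name.
type jobsQuery struct {
	Type    string `form:"type" binding:"omitempty"`
	State   string `form:"state" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
	UserID  uint   `form:"user_id" binding:"omitempty"`
	Page    int    `form:"page" binding:"omitempty,gte=1"`
	PerPage int    `form:"per_page" binding:"omitempty,gte=1,lte=10000000"`
}

func main() {
	r := gin.Default()
	r.GET("/jobs", func(c *gin.Context) {
		var query jobsQuery
		// The binding tags are enforced here; e.g. state=FOO or page=0 is
		// rejected with a validation error.
		if err := c.ShouldBindQuery(&query); err != nil {
			c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, query)
	})

	_ = r.Run(":8080")
}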
@@ -1,44 +0,0 @@
-/*
- * Copyright 2023 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package slice
-
-func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V {
-	result := make(map[K]V, len(collection))
-
-	for i := range collection {
-		k := iteratee(collection[i])
-		result[k] = collection[i]
-	}
-
-	return result
-}
-
-func Values[K comparable, V any](in ...map[K]V) []V {
-	size := 0
-	for i := range in {
-		size += len(in[i])
-	}
-	result := make([]V, 0, size)
-
-	for i := range in {
-		for k := range in[i] {
-			result = append(result, in[i][k])
-		}
-	}
-
-	return result
-}
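The hunk above removes the generic KeyBy and Values helpers from the slice package. A stand-alone sketch of how a call site could inline the same behaviour with plain loops; the Peer type here is hypothetical and only serves to make the example runnable.

package main

import "fmt"

// Hypothetical Peer type, not taken from this commit.
type Peer struct {
	ID       string
	Hostname string
}

func main() {
	peers := []Peer{
		{ID: "a", Hostname: "host-a"},
		{ID: "b", Hostname: "host-b"},
	}

	// Inline equivalent of the removed slice.KeyBy(peers, func(p Peer) string { return p.ID }).
	byID := make(map[string]Peer, len(peers))
	for _, p := range peers {
		byID[p.ID] = p
	}

	// Inline equivalent of the removed slice.Values(byID).
	values := make([]Peer, 0, len(byID))
	for _, p := range byID {
		values = append(values, p)
	}

	fmt.Println(byID)
	fmt.Println(values)
}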
@@ -1,102 +0,0 @@
-/*
- * Copyright 2023 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package slice
-
-import (
-	"reflect"
-	"sort"
-	"testing"
-)
-
-// TestKeyBy tests the KeyBy function
-func TestKeyBy(t *testing.T) {
-	type Person struct {
-		ID   int
-		Name string
-	}
-
-	people := []Person{
-		{ID: 1, Name: "Alice"},
-		{ID: 2, Name: "Bob"},
-		{ID: 3, Name: "Charlie"},
-	}
-
-	// Test case 1: Key by ID
-	keyByID := KeyBy(people, func(p Person) int {
-		return p.ID
-	})
-	expectedKeyByID := map[int]Person{
-		1: {ID: 1, Name: "Alice"},
-		2: {ID: 2, Name: "Bob"},
-		3: {ID: 3, Name: "Charlie"},
-	}
-	if !reflect.DeepEqual(keyByID, expectedKeyByID) {
-		t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID)
-	}
-
-	// Test case 2: Key by Name
-	keyByName := KeyBy(people, func(p Person) string {
-		return p.Name
-	})
-	expectedKeyByName := map[string]Person{
-		"Alice":   {ID: 1, Name: "Alice"},
-		"Bob":     {ID: 2, Name: "Bob"},
-		"Charlie": {ID: 3, Name: "Charlie"},
-	}
-	if !reflect.DeepEqual(keyByName, expectedKeyByName) {
-		t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName)
-	}
-}
-
-// TestValues tests the Values function
-func TestValues(t *testing.T) {
-	map1 := map[int]string{
-		1: "one",
-		2: "two",
-	}
-	map2 := map[int]string{
-		3: "three",
-		4: "four",
-	}
-
-	// Test case 1: Values from one map
-	values1 := Values(map1)
-	expectedValues1 := []string{"one", "two"}
-
-	sort.Strings(values1)
-	sort.Strings(expectedValues1)
-	if !reflect.DeepEqual(values1, expectedValues1) {
-		t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1)
-	}
-
-	// Test case 2: Values from multiple maps
-	values2 := Values(map1, map2)
-	expectedValues2 := []string{"one", "two", "three", "four"}
-
-	sort.Strings(values2)
-	sort.Strings(expectedValues2)
-	if !reflect.DeepEqual(values2, expectedValues2) {
-		t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2)
-	}
-
-	// Test case 3: Values from empty maps
-	values3 := Values(map[int]string{}, map[int]string{})
-	expectedValues3 := []string{}
-	if !reflect.DeepEqual(values3, expectedValues3) {
-		t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3)
-	}
-}
@@ -123,7 +123,7 @@ func Parse(digest string) (*Digest, error) {
 	switch algorithm {
 	case AlgorithmCRC32:
-		if len(encoded) != 8 && len(encoded) != 10 {
+		if len(encoded) <= 0 {
 			return nil, errors.New("invalid encoded")
 		}
 	case AlgorithmBlake3:
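The CRC32 branch above now only rejects an empty encoded value instead of pinning the length to 8 or 10 characters. A small, general-Go illustration of why the encoded length of a CRC32 checksum depends on how it is rendered; this is not a claim about the exact encoding Dragonfly clients send.

package main

import (
	"fmt"
	"hash/crc32"
	"strconv"
)

func main() {
	sum := crc32.ChecksumIEEE([]byte("hello dragonfly"))

	// Zero-padded hex is always 8 characters.
	hexEncoded := fmt.Sprintf("%08x", sum)
	fmt.Println(hexEncoded, len(hexEncoded))

	// A plain decimal rendering of a uint32 ranges from 1 to 10 characters,
	// so a fixed 8-or-10 length check is easy to violate.
	decEncoded := strconv.FormatUint(uint64(sum), 10)
	fmt.Println(decEncoded, len(decEncoded))
}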
@@ -472,6 +472,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
 		resource.WithPlatformFamily(req.GetPlatformFamily()),
 		resource.WithPlatformVersion(req.GetPlatformVersion()),
 		resource.WithKernelVersion(req.GetKernelVersion()),
+		resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)),
 	}

 	if concurrentUploadLimit > 0 {
@@ -541,10 +542,6 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
 		}))
 	}

-	if req.GetSchedulerClusterId() != 0 {
-		options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
-	}
-
 	if req.GetObjectStoragePort() != 0 {
 		options = append(options, resource.WithObjectStoragePort(req.GetObjectStoragePort()))
 	}
@@ -654,6 +651,9 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e
 	// Leave peers in host.
 	host.LeavePeers()

+	// Delete host in scheduler.
+	v.resource.HostManager().Delete(host.ID)
 	return nil
 }
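The hunks above stop deriving the scheduler cluster ID from the announcing host's request and always take it from the scheduler's own configuration, and LeaveHost now also removes the host from the host manager. A minimal sketch of the functional-options pattern involved, using stand-in types rather than the scheduler's real resource package:

package main

import "fmt"

// Stand-in types for illustration; not the scheduler's real Host or options.
type Host struct {
	ID                 string
	SchedulerClusterID uint64
}

type HostOption func(*Host)

func WithSchedulerClusterID(id uint64) HostOption {
	return func(h *Host) { h.SchedulerClusterID = id }
}

func NewHost(id string, options ...HostOption) *Host {
	h := &Host{ID: id}
	for _, opt := range options {
		opt(h)
	}
	return h
}

func main() {
	// After this change the cluster ID always comes from the scheduler's own
	// config, so the option is appended unconditionally instead of only when
	// the announcing host reported a non-zero cluster ID.
	configSchedulerClusterID := uint64(1)
	host := NewHost("host-id", WithSchedulerClusterID(configSchedulerClusterID))
	fmt.Println(host.ID, host.SchedulerClusterID)
}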
@@ -2232,6 +2232,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2247,6 +2249,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2262,6 +2266,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2278,6 +2284,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2294,6 +2302,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2310,6 +2320,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2326,6 +2338,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2342,6 +2356,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2358,6 +2374,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2374,6 +2392,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
@@ -2390,6 +2410,8 @@ func TestServiceV1_LeaveHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *resource.Peer, err error) {
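The extra mr.HostManager() and mh.Delete(...) expectations in every test case above mirror the new delete step in LeaveHost. A hand-rolled stand-in (the real tests use gomock-generated mocks) showing the call sequence the mocks now have to allow:

package main

import "fmt"

// fakeHostManager is a hand-written stand-in for the mocked host manager; it
// only records calls so the Load-then-Delete sequence is visible.
type fakeHostManager struct {
	calls []string
}

func (m *fakeHostManager) Load(id string) (string, bool) {
	m.calls = append(m.calls, "Load("+id+")")
	return "host", true
}

func (m *fakeHostManager) Delete(id string) {
	m.calls = append(m.calls, "Delete("+id+")")
}

func main() {
	m := &fakeHostManager{}

	// LeaveHost-style flow after this change: load the host, leave its peers,
	// then delete the host from the manager.
	if _, ok := m.Load("host-id"); ok {
		m.Delete("host-id")
	}

	fmt.Println(m.calls) // [Load(host-id) Delete(host-id)]
}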
@@ -521,6 +521,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
 		standard.WithPlatformFamily(req.Host.GetPlatformFamily()),
 		standard.WithPlatformVersion(req.Host.GetPlatformVersion()),
 		standard.WithKernelVersion(req.Host.GetKernelVersion()),
+		standard.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)),
 	}

 	if concurrentUploadLimit > 0 {
@@ -596,10 +597,6 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
 		}))
 	}

-	if req.Host.GetSchedulerClusterId() != 0 {
-		options = append(options, standard.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
-	}
-
 	if req.GetInterval() != nil {
 		options = append(options, standard.WithAnnounceInterval(req.GetInterval().AsDuration()))
 	}
@@ -979,6 +976,9 @@ func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest)
 	// Leave peers in host.
 	host.LeavePeers()

+	// Delete host in scheduler.
+	v.resource.HostManager().Delete(req.GetHostId())
 	return nil
 }
@@ -1591,6 +1591,8 @@ func TestServiceV2_DeleteHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *standard.Peer, err error) {
@@ -1606,6 +1608,8 @@ func TestServiceV2_DeleteHost(t *testing.T) {
 				gomock.InOrder(
 					mr.HostManager().Return(hostManager).Times(1),
 					mh.Load(gomock.Any()).Return(host, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Delete(gomock.Any()).Return().Times(1),
 				)
 			},
 			expect: func(t *testing.T, peer *standard.Peer, err error) {