enhance: support syncpeers by service and optimize the merge logic (#3637)
* enhance: support syncpeers by service and optimize the merge logic Signed-off-by: cormick <cormick1080@gmail.com>
This commit is contained in:
parent
c220a60464
commit
d0e41b538c
|
|
@ -23,8 +23,8 @@ jobs:
|
|||
- name: Golangci lint
|
||||
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8
|
||||
with:
|
||||
version: v1.54
|
||||
args: --verbose
|
||||
version: v1.62
|
||||
args: --verbose --timeout=10m
|
||||
|
||||
- name: Markdown lint
|
||||
uses: docker://avtodev/markdown-lint:v1@sha256:6aeedc2f49138ce7a1cd0adffc1b1c0321b841dc2102408967d9301c031949ee
|
||||
|
|
|
|||
|
|
@ -24,11 +24,9 @@ linters:
|
|||
enable:
|
||||
- gci
|
||||
- gofmt
|
||||
- golint
|
||||
- misspell
|
||||
- govet
|
||||
- goconst
|
||||
- deadcode
|
||||
- gocyclo
|
||||
- staticcheck
|
||||
- errcheck
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
Subproject commit 10bab190a28552cd05760b7bd75626990834877a
|
||||
Subproject commit ee21989120b16db4e8456e7d163f9ba06aad98c9
|
||||
|
|
@ -338,6 +338,9 @@ type SyncPeersConfig struct {
|
|||
|
||||
// Timeout is the timeout for syncing peers information from the single scheduler.
|
||||
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
|
||||
|
||||
// BatchSize is the batch size when operating gorm.
|
||||
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
|
||||
}
|
||||
|
||||
type PreheatTLSClientConfig struct {
|
||||
|
|
@ -449,6 +452,7 @@ func New() *Config {
|
|||
SyncPeers: SyncPeersConfig{
|
||||
Interval: DefaultJobSyncPeersInterval,
|
||||
Timeout: DefaultJobSyncPeersTimeout,
|
||||
BatchSize: DefaultJobSyncPeersBatchSize,
|
||||
},
|
||||
},
|
||||
ObjectStorage: ObjectStorageConfig{
|
||||
|
|
|
|||
|
|
@ -107,6 +107,9 @@ const (
|
|||
|
||||
// DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster.
|
||||
DefaultClusterJobRateLimit = 10
|
||||
|
||||
// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler.
|
||||
DefaultJobSyncPeersBatchSize = 500
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
|||
|
|
@ -62,6 +62,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, job)
|
||||
case job.SyncPeersJob:
|
||||
var json types.CreateSyncPeersJobRequest
|
||||
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
job, err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json)
|
||||
if err != nil {
|
||||
ctx.Error(err) // nolint: errcheck
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, job)
|
||||
case job.GetTaskJob:
|
||||
var json types.CreateGetTaskJobRequest
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
job "d7y.io/dragonfly/v2/manager/job"
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
|
|
@ -41,17 +42,17 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder {
|
|||
}
|
||||
|
||||
// Run mocks base method.
|
||||
func (m *MockSyncPeers) Run(arg0 context.Context) error {
|
||||
func (m *MockSyncPeers) Run(arg0 context.Context, arg1 job.SyncPeersArgs) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Run", arg0)
|
||||
ret := m.ctrl.Call(m, "Run", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Run indicates an expected call of Run.
|
||||
func (mr *MockSyncPeersMockRecorder) Run(arg0 any) *gomock.Call {
|
||||
func (mr *MockSyncPeersMockRecorder) Run(arg0, arg1 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0, arg1)
|
||||
}
|
||||
|
||||
// Serve mocks base method.
|
||||
|
|
|
|||
|
|
@ -21,17 +21,20 @@ package job
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
|
||||
"github.com/google/uuid"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
internaljob "d7y.io/dragonfly/v2/internal/job"
|
||||
"d7y.io/dragonfly/v2/manager/config"
|
||||
"d7y.io/dragonfly/v2/manager/models"
|
||||
"d7y.io/dragonfly/v2/pkg/container/slice"
|
||||
"d7y.io/dragonfly/v2/pkg/idgen"
|
||||
"d7y.io/dragonfly/v2/pkg/types"
|
||||
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
|
||||
|
|
@ -39,8 +42,8 @@ import (
|
|||
|
||||
// SyncPeers is an interface for sync peers.
|
||||
type SyncPeers interface {
|
||||
// Run sync peers.
|
||||
Run(context.Context) error
|
||||
// Run execute action to sync peers, which is async.
|
||||
Run(context.Context, SyncPeersArgs) error
|
||||
|
||||
// Serve started sync peers server.
|
||||
Serve()
|
||||
|
|
@ -55,6 +58,14 @@ type syncPeers struct {
|
|||
job *internaljob.Job
|
||||
db *gorm.DB
|
||||
done chan struct{}
|
||||
|
||||
syncLocker sync.Mutex
|
||||
workChan chan SyncPeersArgs
|
||||
}
|
||||
|
||||
type SyncPeersArgs struct {
|
||||
CandidateSchedulerClusters []models.SchedulerCluster
|
||||
TaskID string
|
||||
}
|
||||
|
||||
// newSyncPeers returns a new SyncPeers.
|
||||
|
|
@ -64,16 +75,76 @@ func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncP
|
|||
db: gdb,
|
||||
job: job,
|
||||
done: make(chan struct{}),
|
||||
workChan: make(chan SyncPeersArgs, 10),
|
||||
syncLocker: sync.Mutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run sync peers.
|
||||
func (s *syncPeers) Run(ctx context.Context) error {
|
||||
// Find all of the scheduler clusters that has active schedulers.
|
||||
var candidateSchedulerClusters []models.SchedulerCluster
|
||||
// Run start to sync peers.
|
||||
func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error {
|
||||
if len(args.CandidateSchedulerClusters) == 0 {
|
||||
if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil {
|
||||
return fmt.Errorf("failed to get candidate scheduler clusters: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.workChan <- args
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve started sync peers server.
|
||||
func (s *syncPeers) Serve() {
|
||||
ticker := time.NewTicker(s.config.Job.SyncPeers.Interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
logger.Debugf("start to sync peers periodically")
|
||||
if err := s.syncPeers(context.Background(), nil); err != nil {
|
||||
logger.Errorf("sync peers failed periodically: %v", err)
|
||||
}
|
||||
case args := <-s.workChan:
|
||||
logger.Debugf("start to sync peers for request")
|
||||
err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters)
|
||||
if err != nil {
|
||||
logger.Errorf("sync peers failed for request: %v", err)
|
||||
}
|
||||
|
||||
if args.TaskID != "" {
|
||||
job := models.Job{}
|
||||
state := machineryv1tasks.StateFailure
|
||||
if err == nil {
|
||||
state = machineryv1tasks.StateSuccess
|
||||
}
|
||||
if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{
|
||||
State: state,
|
||||
}).Error; updateErr != nil {
|
||||
logger.Errorf("update sync peers job result failed for request: %v", updateErr)
|
||||
}
|
||||
}
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop sync peers server.
|
||||
func (s *syncPeers) Stop() {
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// syncPeers is the real working function in synchronous mode.
|
||||
func (s *syncPeers) syncPeers(ctx context.Context, candidateSchedulerClusters []models.SchedulerCluster) error {
|
||||
if !s.syncLocker.TryLock() {
|
||||
return fmt.Errorf("another sync peers is already running")
|
||||
}
|
||||
defer s.syncLocker.Unlock()
|
||||
|
||||
if len(candidateSchedulerClusters) == 0 {
|
||||
if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Find all of the schedulers that has active scheduler cluster.
|
||||
var candidateSchedulers []models.Scheduler
|
||||
|
|
@ -111,26 +182,6 @@ func (s *syncPeers) Run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Serve started sync peers server.
|
||||
func (s *syncPeers) Serve() {
|
||||
tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
if err := s.Run(context.Background()); err != nil {
|
||||
logger.Errorf("sync peers failed: %v", err)
|
||||
}
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop sync peers server.
|
||||
func (s *syncPeers) Stop() {
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// createSyncPeers creates sync peers.
|
||||
func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) {
|
||||
var span trace.Span
|
||||
|
|
@ -154,7 +205,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
|
|||
logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
|
||||
asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
|
||||
if err != nil {
|
||||
logger.Errorf("create sync peers in queue %v failed", queue, err)
|
||||
logger.Errorf("create sync peers in queue %v failed: %v", queue, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
@ -175,84 +226,111 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
|
|||
|
||||
// Merge sync peer results with the data in the peer table.
|
||||
func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) {
|
||||
// Convert sync peer results from slice to map.
|
||||
syncPeers := make(map[string]*resource.Host)
|
||||
for _, result := range results {
|
||||
syncPeers[result.ID] = result
|
||||
}
|
||||
// Fetch existing peers from the database
|
||||
var existingPeers []models.Peer
|
||||
var count int64
|
||||
|
||||
rows, err := s.db.Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).Rows()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
if err := s.db.Model(&models.Peer{}).
|
||||
Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
|
||||
Count(&count).
|
||||
Error; err != nil {
|
||||
log.Error("failed to count existing peers: ", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
peer := models.Peer{}
|
||||
if err := s.db.ScanRows(rows, &peer); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
log.Infof("total peers count: %d", count)
|
||||
|
||||
pageSize := s.config.Job.SyncPeers.BatchSize
|
||||
totalPages := (count + int64(pageSize-1)) / int64(pageSize)
|
||||
|
||||
for page := 1; page <= int(totalPages); page++ {
|
||||
var batchPeers []models.Peer
|
||||
if err := s.db.Preload("SchedulerCluster").
|
||||
Scopes(models.Paginate(page, pageSize)).
|
||||
Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
|
||||
Find(&batchPeers).
|
||||
Error; err != nil {
|
||||
log.Error("Failed to fetch peers in batch: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
// If the peer exists in the sync peer results, update the peer data in the database with
|
||||
// the sync peer results and delete the sync peer from the sync peers map.
|
||||
isSeedPeer := types.ParseHostType(peer.Type) != types.HostTypeNormal
|
||||
id := idgen.HostIDV2(peer.IP, peer.Hostname, isSeedPeer)
|
||||
if syncPeer, ok := syncPeers[id]; ok {
|
||||
if err := s.db.WithContext(ctx).First(&models.Peer{}, peer.ID).Updates(models.Peer{
|
||||
Type: syncPeer.Type.Name(),
|
||||
IDC: syncPeer.Network.IDC,
|
||||
Location: syncPeer.Network.Location,
|
||||
Port: syncPeer.Port,
|
||||
DownloadPort: syncPeer.DownloadPort,
|
||||
ObjectStoragePort: syncPeer.ObjectStoragePort,
|
||||
State: models.PeerStateActive,
|
||||
OS: syncPeer.OS,
|
||||
Platform: syncPeer.Platform,
|
||||
PlatformFamily: syncPeer.PlatformFamily,
|
||||
PlatformVersion: syncPeer.PlatformVersion,
|
||||
KernelVersion: syncPeer.KernelVersion,
|
||||
GitVersion: syncPeer.Build.GitVersion,
|
||||
GitCommit: syncPeer.Build.GitCommit,
|
||||
BuildPlatform: syncPeer.Build.Platform,
|
||||
}).Error; err != nil {
|
||||
log.Error(err)
|
||||
existingPeers = append(existingPeers, batchPeers...)
|
||||
}
|
||||
|
||||
// Delete the sync peer from the sync peers map.
|
||||
delete(syncPeers, id)
|
||||
} else {
|
||||
// If the peer does not exist in the sync peer results, delete the peer in the database.
|
||||
if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil {
|
||||
// Calculate differences using diffPeers function
|
||||
toUpsert, toDelete := diffPeers(existingPeers, results)
|
||||
|
||||
// Perform batch upsert
|
||||
if len(toUpsert) > 0 {
|
||||
// Construct the upsert query
|
||||
if err := s.db.WithContext(ctx).
|
||||
Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "id"}},
|
||||
UpdateAll: true,
|
||||
}).
|
||||
CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize).
|
||||
Error; err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the sync peers that do not exist in the database into the peer table.
|
||||
for _, syncPeer := range syncPeers {
|
||||
if err := s.db.WithContext(ctx).Create(&models.Peer{
|
||||
Hostname: syncPeer.Hostname,
|
||||
Type: syncPeer.Type.Name(),
|
||||
IDC: syncPeer.Network.IDC,
|
||||
Location: syncPeer.Network.Location,
|
||||
IP: syncPeer.IP,
|
||||
Port: syncPeer.Port,
|
||||
DownloadPort: syncPeer.DownloadPort,
|
||||
ObjectStoragePort: syncPeer.ObjectStoragePort,
|
||||
State: models.PeerStateActive,
|
||||
OS: syncPeer.OS,
|
||||
Platform: syncPeer.Platform,
|
||||
PlatformFamily: syncPeer.PlatformFamily,
|
||||
PlatformVersion: syncPeer.PlatformVersion,
|
||||
KernelVersion: syncPeer.KernelVersion,
|
||||
GitVersion: syncPeer.Build.GitVersion,
|
||||
GitCommit: syncPeer.Build.GitCommit,
|
||||
BuildPlatform: syncPeer.Build.Platform,
|
||||
SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
|
||||
}).Error; err != nil {
|
||||
// Perform batch delete
|
||||
if len(toDelete) > 0 {
|
||||
if err := s.db.WithContext(ctx).
|
||||
Delete(&toDelete).
|
||||
Error; err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) {
|
||||
// Convert current peers to a map for quick lookup
|
||||
currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
|
||||
return item.ID
|
||||
})
|
||||
|
||||
// Convert existing peers to a map for quick lookup
|
||||
existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
|
||||
return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal)
|
||||
})
|
||||
|
||||
// Calculate differences
|
||||
for id, currentPeer := range currentPeersMap {
|
||||
if _, ok := existingPeersMap[id]; ok {
|
||||
// Remove from existingPeersMap to mark it as processed
|
||||
delete(existingPeersMap, id)
|
||||
}
|
||||
// Add all current peers to upsert list
|
||||
toUpsert = append(toUpsert, convertToModelPeer(*currentPeer))
|
||||
}
|
||||
|
||||
// Peers left in existingPeersMap are to be deleted
|
||||
toDelete = slice.Values(existingPeersMap)
|
||||
|
||||
return toUpsert, toDelete
|
||||
}
|
||||
|
||||
// Helper function to convert resource.Host to models.Peer
|
||||
func convertToModelPeer(peer resource.Host) models.Peer {
|
||||
return models.Peer{
|
||||
Hostname: peer.Hostname,
|
||||
Type: peer.Type.Name(),
|
||||
IDC: peer.Network.IDC,
|
||||
Location: peer.Network.Location,
|
||||
IP: peer.IP,
|
||||
Port: peer.Port,
|
||||
DownloadPort: peer.DownloadPort,
|
||||
ObjectStoragePort: peer.ObjectStoragePort,
|
||||
State: models.PeerStateActive,
|
||||
OS: peer.OS,
|
||||
Platform: peer.Platform,
|
||||
PlatformFamily: peer.PlatformFamily,
|
||||
PlatformVersion: peer.PlatformVersion,
|
||||
KernelVersion: peer.KernelVersion,
|
||||
GitVersion: peer.Build.GitVersion,
|
||||
GitCommit: peer.Build.GitCommit,
|
||||
BuildPlatform: peer.Build.Platform,
|
||||
SchedulerClusterID: uint(peer.SchedulerClusterID),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Copyright 2024 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package job
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"d7y.io/dragonfly/v2/manager/models"
|
||||
"d7y.io/dragonfly/v2/pkg/idgen"
|
||||
"d7y.io/dragonfly/v2/pkg/types"
|
||||
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
|
||||
)
|
||||
|
||||
func Test_diffPeers(t *testing.T) {
|
||||
type args struct {
|
||||
existingPeers []models.Peer
|
||||
currentPeers []*resource.Host
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantToUpsert []models.Peer
|
||||
wantToDelete []models.Peer
|
||||
}{
|
||||
{
|
||||
name: "append",
|
||||
args: args{
|
||||
existingPeers: []models.Peer{
|
||||
// delete for not existing
|
||||
generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed),
|
||||
// delete for original HostTypeNormal
|
||||
generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal),
|
||||
// delete for type changed
|
||||
generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal),
|
||||
// update for port changed
|
||||
generateModePeer("127.0.0.1", "foo1", 80, 443, types.HostTypeSuperSeed),
|
||||
// update for type changed
|
||||
generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeStrongSeed),
|
||||
},
|
||||
currentPeers: []*resource.Host{
|
||||
resource.NewHost(
|
||||
idgen.HostIDV2("127.0.0.1", "foo1", true),
|
||||
"127.0.0.1",
|
||||
"foo1",
|
||||
80,
|
||||
80,
|
||||
types.HostTypeSuperSeed),
|
||||
resource.NewHost(
|
||||
idgen.HostIDV2("127.0.0.2", "foo2", true),
|
||||
"127.0.0.2",
|
||||
"foo2",
|
||||
80,
|
||||
80,
|
||||
types.HostTypeSuperSeed),
|
||||
resource.NewHost(
|
||||
idgen.HostIDV2("127.0.0.3", "foo3", true),
|
||||
"127.0.0.3",
|
||||
"foo3",
|
||||
80,
|
||||
80,
|
||||
types.HostTypeSuperSeed), // append only
|
||||
},
|
||||
},
|
||||
wantToUpsert: []models.Peer{
|
||||
generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed),
|
||||
generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed),
|
||||
generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed),
|
||||
},
|
||||
wantToDelete: []models.Peer{
|
||||
generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal),
|
||||
generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal),
|
||||
generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers)
|
||||
// sort the result to compare
|
||||
sort.Slice(gotToUpdate, func(i, j int) bool {
|
||||
return gotToUpdate[i].IP < gotToUpdate[j].IP
|
||||
})
|
||||
sort.Slice(gotToDelete, func(i, j int) bool {
|
||||
return gotToDelete[i].IP < gotToDelete[j].IP
|
||||
})
|
||||
assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
|
||||
assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func generateModePeer(ip, hostname string, port, downloadPort int32, typ types.HostType) models.Peer {
|
||||
return models.Peer{
|
||||
Hostname: hostname,
|
||||
Type: typ.Name(),
|
||||
IP: ip,
|
||||
Port: port,
|
||||
State: models.PeerStateActive,
|
||||
DownloadPort: downloadPort,
|
||||
}
|
||||
}
|
||||
|
|
@ -22,9 +22,11 @@ import (
|
|||
"fmt"
|
||||
|
||||
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
|
||||
"github.com/google/uuid"
|
||||
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
internaljob "d7y.io/dragonfly/v2/internal/job"
|
||||
"d7y.io/dragonfly/v2/manager/job"
|
||||
"d7y.io/dragonfly/v2/manager/metrics"
|
||||
"d7y.io/dragonfly/v2/manager/models"
|
||||
"d7y.io/dragonfly/v2/manager/types"
|
||||
|
|
@ -34,6 +36,49 @@ import (
|
|||
"d7y.io/dragonfly/v2/pkg/structure"
|
||||
)
|
||||
|
||||
func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
|
||||
args, err := structure.StructToMap(json)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var candidateClusters []models.SchedulerCluster
|
||||
for _, scheduler := range candidateSchedulers {
|
||||
candidateClusters = append(candidateClusters, scheduler.SchedulerCluster)
|
||||
}
|
||||
|
||||
taskID := fmt.Sprintf("manager_%v", uuid.New().String())
|
||||
|
||||
if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{
|
||||
CandidateSchedulerClusters: candidateClusters,
|
||||
TaskID: taskID,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// job here is a local one controlled by the manager self.
|
||||
job := models.Job{
|
||||
TaskID: taskID,
|
||||
BIO: json.BIO,
|
||||
Args: args,
|
||||
Type: json.Type,
|
||||
State: machineryv1tasks.StateStarted,
|
||||
UserID: json.UserID,
|
||||
SchedulerClusters: candidateClusters,
|
||||
}
|
||||
|
||||
if err = s.db.WithContext(ctx).Create(&job).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
|
||||
func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
|
||||
if json.Args.Scope == "" {
|
||||
json.Args.Scope = types.SinglePeerScope
|
||||
|
|
|
|||
|
|
@ -340,6 +340,21 @@ func (mr *MockServiceMockRecorder) CreateSeedPeerCluster(arg0, arg1 any) *gomock
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSeedPeerCluster", reflect.TypeOf((*MockService)(nil).CreateSeedPeerCluster), arg0, arg1)
|
||||
}
|
||||
|
||||
// CreateSyncPeersJob mocks base method.
|
||||
func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CreateSyncPeersJob", ctx, json)
|
||||
ret0, _ := ret[0].(*models.Job)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// CreateSyncPeersJob indicates an expected call of CreateSyncPeersJob.
|
||||
func (mr *MockServiceMockRecorder) CreateSyncPeersJob(ctx, json any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSyncPeersJob", reflect.TypeOf((*MockService)(nil).CreateSyncPeersJob), ctx, json)
|
||||
}
|
||||
|
||||
// CreateV1Preheat mocks base method.
|
||||
func (m *MockService) CreateV1Preheat(arg0 context.Context, arg1 types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@ type Service interface {
|
|||
GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error)
|
||||
|
||||
CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error)
|
||||
CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error)
|
||||
CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
|
||||
CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
|
||||
DestroyJob(context.Context, uint) error
|
||||
|
|
|
|||
|
|
@ -103,6 +103,14 @@ type PreheatArgs struct {
|
|||
Timeout time.Duration `json:"timeout" binding:"omitempty"`
|
||||
}
|
||||
|
||||
type CreateSyncPeersJobRequest struct {
|
||||
BIO string `json:"bio" binding:"omitempty"`
|
||||
Type string `json:"type" binding:"required"`
|
||||
UserID uint `json:"user_id" binding:"omitempty"`
|
||||
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
|
||||
Timeout time.Duration `json:"timeout" binding:"omitempty"`
|
||||
}
|
||||
|
||||
type CreateGetTaskJobRequest struct {
|
||||
BIO string `json:"bio" binding:"omitempty"`
|
||||
Type string `json:"type" binding:"required"`
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package slice
|
||||
|
||||
func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V {
|
||||
result := make(map[K]V, len(collection))
|
||||
|
||||
for i := range collection {
|
||||
k := iteratee(collection[i])
|
||||
result[k] = collection[i]
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func Values[K comparable, V any](in ...map[K]V) []V {
|
||||
size := 0
|
||||
for i := range in {
|
||||
size += len(in[i])
|
||||
}
|
||||
result := make([]V, 0, size)
|
||||
|
||||
for i := range in {
|
||||
for k := range in[i] {
|
||||
result = append(result, in[i][k])
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package slice
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestKeyBy tests the KeyBy function
|
||||
func TestKeyBy(t *testing.T) {
|
||||
type Person struct {
|
||||
ID int
|
||||
Name string
|
||||
}
|
||||
|
||||
people := []Person{
|
||||
{ID: 1, Name: "Alice"},
|
||||
{ID: 2, Name: "Bob"},
|
||||
{ID: 3, Name: "Charlie"},
|
||||
}
|
||||
|
||||
// Test case 1: Key by ID
|
||||
keyByID := KeyBy(people, func(p Person) int {
|
||||
return p.ID
|
||||
})
|
||||
expectedKeyByID := map[int]Person{
|
||||
1: {ID: 1, Name: "Alice"},
|
||||
2: {ID: 2, Name: "Bob"},
|
||||
3: {ID: 3, Name: "Charlie"},
|
||||
}
|
||||
if !reflect.DeepEqual(keyByID, expectedKeyByID) {
|
||||
t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID)
|
||||
}
|
||||
|
||||
// Test case 2: Key by Name
|
||||
keyByName := KeyBy(people, func(p Person) string {
|
||||
return p.Name
|
||||
})
|
||||
expectedKeyByName := map[string]Person{
|
||||
"Alice": {ID: 1, Name: "Alice"},
|
||||
"Bob": {ID: 2, Name: "Bob"},
|
||||
"Charlie": {ID: 3, Name: "Charlie"},
|
||||
}
|
||||
if !reflect.DeepEqual(keyByName, expectedKeyByName) {
|
||||
t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValues tests the Values function
|
||||
func TestValues(t *testing.T) {
|
||||
map1 := map[int]string{
|
||||
1: "one",
|
||||
2: "two",
|
||||
}
|
||||
map2 := map[int]string{
|
||||
3: "three",
|
||||
4: "four",
|
||||
}
|
||||
|
||||
// Test case 1: Values from one map
|
||||
values1 := Values(map1)
|
||||
expectedValues1 := []string{"one", "two"}
|
||||
|
||||
sort.Strings(values1)
|
||||
sort.Strings(expectedValues1)
|
||||
if !reflect.DeepEqual(values1, expectedValues1) {
|
||||
t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1)
|
||||
}
|
||||
|
||||
// Test case 2: Values from multiple maps
|
||||
values2 := Values(map1, map2)
|
||||
expectedValues2 := []string{"one", "two", "three", "four"}
|
||||
|
||||
sort.Strings(values2)
|
||||
sort.Strings(expectedValues2)
|
||||
if !reflect.DeepEqual(values2, expectedValues2) {
|
||||
t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2)
|
||||
}
|
||||
|
||||
// Test case 3: Values from empty maps
|
||||
values3 := Values(map[int]string{}, map[int]string{})
|
||||
expectedValues3 := []string{}
|
||||
if !reflect.DeepEqual(values3, expectedValues3) {
|
||||
t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3)
|
||||
}
|
||||
}
|
||||
|
|
@ -387,7 +387,7 @@ func NewHost(
|
|||
|
||||
h := &Host{
|
||||
ID: id,
|
||||
Type: types.HostType(typ),
|
||||
Type: typ,
|
||||
IP: ip,
|
||||
Hostname: hostname,
|
||||
Port: port,
|
||||
|
|
|
|||
Loading…
Reference in New Issue