feat: scheduler detects peer survivability and clears offline peers' metadata (#3353)

feat: scheduler detects peer survivability and clears offline peers

1. The Host GC method will remove hosts that did not announce in time and their peers' metadata.
2. The scheduler Host GC interval is lowered in order to detect non-surviving hosts in time.
3. Add unit tests.

Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
This commit is contained in:
Chongzhi Deng 2024-07-04 13:46:35 +08:00 committed by GitHub
parent 43ef996ed8
commit 8153e57ed1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 112 additions and 5 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21 go 1.21
require ( require (
d7y.io/api/v2 v2.0.123 d7y.io/api/v2 v2.0.126
github.com/MysteriousPotato/go-lockable v1.0.0 github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8 github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0 github.com/Showmax/go-fqdn v1.0.0

4
go.sum
View File

@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.123 h1:GGJy9DIaVYDHpS5PfiW2/Ad93jS4237uhmZx/lH9Zhc= d7y.io/api/v2 v2.0.126 h1:mlVZHBJwOQL9PZcnsVN9Etcru+rr2nbDXBuDC8v0PhY=
d7y.io/api/v2 v2.0.123/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U= d7y.io/api/v2 v2.0.126/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -85,7 +85,7 @@ const (
DefaultSchedulerTaskGCInterval = 30 * time.Minute DefaultSchedulerTaskGCInterval = 30 * time.Minute
// DefaultSchedulerHostGCInterval is default interval for host gc. // DefaultSchedulerHostGCInterval is default interval for host gc.
DefaultSchedulerHostGCInterval = 6 * time.Hour DefaultSchedulerHostGCInterval = 5 * time.Minute
// DefaultSchedulerHostTTL is default ttl for host. // DefaultSchedulerHostTTL is default ttl for host.
DefaultSchedulerHostTTL = 1 * time.Hour DefaultSchedulerHostTTL = 1 * time.Hour

View File

@ -122,6 +122,13 @@ func WithBuild(build Build) HostOption {
} }
} }
// WithAnnounceInterval sets host's announce interval.
func WithAnnounceInterval(announceInterval time.Duration) HostOption {
return func(h *Host) {
h.AnnounceInterval = announceInterval
}
}
// Host contains content for host. // Host contains content for host.
type Host struct { type Host struct {
// ID is host id. // ID is host id.
@ -178,6 +185,9 @@ type Host struct {
// SchedulerClusterID is the scheduler cluster id matched by scopes. // SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64 SchedulerClusterID uint64
// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration
// ConcurrentUploadLimit is concurrent upload limit count. // ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit *atomic.Int32 ConcurrentUploadLimit *atomic.Int32

View File

@ -20,6 +20,7 @@ package resource
import ( import (
"sync" "sync"
"time"
"d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/container/set"
pkggc "d7y.io/dragonfly/v2/pkg/gc" pkggc "d7y.io/dragonfly/v2/pkg/gc"
@ -142,7 +143,7 @@ func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*H
return hosts return hosts
} }
// Try to reclaim host. // RunGC tries to reclaim host.
func (h *hostManager) RunGC() error { func (h *hostManager) RunGC() error {
h.Map.Range(func(_, value any) bool { h.Map.Range(func(_, value any) bool {
host, ok := value.(*Host) host, ok := value.(*Host)
@ -151,11 +152,22 @@ func (h *hostManager) RunGC() error {
return true return true
} }
// If the host's elapsed exceeds twice the announcing interval,
// then leave peers in host.
elapsed := time.Since(host.UpdatedAt.Load())
if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 {
host.Log.Info("host elapsed exceeds twice the announce interval, causing the host to leave peers")
host.LeavePeers()
return true
}
// Reclaim the host.
if host.PeerCount.Load() == 0 && if host.PeerCount.Load() == 0 &&
host.ConcurrentUploadCount.Load() == 0 && host.ConcurrentUploadCount.Load() == 0 &&
host.Type == types.HostTypeNormal { host.Type == types.HostTypeNormal {
host.Log.Info("host has been reclaimed") host.Log.Info("host has been reclaimed")
h.Delete(host.ID) h.Delete(host.ID)
return true
} }
return true return true

View File

@ -503,6 +503,30 @@ func TestHostManager_RunGC(t *testing.T) {
assert.Equal(host.ID, mockSeedHost.ID) assert.Equal(host.ID, mockSeedHost.ID)
}, },
}, },
{
name: "host elapsed exceeds twice the announce interval",
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) {
assert := assert.New(t)
mockHost.AnnounceInterval = 1 * time.Microsecond
hostManager.Store(mockHost)
mockHost.StorePeer(mockPeer)
err := hostManager.RunGC()
assert.NoError(err)
mockHost.Peers.Range(func(_, value any) bool {
peer := value.(*Peer)
assert.True(peer.FSM.Is(PeerStateLeave))
return true
})
host, loaded := hostManager.Load(mockHost.ID)
assert.Equal(loaded, true)
assert.Equal(host.ID, mockHost.ID)
},
},
} }
for _, tc := range tests { for _, tc := range tests {

View File

@ -126,6 +126,8 @@ var (
Platform: "darwin", Platform: "darwin",
} }
mockAnnounceInterval = 5 * time.Minute
mockHostID = idgen.HostIDV2("127.0.0.1", "foo") mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostLocation = "baz" mockHostLocation = "baz"
@ -152,6 +154,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -176,6 +179,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -225,6 +229,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(1)) assert.Equal(host.ObjectStoragePort, int32(1))
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -250,6 +255,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -276,6 +282,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.OS, "linux") assert.Equal(host.OS, "linux")
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -302,6 +309,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.Platform, "ubuntu") assert.Equal(host.Platform, "ubuntu")
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -328,6 +336,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformFamily, "debian") assert.Equal(host.PlatformFamily, "debian")
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -353,6 +362,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformVersion, "22.04") assert.Equal(host.PlatformVersion, "22.04")
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -379,6 +389,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.KernelVersion, "5.15.0-27-generic") assert.Equal(host.KernelVersion, "5.15.0-27-generic")
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -405,6 +416,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.CPU, mockCPU) assert.EqualValues(host.CPU, mockCPU)
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -431,6 +443,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Memory, mockMemory) assert.EqualValues(host.Memory, mockMemory)
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -457,6 +470,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Network, mockNetwork)
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -483,6 +497,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Disk, mockDisk)
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -509,6 +524,33 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Build, mockBuild) assert.EqualValues(host.Build, mockBuild)
assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set announce interval",
rawHost: mockRawHost,
options: []HostOption{WithAnnounceInterval(mockAnnounceInterval)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, 5*time.Minute)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))

View File

@ -183,6 +183,8 @@ var (
Platform: "darwin", Platform: "darwin",
} }
mockInterval = durationpb.New(5 * time.Minute).AsDuration()
mockPeerHost = &schedulerv1.PeerHost{ mockPeerHost = &schedulerv1.PeerHost{
Id: mockHostID, Id: mockHostID,
Ip: "127.0.0.1", Ip: "127.0.0.1",

View File

@ -584,6 +584,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
} }
if req.GetInterval() != nil {
options = append(options, resource.WithAnnounceInterval(req.GetInterval().AsDuration()))
}
host = resource.NewHost( host = resource.NewHost(
req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
@ -673,6 +677,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
} }
} }
if req.GetInterval() != nil {
host.AnnounceInterval = req.GetInterval().AsDuration()
}
return nil return nil
} }

View File

@ -28,6 +28,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
@ -488,6 +489,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform, Platform: &mockBuild.Platform,
}, },
}, },
Interval: durationpb.New(5 * time.Minute),
}, },
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder( gomock.InOrder(
@ -512,6 +514,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild) assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -592,6 +595,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform, Platform: &mockBuild.Platform,
}, },
}, },
Interval: durationpb.New(5 * time.Minute),
}, },
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder( gomock.InOrder(
@ -616,6 +620,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild) assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -696,6 +701,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform, Platform: &mockBuild.Platform,
}, },
}, },
Interval: durationpb.New(5 * time.Minute),
}, },
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder( gomock.InOrder(
@ -721,6 +727,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild) assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))
@ -796,6 +803,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform, Platform: &mockBuild.Platform,
}, },
}, },
Interval: durationpb.New(5 * time.Minute),
}, },
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder( gomock.InOrder(
@ -821,6 +829,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild) assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0)) assert.Equal(host.UploadCount.Load(), int64(0))