From 8153e57ed12c27f4ed98c806c99abe8ff790a35b Mon Sep 17 00:00:00 2001 From: Chongzhi Deng <57183262+BruceAko@users.noreply.github.com> Date: Thu, 4 Jul 2024 13:46:35 +0800 Subject: [PATCH] feat: scheduler detects peer survivability and clears offline peers' metadata (#3353) feat: scheduler detects peer survivability and clears offline peers 1. The Host GC method will remove hosts that did not announce in time and their peers' metadata. 2. The scheduler Host GC interval is lowered in order to detect non-surviving hosts in time. 3. Add unit tests. Signed-off-by: BruceAko --- go.mod | 2 +- go.sum | 4 +-- scheduler/config/constants.go | 2 +- scheduler/resource/host.go | 10 ++++++ scheduler/resource/host_manager.go | 14 ++++++++- scheduler/resource/host_manager_test.go | 24 ++++++++++++++ scheduler/resource/host_test.go | 42 +++++++++++++++++++++++++ scheduler/service/service_v1_test.go | 2 ++ scheduler/service/service_v2.go | 8 +++++ scheduler/service/service_v2_test.go | 9 ++++++ 10 files changed, 112 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index af31c075f..9085951ad 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.123 + d7y.io/api/v2 v2.0.126 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 60eb0adc1..8b703a904 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -d7y.io/api/v2 v2.0.123 h1:GGJy9DIaVYDHpS5PfiW2/Ad93jS4237uhmZx/lH9Zhc= -d7y.io/api/v2 v2.0.123/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U= +d7y.io/api/v2 v2.0.126 h1:mlVZHBJwOQL9PZcnsVN9Etcru+rr2nbDXBuDC8v0PhY= +d7y.io/api/v2 v2.0.126/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index 373a45fb0..e3a77685e 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -85,7 +85,7 @@ const ( DefaultSchedulerTaskGCInterval = 30 * time.Minute // DefaultSchedulerHostGCInterval is default interval for host gc. - DefaultSchedulerHostGCInterval = 6 * time.Hour + DefaultSchedulerHostGCInterval = 5 * time.Minute // DefaultSchedulerHostTTL is default ttl for host. DefaultSchedulerHostTTL = 1 * time.Hour diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index b0e35bba3..36f3ae27d 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -122,6 +122,13 @@ func WithBuild(build Build) HostOption { } } +// WithAnnounceInterval sets host's announce interval. +func WithAnnounceInterval(announceInterval time.Duration) HostOption { + return func(h *Host) { + h.AnnounceInterval = announceInterval + } +} + // Host contains content for host. type Host struct { // ID is host id. @@ -178,6 +185,9 @@ type Host struct { // SchedulerClusterID is the scheduler cluster id matched by scopes. SchedulerClusterID uint64 + // AnnounceInterval is the interval between host announces to scheduler. + AnnounceInterval time.Duration + // ConcurrentUploadLimit is concurrent upload limit count. ConcurrentUploadLimit *atomic.Int32 diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index 344240c90..6bd477112 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -20,6 +20,7 @@ package resource import ( "sync" + "time" "d7y.io/dragonfly/v2/pkg/container/set" pkggc "d7y.io/dragonfly/v2/pkg/gc" @@ -142,7 +143,7 @@ func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*H return hosts } -// Try to reclaim host. +// RunGC tries to reclaim host. func (h *hostManager) RunGC() error { h.Map.Range(func(_, value any) bool { host, ok := value.(*Host) @@ -151,11 +152,22 @@ func (h *hostManager) RunGC() error { return true } + // If the host's elapsed exceeds twice the announcing interval, + // then leave peers in host. + elapsed := time.Since(host.UpdatedAt.Load()) + if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 { + host.Log.Info("host elapsed exceeds twice the announce interval, causing the host to leave peers") + host.LeavePeers() + return true + } + + // Reclaim the host. if host.PeerCount.Load() == 0 && host.ConcurrentUploadCount.Load() == 0 && host.Type == types.HostTypeNormal { host.Log.Info("host has been reclaimed") h.Delete(host.ID) + return true } return true diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index 976657205..4a245b9cb 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -503,6 +503,30 @@ func TestHostManager_RunGC(t *testing.T) { assert.Equal(host.ID, mockSeedHost.ID) }, }, + { + name: "host elapsed exceeds twice the announce interval", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) { + assert := assert.New(t) + mockHost.AnnounceInterval = 1 * time.Microsecond + hostManager.Store(mockHost) + mockHost.StorePeer(mockPeer) + err := hostManager.RunGC() + assert.NoError(err) + + mockHost.Peers.Range(func(_, value any) bool { + peer := value.(*Peer) + assert.True(peer.FSM.Is(PeerStateLeave)) + return true + }) + + host, loaded := hostManager.Load(mockHost.ID) + assert.Equal(loaded, true) + assert.Equal(host.ID, mockHost.ID) + }, + }, } for _, tc := range tests { diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 0f2923d4d..be73e4ed1 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -126,6 +126,8 @@ var ( Platform: "darwin", } + mockAnnounceInterval = 5 * time.Minute + mockHostID = idgen.HostIDV2("127.0.0.1", "foo") mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") mockHostLocation = "baz" @@ -152,6 +154,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -176,6 +179,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -225,6 +229,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(1)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -250,6 +255,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -276,6 +282,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.OS, "linux") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -302,6 +309,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.Platform, "ubuntu") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -328,6 +336,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformFamily, "debian") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -353,6 +362,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformVersion, "22.04") + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -379,6 +389,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.KernelVersion, "5.15.0-27-generic") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -405,6 +416,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.CPU, mockCPU) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -431,6 +443,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Memory, mockMemory) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -457,6 +470,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Network, mockNetwork) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -483,6 +497,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Disk, mockDisk) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -509,6 +524,33 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Build, mockBuild) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set announce interval", + rawHost: mockRawHost, + options: []HostOption{WithAnnounceInterval(mockAnnounceInterval)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) + assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, 5*time.Minute) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 8ce1c58dd..1f792cf02 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -183,6 +183,8 @@ var ( Platform: "darwin", } + mockInterval = durationpb.New(5 * time.Minute).AsDuration() + mockPeerHost = &schedulerv1.PeerHost{ Id: mockHostID, Ip: "127.0.0.1", diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 1791a1959..170386737 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -584,6 +584,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) } + if req.GetInterval() != nil { + options = append(options, resource.WithAnnounceInterval(req.GetInterval().AsDuration())) + } + host = resource.NewHost( req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()), @@ -673,6 +677,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ } } + if req.GetInterval() != nil { + host.AnnounceInterval = req.GetInterval().AsDuration() + } + return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 74902b365..ce8546681 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -28,6 +28,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -488,6 +489,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -512,6 +514,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -592,6 +595,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -616,6 +620,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -696,6 +701,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -721,6 +727,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -796,6 +803,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -821,6 +829,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0))