diff --git a/scheduler/supervisor/cdn.go b/scheduler/supervisor/cdn.go index 4c7658d3e..5a83366e8 100644 --- a/scheduler/supervisor/cdn.go +++ b/scheduler/supervisor/cdn.go @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +//go:generate mockgen -destination ./mocks/cdn_mock.go -package mocks d7y.io/dragonfly/v2/scheduler/supervisor CDNDynmaicClient package supervisor diff --git a/scheduler/supervisor/cdn_test.go b/scheduler/supervisor/cdn_test.go new file mode 100644 index 000000000..7d9a72b2c --- /dev/null +++ b/scheduler/supervisor/cdn_test.go @@ -0,0 +1,522 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package supervisor_test + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "reflect" + "testing" + + "d7y.io/dragonfly/v2/internal/dfcodes" + "d7y.io/dragonfly/v2/internal/dferrors" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" + "d7y.io/dragonfly/v2/scheduler/supervisor" + "d7y.io/dragonfly/v2/scheduler/supervisor/mocks" + "github.com/agiledragon/gomonkey" + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +var ( + mockPieceSeedStream = &client.PieceSeedStream{} + mockPieceSeed = &cdnsystem.PieceSeed{} + mockHost = &supervisor.Host{} + mockTask = &supervisor.Task{} + mockPeer = &supervisor.Peer{} + mockLogger = &logger.SugaredLoggerOnWith{} +) + +func TestCDN_Nil(t *testing.T) { + tests := []struct { + name string + status supervisor.TaskStatus + mock func(t *testing.T) (supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) + expect func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) + }{ + { + name: "nil client", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockTask.ID = "mocktask" + + patch := &gomonkey.Patches{} + return mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.NotNil(cdn) + assert.Nil(cdn.GetClient()) + assert.Nil(peer) + assert.Equal(supervisor.ErrCDNClientUninitialized, err) + + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockPeerManager, mockHostManager, patch := tc.mock(t) + cdn := supervisor.NewCDN(nil, mockPeerManager, mockHostManager) + mockTask.SetStatus(tc.status) + peer, err := cdn.StartSeedTask(context.Background(), mockTask) + tc.expect(t, cdn, peer, err) + patch.Reset() + }) + } +} + +func TestCDN_Initial(t *testing.T) { + tests := []struct { + name string + task *supervisor.Task + status supervisor.TaskStatus + mock func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) + expect func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) + }{ + { + name: "ObtainSeeds cause CdnTaskRegistryFail", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + err := dferrors.New(dfcodes.CdnTaskRegistryFail, "mockError") + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(nil, err).AnyTimes() + + patch := &gomonkey.Patches{} + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNRegisterFail, errors.Cause(err)) + }, + }, + { + name: "ObtainSeeds cause CdnTaskDownloadFail", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + err := dferrors.New(dfcodes.CdnTaskDownloadFail, "mockError") + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(nil, err).AnyTimes() + + patch := &gomonkey.Patches{} + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNDownloadFail, errors.Cause(err)) + }, + }, + { + name: "ObtainSeeds cause other errors", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + err := dferrors.New(114514, "mockError") + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(nil, err).AnyTimes() + + patch := &gomonkey.Patches{} + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNUnknown, errors.Cause(err)) + }, + }, + { + name: "ObtainSeeds cause invoke client failed", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + err := fmt.Errorf("invoke error") + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(nil, err).AnyTimes() + + patch := &gomonkey.Patches{} + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Equal(supervisor.ErrCDNInvokeFail, errors.Cause(err)) + }, + }, + { + name: "failed for EOF and TaskStatusWaiting", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, io.EOF}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(err) + }, + }, + { + name: "success for EOF and TaskStatusSuccess", + status: supervisor.TaskStatusSuccess, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + mockPeerManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + mockPeerManager.EXPECT().Add(gomock.Any()).Return().AnyTimes() + mockHostManager.EXPECT().Get(gomock.Any()).Return(mockHost, true).AnyTimes() + + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockLogger), "Debugf", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + patch.ApplyMethodSeq(reflect.TypeOf(mockTask), "GetOrAddPiece", + []gomonkey.OutputCell{{Values: gomonkey.Params{nil, true}}}) + patch.ApplyMethodSeq(reflect.TypeOf(mockTask), "Log", + []gomonkey.OutputCell{{Values: gomonkey.Params{mockLogger}}}) + + patch.ApplyMethodSeq(reflect.TypeOf(mockPeer), "Touch", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + patch.ApplyMethodSeq(reflect.TypeOf(mockPeer), "UpdateProgress", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + newPeerRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPeer}}, + } + patch.ApplyFuncSeq(supervisor.NewPeer, newPeerRet) + + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPieceSeed, nil}, Times: 1}, + {Values: gomonkey.Params{nil, io.EOF}, Times: 1}, + } + patch.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + + mockPieceSeed.PieceInfo = &base.PieceInfo{PieceNum: 0} + mockPeer.Task = mockTask + mockPeer.ID = "114514" + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Equal(mockPeer, peer) + assert.Nil(err) + }, + }, + { + name: "receivePiece cause CdnTaskRegistryFail", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + + err := dferrors.New(dfcodes.CdnTaskRegistryFail, "mockError") + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, err}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNRegisterFail, errors.Cause(err)) + }, + }, + { + name: "receivePiece cause CdnTaskDownloadFail", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + + err := dferrors.New(dfcodes.CdnTaskDownloadFail, "mockError") + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, err}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNDownloadFail, errors.Cause(err)) + }, + }, + { + name: "receivePiece cause other errors", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + + err := dferrors.New(114514, "mockError") + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, err}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Error(supervisor.ErrCDNUnknown, errors.Cause(err)) + }, + }, + { + name: "receivePiece cause invoke client failed", + + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + + err := fmt.Errorf("invoke error") + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, err}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Equal(supervisor.ErrCDNInvokeFail, errors.Cause(err)) + }, + }, + { + name: "initCDNPeer peer is nil", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + mockCDNDynmaicClient.EXPECT().GetHost(gomock.Any()).Return(nil, false).AnyTimes() + mockPeerManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + mockHostManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPieceSeed, nil}}, + } + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Nil(peer) + assert.Equal(supervisor.ErrInitCDNPeerFail, errors.Cause(err)) + }, + }, + { + name: "downloadTinyFile http.Get error (restore host from hostManager)", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + mockCDNDynmaicClient.EXPECT().GetHost(gomock.Any()).Return(nil, false).AnyTimes() + mockPeerManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + mockPeerManager.EXPECT().Add(gomock.Any()).Return().AnyTimes() + mockHostManager.EXPECT().Get(gomock.Any()).Return(mockHost, true).AnyTimes() + + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockLogger), "Debugf", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + patch.ApplyMethodSeq(reflect.TypeOf(mockTask), "Log", + []gomonkey.OutputCell{{Values: gomonkey.Params{mockLogger}}}) + + err := fmt.Errorf("http error") + httpRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{nil, err}}, + } + + patch.ApplyFuncSeq(http.Get, httpRet) + + patch.ApplyMethodSeq(reflect.TypeOf(mockPeer), "Touch", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + newPeerRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPeer}}, + } + patch.ApplyFuncSeq(supervisor.NewPeer, newPeerRet) + + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPieceSeed, nil}}, + } + patch.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + + mockPieceSeed.Done = true + mockHost.IP = "0.0.0.0" + mockHost.DownloadPort = 1919 + mockTask.ID = "1919810" + mockPeer.Host = mockHost + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Equal(mockPeer, peer) + assert.Nil(err) + }, + }, + { + name: "downloadTinyFile success (restore host from client)", + status: supervisor.TaskStatusWaiting, + mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) + mockPeerManager := mocks.NewMockPeerManager(ctl) + mockHostManager := mocks.NewMockHostManager(ctl) + mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() + mockCDNDynmaicClient.EXPECT().GetHost(gomock.Any()).Return(mockHost, true).AnyTimes() + mockPeerManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + mockPeerManager.EXPECT().Add(gomock.Any()).Return().AnyTimes() + mockHostManager.EXPECT().Get(gomock.Any()).Return(nil, false).AnyTimes() + mockHostManager.EXPECT().Add(gomock.Any()).Return().AnyTimes() + + patch := gomonkey.ApplyMethodSeq(reflect.TypeOf(mockLogger), "Debugf", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + patch.ApplyMethodSeq(reflect.TypeOf(mockTask), "Log", + []gomonkey.OutputCell{{Values: gomonkey.Params{mockLogger}}}) + + const testwords string = "dragonfly-scheduler-test" + res := &http.Response{ + Body: ioutil.NopCloser( + bytes.NewBuffer([]byte(testwords))), + } + httpRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{res, nil}}, + } + patch.ApplyFuncSeq(http.Get, httpRet) + + patch.ApplyMethodSeq(reflect.TypeOf(mockPeer), "Touch", + []gomonkey.OutputCell{{Values: gomonkey.Params{}}}) + + newPeerRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPeer}}, + } + patch.ApplyFuncSeq(supervisor.NewPeer, newPeerRet) + + streamRet := []gomonkey.OutputCell{ + {Values: gomonkey.Params{mockPieceSeed, nil}}, + } + patch.ApplyMethodSeq(reflect.TypeOf(mockPieceSeedStream), "Recv", streamRet) + + mockPieceSeed.Done = true + mockPieceSeed.ContentLength = int64(len(testwords)) + mockHost.IP = "0.0.0.0" + mockHost.DownloadPort = 1919 + mockTask.ID = "1919810" + mockPeer.Host = mockHost + return mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch + }, + expect: func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) { + assert := assert.New(t) + assert.Equal(mockPeer, peer) + assert.Nil(err) + assert.Equal([]byte("dragonfly-scheduler-test"), mockTask.DirectPiece) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockCDNDynmaicClient, mockPeerManager, mockHostManager, patch := tc.mock(t) + cdn := supervisor.NewCDN(mockCDNDynmaicClient, mockPeerManager, mockHostManager) + mockTask.SetStatus(tc.status) + peer, err := cdn.StartSeedTask(context.Background(), mockTask) + tc.expect(t, cdn, peer, err) + patch.Reset() + }) + } +} diff --git a/scheduler/supervisor/host.go b/scheduler/supervisor/host.go index 054b52520..83a607de2 100644 --- a/scheduler/supervisor/host.go +++ b/scheduler/supervisor/host.go @@ -14,6 +14,8 @@ * limitations under the License. */ +//go:generate mockgen -destination ./mocks/host_mock.go -package mocks d7y.io/dragonfly/v2/scheduler/supervisor HostManager + package supervisor import ( diff --git a/scheduler/supervisor/host_test.go b/scheduler/supervisor/host_test.go new file mode 100644 index 000000000..720cd7b66 --- /dev/null +++ b/scheduler/supervisor/host_test.go @@ -0,0 +1,221 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package supervisor_test + +import ( + "strconv" + "testing" + + "d7y.io/dragonfly/v2/scheduler/supervisor" + "github.com/stretchr/testify/assert" +) + +func TestHost_New(t *testing.T) { + tests := []struct { + name string + host *supervisor.Host + expect func(t *testing.T, host *supervisor.Host) + }{ + { + name: "create by normal config", + host: supervisor.NewClientHost("main", "127.0.0.1", "Client", 8080, 8081, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("main", host.UUID) + }, + }, + { + name: "create CDN by normal config", + host: supervisor.NewCDNHost("main", "127.0.0.1", "Client", 8080, 8081, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("main", host.UUID) + }, + }, + { + name: "create by special symbols", + host: supervisor.NewClientHost("⁂⁎♜♝♞⁑(๑ `▽´๑)", "127.0.0.1", "Client", 8080, 8081, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("⁂⁎♜♝♞⁑(๑ `▽´๑)", host.UUID) + }, + }, + { + name: "create by error address", + host: supervisor.NewClientHost("host", "0.0.0.0", "Client", 8080, 8080, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("host", host.UUID) + }, + }, + { + name: "create with geography information", + host: supervisor.NewClientHost("host", "127.0.0.1", "Client", 8080, 8081, "goagle", "microsaft", "facebaok"), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("host", host.UUID) + assert.Equal("goagle", host.SecurityDomain) + assert.Equal("microsaft", host.Location) + assert.Equal("facebaok", host.IDC) + }, + }, + { + name: "create by error address", + host: supervisor.NewClientHost("host", "-1.257.w.-0", "Client", -100, 29000, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("host", host.UUID) + }, + }, + { + name: "create by normal config", + host: supervisor.NewClientHost("host", "127.0.0.1", "Client", 8080, 8081, "", "", ""), + expect: func(t *testing.T, host *supervisor.Host) { + assert := assert.New(t) + assert.Equal("host", host.UUID) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.expect(t, tc.host) + }) + } +} + +func TestHostManager_New(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, hostManager supervisor.HostManager) + }{ + { + name: "simple create", + expect: func(t *testing.T, hostManager supervisor.HostManager) { + assert := assert.New(t) + assert.NotNil(hostManager) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + hostManager := supervisor.NewHostManager() + tc.expect(t, hostManager) + }) + } +} + +func TestHostManager_Get(t *testing.T) { + tests := []struct { + name string + number int + fetch int + expect func(t *testing.T, host *supervisor.Host, success bool) + }{ + { + name: "fetch first host", + number: 3, + fetch: 0, + expect: func(t *testing.T, host *supervisor.Host, success bool) { + assert := assert.New(t) + assert.Equal("0", host.UUID) + assert.True(success) + }, + }, + { + name: "fetch last host", + number: 3, + fetch: 2, + expect: func(t *testing.T, host *supervisor.Host, success bool) { + assert := assert.New(t) + assert.Equal("2", host.UUID) + assert.True(success) + }, + }, + { + name: "fetch not exist host", + number: 3, + fetch: -1, + expect: func(t *testing.T, host *supervisor.Host, success bool) { + assert := assert.New(t) + assert.Equal((*supervisor.Host)(nil), host) + assert.False(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + hostManager := supervisor.NewHostManager() + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + host := mockAHost(index) + hostManager.Add(host) + } + host, success := hostManager.Get(strconv.Itoa(tc.fetch)) + tc.expect(t, host, success) + }) + } +} + +func TestHostManager_Delete(t *testing.T) { + tests := []struct { + name string + number int + delete int + expect func(t *testing.T, host *supervisor.Host, success bool) + }{ + { + name: "delete exist host", + number: 1, + delete: 0, + expect: func(t *testing.T, host *supervisor.Host, success bool) { + assert := assert.New(t) + assert.Nil(host) + assert.False(success) + }, + }, + { + name: "delete not exist host", + number: 1, + delete: 100, + expect: func(t *testing.T, host *supervisor.Host, success bool) { + assert := assert.New(t) + assert.Nil(host) + assert.False(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + hostManager := supervisor.NewHostManager() + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + host := mockAHost(index) + hostManager.Add(host) + } + hostManager.Delete(strconv.Itoa(tc.delete)) + host, success := hostManager.Get(strconv.Itoa(tc.delete)) + + tc.expect(t, host, success) + }) + } +} + +func mockAHost(UUID string) *supervisor.Host { + host := supervisor.NewClientHost(UUID, "127.0.0.1", "Client", 8080, 8081, "", "", "") + return host +} diff --git a/scheduler/supervisor/mocks/cdn_mock.go b/scheduler/supervisor/mocks/cdn_mock.go new file mode 100644 index 000000000..ed179ae9e --- /dev/null +++ b/scheduler/supervisor/mocks/cdn_mock.go @@ -0,0 +1,135 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: CDNDynmaicClient) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + dfnet "d7y.io/dragonfly/v2/pkg/basic/dfnet" + base "d7y.io/dragonfly/v2/pkg/rpc/base" + cdnsystem "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + client "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" + config "d7y.io/dragonfly/v2/scheduler/config" + supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockCDNDynmaicClient is a mock of CDNDynmaicClient interface. +type MockCDNDynmaicClient struct { + ctrl *gomock.Controller + recorder *MockCDNDynmaicClientMockRecorder +} + +// MockCDNDynmaicClientMockRecorder is the mock recorder for MockCDNDynmaicClient. +type MockCDNDynmaicClientMockRecorder struct { + mock *MockCDNDynmaicClient +} + +// NewMockCDNDynmaicClient creates a new mock instance. +func NewMockCDNDynmaicClient(ctrl *gomock.Controller) *MockCDNDynmaicClient { + mock := &MockCDNDynmaicClient{ctrl: ctrl} + mock.recorder = &MockCDNDynmaicClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCDNDynmaicClient) EXPECT() *MockCDNDynmaicClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockCDNDynmaicClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockCDNDynmaicClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCDNDynmaicClient)(nil).Close)) +} + +// GetHost mocks base method. +func (m *MockCDNDynmaicClient) GetHost(arg0 string) (*supervisor.Host, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHost", arg0) + ret0, _ := ret[0].(*supervisor.Host) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetHost indicates an expected call of GetHost. +func (mr *MockCDNDynmaicClientMockRecorder) GetHost(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHost", reflect.TypeOf((*MockCDNDynmaicClient)(nil).GetHost), arg0) +} + +// GetPieceTasks mocks base method. +func (m *MockCDNDynmaicClient) GetPieceTasks(arg0 context.Context, arg1 dfnet.NetAddr, arg2 *base.PieceTaskRequest, arg3 ...grpc.CallOption) (*base.PiecePacket, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetPieceTasks", varargs...) + ret0, _ := ret[0].(*base.PiecePacket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPieceTasks indicates an expected call of GetPieceTasks. +func (mr *MockCDNDynmaicClientMockRecorder) GetPieceTasks(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockCDNDynmaicClient)(nil).GetPieceTasks), varargs...) +} + +// ObtainSeeds mocks base method. +func (m *MockCDNDynmaicClient) ObtainSeeds(arg0 context.Context, arg1 *cdnsystem.SeedRequest, arg2 ...grpc.CallOption) (*client.PieceSeedStream, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ObtainSeeds", varargs...) + ret0, _ := ret[0].(*client.PieceSeedStream) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ObtainSeeds indicates an expected call of ObtainSeeds. +func (mr *MockCDNDynmaicClientMockRecorder) ObtainSeeds(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObtainSeeds", reflect.TypeOf((*MockCDNDynmaicClient)(nil).ObtainSeeds), varargs...) +} + +// OnNotify mocks base method. +func (m *MockCDNDynmaicClient) OnNotify(arg0 *config.DynconfigData) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnNotify", arg0) +} + +// OnNotify indicates an expected call of OnNotify. +func (mr *MockCDNDynmaicClientMockRecorder) OnNotify(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotify", reflect.TypeOf((*MockCDNDynmaicClient)(nil).OnNotify), arg0) +} + +// UpdateState mocks base method. +func (m *MockCDNDynmaicClient) UpdateState(arg0 []dfnet.NetAddr) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateState", arg0) +} + +// UpdateState indicates an expected call of UpdateState. +func (mr *MockCDNDynmaicClientMockRecorder) UpdateState(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockCDNDynmaicClient)(nil).UpdateState), arg0) +} diff --git a/scheduler/supervisor/mocks/gc_mock.go b/scheduler/supervisor/mocks/gc_mock.go new file mode 100644 index 000000000..547f124b1 --- /dev/null +++ b/scheduler/supervisor/mocks/gc_mock.go @@ -0,0 +1,99 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/pkg/gc (interfaces: GC) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gc "d7y.io/dragonfly/v2/pkg/gc" + gomock "github.com/golang/mock/gomock" +) + +// MockGC is a mock of GC interface. +type MockGC struct { + ctrl *gomock.Controller + recorder *MockGCMockRecorder +} + +// MockGCMockRecorder is the mock recorder for MockGC. +type MockGCMockRecorder struct { + mock *MockGC +} + +// NewMockGC creates a new mock instance. +func NewMockGC(ctrl *gomock.Controller) *MockGC { + mock := &MockGC{ctrl: ctrl} + mock.recorder = &MockGCMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockGC) EXPECT() *MockGCMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockGC) Add(arg0 gc.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockGCMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockGC)(nil).Add), arg0) +} + +// Run mocks base method. +func (m *MockGC) Run(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockGCMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockGC)(nil).Run), arg0) +} + +// RunAll mocks base method. +func (m *MockGC) RunAll() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunAll") +} + +// RunAll indicates an expected call of RunAll. +func (mr *MockGCMockRecorder) RunAll() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunAll", reflect.TypeOf((*MockGC)(nil).RunAll)) +} + +// Serve mocks base method. +func (m *MockGC) Serve() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Serve") +} + +// Serve indicates an expected call of Serve. +func (mr *MockGCMockRecorder) Serve() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockGC)(nil).Serve)) +} + +// Stop mocks base method. +func (m *MockGC) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockGCMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockGC)(nil).Stop)) +} diff --git a/scheduler/supervisor/mocks/host_mock.go b/scheduler/supervisor/mocks/host_mock.go new file mode 100644 index 000000000..833ac9dbf --- /dev/null +++ b/scheduler/supervisor/mocks/host_mock.go @@ -0,0 +1,74 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: HostManager) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" + gomock "github.com/golang/mock/gomock" +) + +// MockHostManager is a mock of HostManager interface. +type MockHostManager struct { + ctrl *gomock.Controller + recorder *MockHostManagerMockRecorder +} + +// MockHostManagerMockRecorder is the mock recorder for MockHostManager. +type MockHostManagerMockRecorder struct { + mock *MockHostManager +} + +// NewMockHostManager creates a new mock instance. +func NewMockHostManager(ctrl *gomock.Controller) *MockHostManager { + mock := &MockHostManager{ctrl: ctrl} + mock.recorder = &MockHostManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHostManager) EXPECT() *MockHostManagerMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockHostManager) Add(arg0 *supervisor.Host) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Add", arg0) +} + +// Add indicates an expected call of Add. +func (mr *MockHostManagerMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockHostManager)(nil).Add), arg0) +} + +// Delete mocks base method. +func (m *MockHostManager) Delete(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", arg0) +} + +// Delete indicates an expected call of Delete. +func (mr *MockHostManagerMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockHostManager)(nil).Delete), arg0) +} + +// Get mocks base method. +func (m *MockHostManager) Get(arg0 string) (*supervisor.Host, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].(*supervisor.Host) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockHostManagerMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockHostManager)(nil).Get), arg0) +} diff --git a/scheduler/supervisor/mocks/peer_mock.go b/scheduler/supervisor/mocks/peer_mock.go new file mode 100644 index 000000000..33f075eec --- /dev/null +++ b/scheduler/supervisor/mocks/peer_mock.go @@ -0,0 +1,103 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: PeerManager) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + sync "sync" + + supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" + gomock "github.com/golang/mock/gomock" +) + +// MockPeerManager is a mock of PeerManager interface. +type MockPeerManager struct { + ctrl *gomock.Controller + recorder *MockPeerManagerMockRecorder +} + +// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager. +type MockPeerManagerMockRecorder struct { + mock *MockPeerManager +} + +// NewMockPeerManager creates a new mock instance. +func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager { + mock := &MockPeerManager{ctrl: ctrl} + mock.recorder = &MockPeerManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockPeerManager) Add(arg0 *supervisor.Peer) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Add", arg0) +} + +// Add indicates an expected call of Add. +func (mr *MockPeerManagerMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockPeerManager)(nil).Add), arg0) +} + +// Delete mocks base method. +func (m *MockPeerManager) Delete(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", arg0) +} + +// Delete indicates an expected call of Delete. +func (mr *MockPeerManagerMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPeerManager)(nil).Delete), arg0) +} + +// Get mocks base method. +func (m *MockPeerManager) Get(arg0 string) (*supervisor.Peer, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].(*supervisor.Peer) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockPeerManagerMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPeerManager)(nil).Get), arg0) +} + +// GetPeers mocks base method. +func (m *MockPeerManager) GetPeers() *sync.Map { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPeers") + ret0, _ := ret[0].(*sync.Map) + return ret0 +} + +// GetPeers indicates an expected call of GetPeers. +func (mr *MockPeerManagerMockRecorder) GetPeers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockPeerManager)(nil).GetPeers)) +} + +// GetPeersByTask mocks base method. +func (m *MockPeerManager) GetPeersByTask(arg0 string) []*supervisor.Peer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPeersByTask", arg0) + ret0, _ := ret[0].([]*supervisor.Peer) + return ret0 +} + +// GetPeersByTask indicates an expected call of GetPeersByTask. +func (mr *MockPeerManagerMockRecorder) GetPeersByTask(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeersByTask", reflect.TypeOf((*MockPeerManager)(nil).GetPeersByTask), arg0) +} diff --git a/scheduler/supervisor/mocks/task_mock.go b/scheduler/supervisor/mocks/task_mock.go new file mode 100644 index 000000000..91dca661c --- /dev/null +++ b/scheduler/supervisor/mocks/task_mock.go @@ -0,0 +1,89 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: TaskManager) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" + gomock "github.com/golang/mock/gomock" +) + +// MockTaskManager is a mock of TaskManager interface. +type MockTaskManager struct { + ctrl *gomock.Controller + recorder *MockTaskManagerMockRecorder +} + +// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager. +type MockTaskManagerMockRecorder struct { + mock *MockTaskManager +} + +// NewMockTaskManager creates a new mock instance. +func NewMockTaskManager(ctrl *gomock.Controller) *MockTaskManager { + mock := &MockTaskManager{ctrl: ctrl} + mock.recorder = &MockTaskManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockTaskManager) Add(arg0 *supervisor.Task) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Add", arg0) +} + +// Add indicates an expected call of Add. +func (mr *MockTaskManagerMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockTaskManager)(nil).Add), arg0) +} + +// Delete mocks base method. +func (m *MockTaskManager) Delete(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", arg0) +} + +// Delete indicates an expected call of Delete. +func (mr *MockTaskManagerMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockTaskManager)(nil).Delete), arg0) +} + +// Get mocks base method. +func (m *MockTaskManager) Get(arg0 string) (*supervisor.Task, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].(*supervisor.Task) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockTaskManagerMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockTaskManager)(nil).Get), arg0) +} + +// GetOrAdd mocks base method. +func (m *MockTaskManager) GetOrAdd(arg0 *supervisor.Task) (*supervisor.Task, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrAdd", arg0) + ret0, _ := ret[0].(*supervisor.Task) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetOrAdd indicates an expected call of GetOrAdd. +func (mr *MockTaskManagerMockRecorder) GetOrAdd(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrAdd", reflect.TypeOf((*MockTaskManager)(nil).GetOrAdd), arg0) +} diff --git a/scheduler/supervisor/peer.go b/scheduler/supervisor/peer.go index 336211f2e..8fff4f3f4 100644 --- a/scheduler/supervisor/peer.go +++ b/scheduler/supervisor/peer.go @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +//go:generate mockgen -destination ./mocks/peer_mock.go -package mocks d7y.io/dragonfly/v2/scheduler/supervisor PeerManager package supervisor @@ -289,7 +290,7 @@ func isDescendant(ancestor, offspring *Peer) bool { parent, ok := node.GetParent() if !ok { return false - } else if node.ID == ancestor.ID { + } else if parent.ID == ancestor.ID { return true } node = parent diff --git a/scheduler/supervisor/peer_test.go b/scheduler/supervisor/peer_test.go new file mode 100644 index 000000000..8995846bc --- /dev/null +++ b/scheduler/supervisor/peer_test.go @@ -0,0 +1,676 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package supervisor_test + +import ( + "strconv" + "testing" + + "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/supervisor" + "d7y.io/dragonfly/v2/scheduler/supervisor/mocks" + + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestPeer_New(t *testing.T) { + tests := []struct { + name string + id string + expect func(t *testing.T, peer *supervisor.Peer) + }{ + { + name: "create by normal config", + id: "normal", + expect: func(t *testing.T, peer *supervisor.Peer) { + assert := assert.New(t) + assert.Equal("normal", peer.ID) + }, + }, + { + name: "create by special symbols", + id: "#@+:\b\t\\\"☹ ☺ ☻ (✿◠‿◠)", + expect: func(t *testing.T, peer *supervisor.Peer) { + assert := assert.New(t) + assert.Equal("#@+:\b\t\\\"☹ ☺ ☻ (✿◠‿◠)", peer.ID) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + host := mockAHost("host") + peer := supervisor.NewPeer(tc.id, task, host) + tc.expect(t, peer) + }) + } +} + +func TestPeer_Tree(t *testing.T) { + tests := []struct { + name string + number int + tree map[int]int + answer []int + expect func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) + }{ + { + name: "test ID of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{0, 1, 2, 3, 4, 5}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + for i := 0; i < number; i++ { + assert.Equal(strconv.Itoa(answer[i]), peers[i].ID) + } + }, + }, + { + name: "test TreeNodeCount of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{6, 3, 2, 1, 1, 1}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + for i := 0; i < number; i++ { + assert.Equal(answer[i], peers[i].GetTreeNodeCount()) + } + }, + }, + { + name: "test TreeDepth of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{1, 2, 2, 3, 3, 3}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + for i := 0; i < number; i++ { + assert.Equal(answer[i], peers[i].GetTreeDepth()) + } + }, + }, + { + name: "test Root of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{0, 0, 0, 0, 0, 0}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + for i := 0; i < number; i++ { + assert.Equal(strconv.Itoa(answer[i]), peers[i].GetRoot().ID) + } + }, + }, + { + name: "test Parent of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{-1, 0, 0, 1, 1, 2}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + for i := 0; i < number; i++ { + parent, success := peers[i].GetParent() + if answer[i] < 0 { + assert.Equal((*supervisor.Peer)(nil), parent) + assert.False(success) + } else { + assert.Equal(strconv.Itoa(answer[i]), parent.ID) + assert.True(success) + } + } + }, + }, + { + name: "test Ancestor of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + assert.False(peers[0].IsAncestor(peers[0])) + assert.False(peers[0].IsAncestor(nil)) + + assert.True(peers[0].IsAncestor(peers[5])) + assert.False(peers[5].IsAncestor(peers[0])) + + assert.True(peers[1].IsAncestor(peers[4])) + assert.False(peers[4].IsAncestor(peers[1])) + }, + }, + { + name: "test Descendant of tree structure", + number: 6, + tree: map[int]int{1: 0, 2: 0, 3: 1, 4: 1, 5: 2}, + answer: []int{}, + expect: func(t *testing.T, peers []*supervisor.Peer, number int, answer []int) { + assert := assert.New(t) + assert.False(peers[0].IsDescendant(peers[0])) + assert.False(peers[0].IsDescendant(nil)) + + assert.False(peers[0].IsDescendant(peers[5])) + assert.True(peers[5].IsDescendant(peers[0])) + + assert.False(peers[1].IsDescendant(peers[4])) + assert.True(peers[4].IsDescendant(peers[1])) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var peers []*supervisor.Peer + task := mockATask("task") + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + if i > 0 { + peer.ReplaceParent(peers[tc.tree[i]]) + } + peers = append(peers, peer) + } + tc.expect(t, peers, tc.number, tc.answer) + }) + } +} + +func TestPeer_Cost(t *testing.T) { + tests := []struct { + name string + finishedCount []int32 + cost []int + expect func(t *testing.T, peer *supervisor.Peer, cost []int) + }{ + { + name: "normal workflow", + finishedCount: []int32{2, 3, 4}, + cost: []int{3, 4, 5}, + expect: func(t *testing.T, peer *supervisor.Peer, cost []int) { + assert := assert.New(t) + + costFetch := peer.GetPieceCosts() + assert.ElementsMatch(costFetch, cost) + + average, success := peer.GetPieceAverageCost() + assert.True(success) + assert.Equal(4, average) + finishedCountFetch, loadFetch := peer.GetSortKeys() + assert.Equal(4, finishedCountFetch) + assert.Equal(100, loadFetch) + }, + }, + { + name: "no workflow will be neglected", + finishedCount: []int32{}, + cost: []int{}, + expect: func(t *testing.T, peer *supervisor.Peer, cost []int) { + assert := assert.New(t) + + average, success := peer.GetPieceAverageCost() + assert.False(success) + assert.Equal(0, average) + }, + }, + { + name: "long workflow will be clipped", + finishedCount: []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22}, + cost: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22}, + expect: func(t *testing.T, peer *supervisor.Peer, cost []int) { + assert := assert.New(t) + + costFetch := peer.GetPieceCosts() + assert.ElementsMatch(costFetch, cost[2:]) + + average, success := peer.GetPieceAverageCost() + assert.True(success) + assert.Equal(12, average) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + peer := mockAPeer("peer", task) + for i := 0; i < len(tc.finishedCount); i++ { + peer.UpdateProgress(tc.finishedCount[i], tc.cost[i]) + } + tc.expect(t, peer, tc.cost) + }) + } +} + +func TestPeer_Status(t *testing.T) { + tests := []struct { + name string + status supervisor.PeerStatus + statusName string + judgeArray []bool + expect func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) + }{ + { + name: "status Waiting", + status: supervisor.PeerStatusWaiting, + statusName: "Waiting", + judgeArray: []bool{false, true, false, false, false, false}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Running", + status: supervisor.PeerStatusRunning, + statusName: "Running", + judgeArray: []bool{true, false, false, false, false, false}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Zombie", + status: supervisor.PeerStatusZombie, + statusName: "Zombie", + judgeArray: []bool{false, false, false, false, true, false}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Fail", + status: supervisor.PeerStatusFail, + statusName: "Fail", + judgeArray: []bool{false, false, false, true, true, true}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Success", + status: supervisor.PeerStatusSuccess, + statusName: "Success", + judgeArray: []bool{false, false, true, true, false, false}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "unknown", + status: 100, + statusName: "unknown", + judgeArray: []bool{false, false, false, false, false, false}, + expect: func(t *testing.T, peer *supervisor.Peer, status supervisor.PeerStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.PeerStatus.String(status), statusName) + assert.Equal(peer.GetStatus(), status) + + statutusJudgeArray := []bool{ + peer.IsRunning(), peer.IsWaiting(), peer.IsSuccess(), + peer.IsDone(), peer.IsBad(), peer.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + peer := mockAPeer("peer", task) + peer.SetStatus(tc.status) + + tc.expect(t, peer, tc.status, tc.statusName, tc.judgeArray) + }) + } +} + +func TestPeerManager_New(t *testing.T) { + tests := []struct { + name string + config *config.GCConfig + mock func(m *mocks.MockGCMockRecorder) + expect func(t *testing.T, peerManager supervisor.PeerManager, err error) + }{ + { + name: "create with default config", + config: config.New().Scheduler.GC, + mock: func(m *mocks.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).AnyTimes() + }, + expect: func(t *testing.T, peerManager supervisor.PeerManager, err error) { + assert := assert.New(t) + assert.NotNil(peerManager) + assert.Nil(err) + }, + }, + { + name: "create with strange int", + config: &config.GCConfig{ + PeerGCInterval: 1, + TaskGCInterval: 1 >> 69, + PeerTTL: 1 << 62, + PeerTTI: 1, + TaskTTL: 1, + TaskTTI: 1, + }, + mock: func(m *mocks.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).AnyTimes() + }, + expect: func(t *testing.T, peerManager supervisor.PeerManager, err error) { + assert := assert.New(t) + assert.NotNil(peerManager) + assert.Nil(err) + }, + }, + { + name: "gc failed", + config: config.New().Scheduler.GC, + mock: func(m *mocks.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(errors.New("mockError")).AnyTimes() + }, + expect: func(t *testing.T, peerManager supervisor.PeerManager, err error) { + assert := assert.New(t) + assert.Nil(peerManager) + assert.Error(err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockHostManager(ctl) + mockGC := mocks.NewMockGC(ctl) + tc.mock(mockGC.EXPECT()) + + peerManager, err := supervisor.NewPeerManager(tc.config, mockGC, mockHostManager) + tc.expect(t, peerManager, err) + }) + } +} + +func TestPeerManager_GetPeer(t *testing.T) { + tests := []struct { + name string + number int + fetch int + expect func(t *testing.T, peer *supervisor.Peer, success bool, err error) + }{ + { + name: "fetch first peer", + number: 3, + fetch: 0, + expect: func(t *testing.T, peer *supervisor.Peer, success bool, err error) { + assert := assert.New(t) + assert.Equal("0", peer.ID) + assert.True(success) + assert.Nil(err) + }, + }, + { + name: "fetch last peer", + number: 3, + fetch: 2, + expect: func(t *testing.T, peer *supervisor.Peer, success bool, err error) { + assert := assert.New(t) + assert.Equal("2", peer.ID) + assert.True(success) + assert.Nil(err) + }, + }, + { + name: "fetch not exist peer", + number: 3, + fetch: -1, + expect: func(t *testing.T, peer *supervisor.Peer, success bool, err error) { + assert := assert.New(t) + assert.Equal((*supervisor.Peer)(nil), peer) + assert.False(success) + assert.Nil(err) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockHostManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + peerManager, err := supervisor.NewPeerManager(cfg.Scheduler.GC, mockGC, mockHostManager) + task := mockATask("123") + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + peerManager.Add(peer) + } + peer, success := peerManager.Get(strconv.Itoa(tc.fetch)) + tc.expect(t, peer, success, err) + }) + } +} + +func TestPeerManager_Add(t *testing.T) { + tests := []struct { + name string + ID []int + expect func(t *testing.T, peer []*supervisor.Peer) + }{ + { + name: "add seperative peers", + ID: []int{1, 2, 3}, + expect: func(t *testing.T, peers []*supervisor.Peer) { + assert := assert.New(t) + assert.Len(peers, 3) + }, + }, + { + name: "add duplicate peers", + ID: []int{1, 1, 1}, + expect: func(t *testing.T, peers []*supervisor.Peer) { + assert := assert.New(t) + assert.Len(peers, 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockHostManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + peerManager, _ := supervisor.NewPeerManager(cfg.Scheduler.GC, mockGC, mockHostManager) + task := mockATask("123") + for _, i := range tc.ID { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + peerManager.Add(peer) + } + peers := peerManager.GetPeersByTask("123") + tc.expect(t, peers) + }) + } +} + +func TestPeerManager_GetPeersByTask(t *testing.T) { + tests := []struct { + name string + tasks map[*supervisor.Task]int + fetch string + expect func(t *testing.T, peer []*supervisor.Peer) + }{ + { + name: "peer for a task", + tasks: map[*supervisor.Task]int{mockATask("123"): 3}, + fetch: "123", + expect: func(t *testing.T, peers []*supervisor.Peer) { + assert := assert.New(t) + assert.Len(peers, 3) + }, + }, + { + name: "one from two task", + tasks: map[*supervisor.Task]int{mockATask("123"): 2, mockATask("456"): 3}, + fetch: "123", + expect: func(t *testing.T, peers []*supervisor.Peer) { + assert := assert.New(t) + assert.Len(peers, 2) + }, + }, + { + name: "no peer for a task", + tasks: map[*supervisor.Task]int{mockATask("123"): 1}, + fetch: "456", + expect: func(t *testing.T, peers []*supervisor.Peer) { + assert := assert.New(t) + assert.Len(peers, 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockHostManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + peerManager, _ := supervisor.NewPeerManager(cfg.Scheduler.GC, mockGC, mockHostManager) + nowAt := 0 + for task, num := range tc.tasks { + for i := nowAt; i < nowAt+num; i++ { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + t.Log(i, index, nowAt, peer.ID, num) + peerManager.Add(peer) + } + nowAt += num + } + peers := peerManager.GetPeersByTask(tc.fetch) + tc.expect(t, peers) + }) + } +} + +func TestPeerManager_Delete(t *testing.T) { + tests := []struct { + name string + number int + delete int + fetch int + expect func(t *testing.T, peer *supervisor.Peer, success bool) + }{ + { + name: "delete exist peer", + number: 1, + delete: 0, + fetch: 0, + expect: func(t *testing.T, peer *supervisor.Peer, success bool) { + assert := assert.New(t) + assert.Nil(peer) + assert.False(success) + }, + }, + { + name: "delete not exist peer", + number: 1, + delete: 100, + fetch: 0, + expect: func(t *testing.T, peer *supervisor.Peer, success bool) { + assert := assert.New(t) + assert.NotNil(peer) + assert.True(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockHostManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + peerManager, _ := supervisor.NewPeerManager(cfg.Scheduler.GC, mockGC, mockHostManager) + task := mockATask("123") + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + peerManager.Add(peer) + } + peerManager.Delete(strconv.Itoa(tc.delete)) + peer, success := peerManager.Get(strconv.Itoa(tc.fetch)) + + tc.expect(t, peer, success) + }) + } +} + +func mockAPeer(ID string, task *supervisor.Task) *supervisor.Peer { + host := supervisor.NewClientHost(ID, "127.0.0.1", "Client", 8080, 8081, "", "", "") + return supervisor.NewPeer(ID, task, host) +} diff --git a/scheduler/supervisor/task.go b/scheduler/supervisor/task.go index 415ee5e44..8a42803b8 100644 --- a/scheduler/supervisor/task.go +++ b/scheduler/supervisor/task.go @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +//go:generate mockgen -destination ./mocks/task_mock.go -package mocks d7y.io/dragonfly/v2/scheduler/supervisor TaskManager package supervisor diff --git a/scheduler/supervisor/task_test.go b/scheduler/supervisor/task_test.go new file mode 100644 index 000000000..33ea01f14 --- /dev/null +++ b/scheduler/supervisor/task_test.go @@ -0,0 +1,576 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package supervisor_test + +import ( + "strconv" + "testing" + + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/supervisor" + "d7y.io/dragonfly/v2/scheduler/supervisor/mocks" + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestTask_New(t *testing.T) { + tests := []struct { + name string + task *supervisor.Task + expect func(t *testing.T, task *supervisor.Task) + }{ + { + name: "create by normal config", + task: supervisor.NewTask("main", "127.0.0.1", &base.UrlMeta{}), + expect: func(t *testing.T, task *supervisor.Task) { + assert := assert.New(t) + assert.Equal("main", task.ID) + }, + }, + { + name: "create by special symbol", + task: supervisor.NewTask("\x07\b%$!!\x7F✌ (>‿<)✌", "d7y.io/dragonfly", &base.UrlMeta{Tag: "d7y-test"}), + expect: func(t *testing.T, task *supervisor.Task) { + assert := assert.New(t) + assert.Equal("\x07\b%$!!\x7F✌ (>‿<)✌", task.ID) + }, + }, + { + name: "create by http url", + task: supervisor.NewTask("task", "http://370.moe/", &base.UrlMeta{}), + expect: func(t *testing.T, task *supervisor.Task) { + assert := assert.New(t) + assert.Equal("task", task.ID) + }, + }, + { + name: "create by normal config", + task: supervisor.NewTask("task", "android://370.moe", &base.UrlMeta{}), + expect: func(t *testing.T, task *supervisor.Task) { + assert := assert.New(t) + assert.Equal("task", task.ID) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.expect(t, tc.task) + }) + } +} + +func TestTask_Status(t *testing.T) { + tests := []struct { + name string + status supervisor.TaskStatus + statusName string + judgeArray []bool + expect func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) + }{ + { + name: "status Waiting", + status: supervisor.TaskStatusWaiting, + statusName: "Waiting", + judgeArray: []bool{false, false, true, false, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Running", + status: supervisor.TaskStatusRunning, + statusName: "Running", + judgeArray: []bool{false, false, false, true, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status seeding", + status: supervisor.TaskStatusSeeding, + statusName: "Seeding", + judgeArray: []bool{false, true, false, true, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status success", + status: supervisor.TaskStatusSuccess, + statusName: "Success", + judgeArray: []bool{true, true, false, true, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status zombie", + status: supervisor.TaskStatusZombie, + statusName: "Zombie", + judgeArray: []bool{false, false, false, false, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "status Fail", + status: supervisor.TaskStatusFail, + statusName: "Fail", + judgeArray: []bool{false, false, false, false, true}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + { + name: "unknown", + status: 100, + statusName: "unknown", + judgeArray: []bool{false, false, false, false, false}, + expect: func(t *testing.T, task *supervisor.Task, status supervisor.TaskStatus, statusName string, judgeArray []bool) { + assert := assert.New(t) + assert.Equal(supervisor.TaskStatus.String(status), statusName) + assert.Equal(task.GetStatus(), status) + + statutusJudgeArray := []bool{ + task.IsSuccess(), task.CanSchedule(), + task.IsWaiting(), task.IsHealth(), task.IsFail(), + } + assert.Equal(statutusJudgeArray, judgeArray) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + task.SetStatus(tc.status) + tc.expect(t, task, tc.status, tc.statusName, tc.judgeArray) + }) + } +} + +func TestTask_BackToSourcePeer(t *testing.T) { + tests := []struct { + name string + initialWeight int32 + add []string + expect func(t *testing.T, task *supervisor.Task, add []string) + }{ + { + name: "able to backsource", + initialWeight: 4, + add: []string{"0", "1", "2"}, + expect: func(t *testing.T, task *supervisor.Task, add []string) { + assert := assert.New(t) + assert.EqualValues(task.BackToSourceWeight.Load(), 1) + assert.True(task.CanBackToSource()) + assert.ElementsMatch(task.GetBackToSourcePeers(), add) + for _, ID := range add { + contain := task.ContainsBackToSourcePeer(ID) + assert.True(contain) + } + + }, + }, + { + name: "unable to backsource", + initialWeight: -1, + add: []string{}, + expect: func(t *testing.T, task *supervisor.Task, add []string) { + assert := assert.New(t) + assert.EqualValues(task.BackToSourceWeight.Load(), -1) + assert.False(task.CanBackToSource()) + + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + task.BackToSourceWeight.Store(tc.initialWeight) + for _, ID := range tc.add { + task.AddBackToSourcePeer(ID) + } + tc.expect(t, task, tc.add) + }) + } +} + +func TestTask_Pick(t *testing.T) { + tests := []struct { + name string + number int + pick func(peer *supervisor.Peer) bool + reverse bool + limit int + answer []string + }{ + { + name: "pick three odd", + number: 10, + pick: func(peer *supervisor.Peer) bool { + id, _ := strconv.Atoi(peer.ID) + return id%2 != 0 + }, + reverse: false, + limit: 3, + answer: []string{"1", "3", "5"}, + }, + { + name: "pick 100 odd", + number: 10, + pick: func(peer *supervisor.Peer) bool { + id, _ := strconv.Atoi(peer.ID) + return id%2 != 0 + }, + reverse: true, + limit: 3, + answer: []string{"5", "7", "9"}, + }, + { + name: "pick all odd", + number: 10, + pick: func(peer *supervisor.Peer) bool { + id, _ := strconv.Atoi(peer.ID) + return id%2 != 0 + }, + reverse: false, + limit: 100, + answer: []string{"1", "3", "5", "7", "9"}, + }, + { + name: "pick all", + number: 10, + pick: func(peer *supervisor.Peer) bool { + return true + }, + reverse: false, + limit: 100, + answer: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}, + }, + { + name: "pick nil", + number: 10, + pick: func(peer *supervisor.Peer) bool { + return false + }, + reverse: false, + limit: 100, + answer: []string{}, + }, + { + name: "invalid pickFn", + number: 10, + pick: (func(peer *supervisor.Peer) bool)(nil), + reverse: false, + limit: 100, + answer: []string{}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task := mockATask("task") + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + peer := mockAPeer(index, task) + peer.UpdateProgress((int32)(i), i) + task.AddPeer(peer) + } + var peers []*supervisor.Peer + if tc.reverse { + peers = task.PickReverse(tc.limit, tc.pick) + } else { + peers = task.Pick(tc.limit, tc.pick) + } + var peerIDs []string + for _, peer := range peers { + peerIDs = append(peerIDs, peer.ID) + } + assert := assert.New(t) + assert.ElementsMatch(peerIDs, tc.answer) + }) + } +} + +func TestTaskManager_New(t *testing.T) { + tests := []struct { + name string + config *config.GCConfig + mock func(m *mocks.MockGCMockRecorder) + expect func(t *testing.T, taskManager supervisor.TaskManager, err error) + }{ + { + name: "simple create", + config: config.New().Scheduler.GC, + mock: func(m *mocks.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).AnyTimes() + }, + expect: func(t *testing.T, taskManager supervisor.TaskManager, err error) { + assert := assert.New(t) + assert.NotNil(taskManager) + assert.Nil(err) + }, + }, + { + name: "gc failed", + config: config.New().Scheduler.GC, + mock: func(m *mocks.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(errors.New("mockError")).AnyTimes() + }, + expect: func(t *testing.T, taskManager supervisor.TaskManager, err error) { + assert := assert.New(t) + assert.Nil(taskManager) + assert.Error(err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockAPeerManager := mocks.NewMockPeerManager(ctl) + mockGC := mocks.NewMockGC(ctl) + tc.mock(mockGC.EXPECT()) + + taskManager, err := supervisor.NewTaskManager(tc.config, mockGC, mockAPeerManager) + tc.expect(t, taskManager, err) + }) + } +} + +func TestTaskManager_Get(t *testing.T) { + tests := []struct { + name string + number int + fetch int + expect func(t *testing.T, task *supervisor.Task, success bool) + }{ + { + name: "fetch first task", + number: 3, + fetch: 0, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Equal("0", task.ID) + assert.True(success) + }, + }, + { + name: "fetch last task", + number: 3, + fetch: 2, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Equal("2", task.ID) + assert.True(success) + }, + }, + { + name: "fetch not exist task", + number: 3, + fetch: -1, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Equal((*supervisor.Task)(nil), task) + assert.False(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockPeerManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + taskManager, _ := supervisor.NewTaskManager(cfg.Scheduler.GC, mockGC, mockHostManager) + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + task := mockATask(index) + taskManager.Add(task) + } + task, success := taskManager.Get(strconv.Itoa(tc.fetch)) + tc.expect(t, task, success) + }) + } +} + +func TestTaskManager_GetOrAdd(t *testing.T) { + tests := []struct { + name string + create int + add int + expect func(t *testing.T, task *supervisor.Task, success bool) + }{ + { + name: "get exist task", + create: 3, + add: 0, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Equal("2", task.ID) + assert.False(success) + }, + }, + { + name: "add not exist task", + create: 3, + add: 3, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Equal("2", task.ID) + assert.True(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockHostManager := mocks.NewMockPeerManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + taskManager, _ := supervisor.NewTaskManager(cfg.Scheduler.GC, mockGC, mockHostManager) + var tasks []*supervisor.Task + for i := 0; i < tc.create; i++ { + index := strconv.Itoa(i) + task := mockATask(index) + tasks = append(tasks, task) + } + for i := 0; i < tc.add; i++ { + taskManager.Add(tasks[i]) + } + task, success := taskManager.GetOrAdd(tasks[len(tasks)-1]) + tc.expect(t, task, success) + }) + } +} + +func TestTaskManager_Delete(t *testing.T) { + tests := []struct { + name string + number int + delete int + fetch int + expect func(t *testing.T, task *supervisor.Task, success bool) + }{ + { + name: "delete exist task", + number: 1, + delete: 0, + fetch: 0, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.Nil(task) + assert.False(success) + }, + }, + { + name: "delete not exist task", + number: 1, + delete: 100, + fetch: 0, + expect: func(t *testing.T, task *supervisor.Task, success bool) { + assert := assert.New(t) + assert.NotNil(task) + assert.True(success) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + mockAPeerManager := mocks.NewMockPeerManager(ctl) + mockGC := mocks.NewMockGC(ctl) + mockGC.EXPECT().Add(gomock.Any()).Return(nil).AnyTimes() + + cfg := config.New() + taskManager, _ := supervisor.NewTaskManager(cfg.Scheduler.GC, mockGC, mockAPeerManager) + for i := 0; i < tc.number; i++ { + index := strconv.Itoa(i) + task := mockATask(index) + taskManager.Add(task) + } + taskManager.Delete(strconv.Itoa(tc.delete)) + task, success := taskManager.Get(strconv.Itoa(tc.fetch)) + + tc.expect(t, task, success) + }) + } +} + +func mockATask(ID string) *supervisor.Task { + urlMeta := &base.UrlMeta{ + Tag: "d7y-test", + } + return supervisor.NewTask(ID, "d7y.io/dragonfly", urlMeta) +}