From f63c705b9849d24c1233f2ab6631211a1289f918 Mon Sep 17 00:00:00 2001 From: sunwp <244372610@qq.com> Date: Thu, 20 Jan 2022 16:57:38 +0800 Subject: [PATCH] Add begin seed piece hint task registered successfully (#997) * cdn obtainSeeds add begin seed piece hint register successfully Signed-off-by: sunwp <244372610@qq.com> * feat: scheduler handle begin of piece Signed-off-by: Gaius * test: trigger cdn task Signed-off-by: Gaius Co-authored-by: Gaius --- .github/workflows/compatibility-e2e.yml | 2 +- cdn/rpcserver/rpcserver.go | 12 ++++ scheduler/resource/cdn.go | 30 +++------ scheduler/resource/cdn_test.go | 84 +++++++++++++------------ scheduler/resource/resource.go | 3 +- 5 files changed, 68 insertions(+), 63 deletions(-) diff --git a/.github/workflows/compatibility-e2e.yml b/.github/workflows/compatibility-e2e.yml index e2dd7607e..462c9bf80 100644 --- a/.github/workflows/compatibility-e2e.yml +++ b/.github/workflows/compatibility-e2e.yml @@ -13,7 +13,7 @@ env: KIND_VERSION: v0.11.1 CONTAINERD_VERSION: v1.5.2 KIND_CONFIG_PATH: test/testdata/kind/config.yaml - DRAGONFLY_STABLE_IMAGE_TAG: v2.0.1-rc.10 + DRAGONFLY_STABLE_IMAGE_TAG: v2.0.2-beta.3 DRAGONFLY_CHARTS_PATH: deploy/helm-charts/charts/dragonfly DRAGONFLY_CHARTS_CONFIG_PATH: test/testdata/charts/config.yaml DRAGONFLY_FILE_SERVER_PATH: test/testdata/k8s/file-server.yaml diff --git a/cdn/rpcserver/rpcserver.go b/cdn/rpcserver/rpcserver.go index 543dba352..f7a571c91 100644 --- a/cdn/rpcserver/rpcserver.go +++ b/cdn/rpcserver/rpcserver.go @@ -35,6 +35,7 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/base/common" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnserver "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" "d7y.io/dragonfly/v2/pkg/util/digestutils" @@ -91,6 +92,17 @@ func (css *Server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, } peerID := idgen.CDNPeerID(css.config.AdvertiseIP) hostID := idgen.CDNHostID(hostutils.FQDNHostname, int32(css.config.ListenPort)) + // begin piece, hint register success + psc <- &cdnsystem.PieceSeed{ + PeerId: peerID, + HostUuid: hostID, + PieceInfo: &base.PieceInfo{ + PieceNum: common.BeginOfPiece, + }, + Done: false, + ContentLength: registeredTask.SourceFileLength, + TotalPieceCount: registeredTask.TotalPieceCount, + } for piece := range pieceChan { pieceSeed := &cdnsystem.PieceSeed{ PeerId: peerID, diff --git a/scheduler/resource/cdn.go b/scheduler/resource/cdn.go index bce9d16ae..eac5b68c3 100644 --- a/scheduler/resource/cdn.go +++ b/scheduler/resource/cdn.go @@ -27,6 +27,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/dfnet" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/rpc/base/common" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" @@ -51,17 +52,12 @@ type cdn struct { } // New cdn interface -func newCDN(peerManager PeerManager, hostManager HostManager, dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (CDN, error) { - client, err := newCDNClient(dynconfig, hostManager, opts...) - if err != nil { - return nil, err - } - +func newCDN(peerManager PeerManager, hostManager HostManager, client CDNClient) CDN { return &cdn{ client: client, peerManager: peerManager, hostManager: hostManager, - }, nil + } } // TriggerTask start to trigger cdn task @@ -75,24 +71,16 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler return nil, nil, err } - var ( - initialized bool - peer *Peer - ) - - // Receive pieces from cdn + var peer *Peer for { piece, err := stream.Recv() if err != nil { return nil, nil, err } - task.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo) - - // Init cdn peer - if !initialized { - initialized = true - + // Handle begin of piece + if piece.PieceInfo != nil && piece.PieceInfo.PieceNum == common.BeginOfPiece { + task.Log.Infof("receive begin o piece: %#v %#v", piece, piece.PieceInfo) peer, err = c.initPeer(task, piece) if err != nil { return nil, nil, err @@ -101,9 +89,10 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler if err := peer.FSM.Event(PeerEventDownload); err != nil { return nil, nil, err } + continue } - // Get end piece + // Handle end of piece if piece.Done { peer.Log.Infof("receive end of piece: %#v %#v", piece, piece.PieceInfo) @@ -134,6 +123,7 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler } // Update piece info + peer.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo) peer.Pieces.Set(uint(piece.PieceInfo.PieceNum)) // TODO(244372610) CDN should set piece cost peer.AppendPieceCost(0) diff --git a/scheduler/resource/cdn_test.go b/scheduler/resource/cdn_test.go index 5965e5213..f9954fdb8 100644 --- a/scheduler/resource/cdn_test.go +++ b/scheduler/resource/cdn_test.go @@ -17,9 +17,11 @@ package resource import ( + "context" "encoding/json" "errors" "fmt" + "reflect" "testing" gomock "github.com/golang/mock/gomock" @@ -27,6 +29,7 @@ import ( "d7y.io/dragonfly/v2/internal/dfnet" "d7y.io/dragonfly/v2/manager/types" + rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" ) @@ -34,47 +37,13 @@ import ( func TestCDN_newCDN(t *testing.T) { tests := []struct { name string - mock func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) - expect func(t *testing.T, err error) + expect func(t *testing.T, cdn CDN) }{ { name: "new cdn", - mock: func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) { - gomock.InOrder( - dynconfig.Get().Return(&config.DynconfigData{ - CDNs: []*config.CDN{{ID: 1}}, - }, nil).Times(1), - hostManager.Store(gomock.Any()).Return().Times(1), - dynconfig.Register(gomock.Any()).Return().Times(1), - ) - }, - expect: func(t *testing.T, err error) { + expect: func(t *testing.T, cdn CDN) { assert := assert.New(t) - assert.NoError(err) - }, - }, - { - name: "new cdn failed because of dynconfig get error data", - mock: func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) { - dynconfig.Get().Return(nil, errors.New("foo")).Times(1) - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "foo") - }, - }, - { - name: "new cdn failed because of cdn list is empty", - mock: func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) { - gomock.InOrder( - dynconfig.Get().Return(&config.DynconfigData{ - CDNs: []*config.CDN{}, - }, nil).Times(1), - ) - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "address list of cdn is empty") + assert.Equal(reflect.TypeOf(cdn).Elem().Name(), "cdn") }, }, } @@ -83,13 +52,46 @@ func TestCDN_newCDN(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - dynconfig := configmocks.NewMockDynconfigInterface(ctl) hostManager := NewMockHostManager(ctl) peerManager := NewMockPeerManager(ctl) - tc.mock(dynconfig.EXPECT(), hostManager.EXPECT()) + client := NewMockCDNClient(ctl) - _, err := newCDN(peerManager, hostManager, dynconfig) - tc.expect(t, err) + tc.expect(t, newCDN(peerManager, hostManager, client)) + }) + } +} + +func TestCDN_TriggerTask(t *testing.T) { + tests := []struct { + name string + mock func(mc *MockCDNClientMockRecorder) + expect func(t *testing.T, peer *Peer, result *rpcscheduler.PeerResult, err error) + }{ + { + name: "start obtain seed stream failed", + mock: func(mc *MockCDNClientMockRecorder) { + mc.ObtainSeeds(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *Peer, result *rpcscheduler.PeerResult, err error) { + assert := assert.New(t) + assert.EqualError(err, "foo") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + hostManager := NewMockHostManager(ctl) + peerManager := NewMockPeerManager(ctl) + client := NewMockCDNClient(ctl) + tc.mock(client.EXPECT()) + + cdn := newCDN(peerManager, hostManager, client) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) + peer, result, err := cdn.TriggerTask(context.Background(), mockTask) + tc.expect(t, peer, result, err) }) } } diff --git a/scheduler/resource/resource.go b/scheduler/resource/resource.go index eaedd5d6f..9008026c7 100644 --- a/scheduler/resource/resource.go +++ b/scheduler/resource/resource.go @@ -71,10 +71,11 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, opts } // Initialize cdn interface - cdn, err := newCDN(peerManager, hostManager, dynconfig, opts...) + client, err := newCDNClient(dynconfig, hostManager, opts...) if err != nil { return nil, err } + cdn := newCDN(peerManager, hostManager, client) return &resource{ cdn: cdn,