Extract storage driver into peer tasks (#998)

* chore: extract storage instead load every time

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* fix: test

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* fix: gofmt

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2022-01-19 14:39:31 +08:00 committed by Gaius
parent c9b2c0acf8
commit c00f07c2de
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
13 changed files with 104 additions and 71 deletions

View File

@ -78,6 +78,8 @@ type peerTaskConductor struct {
storageManager storage.Manager
peerTaskManager *peerTaskManager
storage storage.TaskStorageDriver
// schedule options
schedulerOption config.SchedulerOption
schedulerClient schedulerclient.SchedulerClient
@ -307,6 +309,10 @@ func (pt *peerTaskConductor) GetTaskID() string {
return pt.taskID
}
func (pt *peerTaskConductor) GetStorage() storage.TaskStorageDriver {
return pt.storage
}
func (pt *peerTaskConductor) GetContentLength() int64 {
return pt.contentLength.Load()
}
@ -401,7 +407,8 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.SetContentLength(l)
pt.SetTotalPieces(1)
ctx := pt.ctx
err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
var err error
pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.tinyData.PeerID,
@ -416,7 +423,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.cancel(base.Code_ClientError, err.Error())
return
}
n, err := pt.peerTaskManager.storageManager.WritePiece(ctx,
n, err := pt.storage.WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.tinyData.PeerID,
@ -640,6 +647,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}
request := &DownloadPieceRequest{
storage: pt.storage,
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@ -870,6 +878,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
pt.requestedPieces.Set(piece.PieceNum)
}
req := &DownloadPieceRequest{
storage: pt.storage,
piece: piece,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@ -1080,9 +1089,9 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
span.End()
}
func (pt *peerTaskConductor) InitStorage() error {
func (pt *peerTaskConductor) InitStorage() (err error) {
// prepare storage
err := pt.storageManager.RegisterTask(pt.ctx,
pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
@ -1100,7 +1109,7 @@ func (pt *peerTaskConductor) InitStorage() error {
func (pt *peerTaskConductor) UpdateStorage() error {
// update storage
err := pt.storageManager.UpdateTask(pt.ctx,
err := pt.storage.UpdateTask(pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
@ -1249,7 +1258,7 @@ func (pt *peerTaskConductor) fail() {
// Validate stores metadata and validates digest
func (pt *peerTaskConductor) Validate() error {
err := pt.peerTaskManager.storageManager.Store(pt.ctx,
err := pt.storage.Store(pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.peerID,
@ -1266,7 +1275,7 @@ func (pt *peerTaskConductor) Validate() error {
if !pt.peerTaskManager.calculateDigest {
return nil
}
err = pt.storageManager.ValidateDigest(
err = pt.storage.ValidateDigest(
&storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),

View File

@ -60,6 +60,8 @@ type Task interface {
Context() context.Context
Log() *logger.SugaredLoggerOnWith
GetStorage() storage.TaskStorageDriver
GetPeerID() string
GetTaskID() string

View File

@ -9,6 +9,7 @@ import (
io "io"
reflect "reflect"
storage "d7y.io/dragonfly/v2/client/daemon/storage"
dflog "d7y.io/dragonfly/v2/internal/dflog"
gomock "github.com/golang/mock/gomock"
)
@ -187,6 +188,20 @@ func (mr *MockTaskMockRecorder) GetPieceMd5Sign() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceMd5Sign", reflect.TypeOf((*MockTask)(nil).GetPieceMd5Sign))
}
// GetStorage mocks base method.
func (m *MockTask) GetStorage() storage.TaskStorageDriver {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStorage")
ret0, _ := ret[0].(storage.TaskStorageDriver)
return ret0
}
// GetStorage indicates an expected call of GetStorage.
func (mr *MockTaskMockRecorder) GetStorage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorage", reflect.TypeOf((*MockTask)(nil).GetStorage))
}
// GetTaskID mocks base method.
func (m *MockTask) GetTaskID() string {
m.ctrl.T.Helper()

View File

@ -250,7 +250,6 @@ func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOptio
runningPeerTasks: sync.Map{},
pieceManager: &pieceManager{
calculateDigest: true,
storageManager: storageManager,
pieceDownloader: opt.pieceDownloader,
computePieceSize: func(contentLength int64) uint32 {
return opt.pieceSize

View File

@ -124,7 +124,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
}
func (s *streamTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
pr, pc, err := s.peerTaskConductor.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{
pr, pc, err := s.peerTaskConductor.storage.ReadPiece(s.ctx, &storage.ReadPieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: s.peerTaskConductor.peerID,
TaskID: s.peerTaskConductor.taskID,
@ -198,6 +198,7 @@ func (s *streamTask) writeToPipe(firstPiece *pieceInfo, pw *io.PipeWriter) {
}
return
case cur = <-s.pieceCh:
// FIXME check missing piece for non-block broker channel
continue
case <-s.peerTaskConductor.failCh:
ptError := fmt.Errorf("context done due to peer task fail: %d/%s",

View File

@ -237,7 +237,6 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) {
pm := &pieceManager{
calculateDigest: true,
storageManager: storageManager,
pieceDownloader: downloader,
computePieceSize: func(contentLength int64) uint32 {
return uint32(pieceSize)

View File

@ -25,6 +25,7 @@ import (
"strings"
"time"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
@ -34,6 +35,7 @@ import (
type DownloadPieceRequest struct {
piece *base.PieceInfo
log *logger.SugaredLoggerOnWith
storage storage.TaskStorageDriver
TaskID string
PeerID string
DstPid string
@ -50,6 +52,7 @@ type DownloadPieceResult struct {
FinishTime int64
}
//go:generate mockgen -source piece_downloader.go -destination ../test/mock/peer/piece_downloader.go
type PieceDownloader interface {
DownloadPiece(context.Context, *DownloadPieceRequest) (io.Reader, io.Closer, error)
}

View File

@ -41,12 +41,10 @@ import (
type PieceManager interface {
DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest) error
DownloadPiece(ctx context.Context, request *DownloadPieceRequest) (*DownloadPieceResult, error)
ReadPiece(ctx context.Context, req *storage.ReadPieceRequest) (io.Reader, io.Closer, error)
}
type pieceManager struct {
*rate.Limiter
storageManager storage.TaskStorageDriver
pieceDownloader PieceDownloader
computePieceSize func(contentLength int64) uint32
@ -57,7 +55,6 @@ var _ PieceManager = (*pieceManager)(nil)
func NewPieceManager(s storage.TaskStorageDriver, pieceDownloadTimeout time.Duration, opts ...func(*pieceManager)) (PieceManager, error) {
pm := &pieceManager{
storageManager: s,
computePieceSize: util.ComputePieceSize,
calculateDigest: true,
}
@ -169,7 +166,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
},
}
result.Size, err = pm.storageManager.WritePiece(ctx, writePieceRequest)
result.Size, err = request.storage.WritePiece(ctx, writePieceRequest)
result.FinishTime = time.Now().UnixNano()
span.RecordError(err)
@ -181,10 +178,6 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
return result, nil
}
func (pm *pieceManager) ReadPiece(ctx context.Context, req *storage.ReadPieceRequest) (io.Reader, io.Closer, error) {
return pm.storageManager.ReadPiece(ctx, req)
}
func (pm *pieceManager) processPieceFromSource(pt Task,
reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32, isLastPiece func(n int64) (int32, bool)) (
result *DownloadPieceResult, md5 string, err error) {
@ -210,7 +203,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task,
reader = digestutils.NewDigestReader(pt.Log(), reader)
}
var n int64
result.Size, err = pm.storageManager.WritePiece(
result.Size, err = pt.GetStorage().WritePiece(
pt.Context(),
&storage.WritePieceRequest{
UnknownLength: unknownLength,
@ -270,7 +263,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
if contentLength < 0 {
log.Warnf("can not get content length for %s", request.Url)
} else {
err = pm.storageManager.UpdateTask(ctx,
err = pt.GetStorage().UpdateTask(ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
@ -356,7 +349,7 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
if pieceNum == maxPieceNum-1 {
// last piece
err = pm.storageManager.UpdateTask(ctx,
err = pt.GetStorage().UpdateTask(ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
@ -430,7 +423,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
// last piece, piece size maybe 0
contentLength = int64(pieceSize)*int64(pieceNum) + result.Size
pt.SetTotalPieces(int32(math.Ceil(float64(contentLength) / float64(pieceSize))))
err = pm.storageManager.UpdateTask(ctx,
err = pt.GetStorage().UpdateTask(ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),

View File

@ -130,7 +130,10 @@ func TestPieceManager_DownloadSource(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
/********** prepare test start **********/
mockPeerTask := NewMockTask(ctrl)
var totalPieces = &atomic.Int32{}
var (
totalPieces = &atomic.Int32{}
taskStorage storage.TaskStorageDriver
)
mockPeerTask.EXPECT().SetContentLength(gomock.Any()).AnyTimes().DoAndReturn(
func(arg0 int64) error {
return nil
@ -151,6 +154,10 @@ func TestPieceManager_DownloadSource(t *testing.T) {
func() string {
return taskID
})
mockPeerTask.EXPECT().GetStorage().AnyTimes().DoAndReturn(
func() storage.TaskStorageDriver {
return taskStorage
})
mockPeerTask.EXPECT().AddTraffic(gomock.Any()).AnyTimes().DoAndReturn(func(int642 uint64) {})
mockPeerTask.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(*DownloadPieceRequest, *DownloadPieceResult, error) {
@ -166,7 +173,7 @@ func TestPieceManager_DownloadSource(t *testing.T) {
mockPeerTask.EXPECT().Log().AnyTimes().DoAndReturn(func() *logger.SugaredLoggerOnWith {
return logger.With("test case", tc.name)
})
err = storageManager.RegisterTask(context.Background(),
taskStorage, err = storageManager.RegisterTask(context.Background(),
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: mockPeerTask.GetPeerID(),

View File

@ -70,7 +70,7 @@ func TestLocalTaskStore_PutAndGetPiece_Simple(t *testing.T) {
var s = sm.(*storageManager)
err = s.CreateTask(
_, err = s.CreateTask(
RegisterTaskRequest{
CommonTaskRequest: CommonTaskRequest{
PeerID: peerID,
@ -243,7 +243,7 @@ func TestLocalTaskStore_PutAndGetPiece_Advance(t *testing.T) {
var s = sm.(*storageManager)
err = s.CreateTask(
_, err = s.CreateTask(
RegisterTaskRequest{
CommonTaskRequest: CommonTaskRequest{
PeerID: peerID,

View File

@ -43,6 +43,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/base"
)
//go:generate mockgen -source storage_manager.go -destination ../test/mock/storage/manager.go
type TaskStorageDriver interface {
// WritePiece put a piece of a task to storage
WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error)
@ -83,7 +84,7 @@ type Manager interface {
// KeepAlive tests if storage is used in given time duration
clientutil.KeepAlive
// RegisterTask registers a task in storage driver
RegisterTask(ctx context.Context, req RegisterTaskRequest) error
RegisterTask(ctx context.Context, req RegisterTaskRequest) (TaskStorageDriver, error)
// FindCompletedTask try to find a completed task for fast path
FindCompletedTask(taskID string) *ReusePeerTask
// CleanUp cleans all storage data
@ -191,27 +192,28 @@ func WithGCInterval(gcInterval time.Duration) func(*storageManager) error {
}
}
func (s *storageManager) RegisterTask(ctx context.Context, req RegisterTaskRequest) error {
if _, ok := s.LoadTask(
func (s *storageManager) RegisterTask(ctx context.Context, req RegisterTaskRequest) (TaskStorageDriver, error) {
ts, ok := s.LoadTask(
PeerTaskMetadata{
PeerID: req.PeerID,
TaskID: req.TaskID,
}); !ok {
})
if ok {
return ts, nil
}
// double check if task store exists
// if ok, just unlock and return
s.Lock()
defer s.Unlock()
if _, ok := s.LoadTask(
if ts, ok = s.LoadTask(
PeerTaskMetadata{
PeerID: req.PeerID,
TaskID: req.TaskID,
}); ok {
return nil
return ts, nil
}
// still not exist, create a new task store
return s.CreateTask(req)
}
return nil
}
func (s *storageManager) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) {
@ -298,7 +300,7 @@ func (s *storageManager) UpdateTask(ctx context.Context, req *UpdateTaskRequest)
return t.(TaskStorageDriver).UpdateTask(ctx, req)
}
func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
func (s *storageManager) CreateTask(req RegisterTaskRequest) (TaskStorageDriver, error) {
s.Keep()
logger.Debugf("init local task storage, peer id: %s, task id: %s", req.PeerID, req.TaskID)
@ -322,12 +324,12 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
SugaredLoggerOnWith: logger.With("task", req.TaskID, "peer", req.PeerID, "component", "localTaskStore"),
}
if err := os.MkdirAll(t.dataDir, defaultDirectoryMode); err != nil && !os.IsExist(err) {
return err
return nil, err
}
t.touch()
metadata, err := os.OpenFile(t.metadataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
if err != nil {
return err
return nil, err
}
t.metadataFile = metadata
@ -341,20 +343,20 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
t.DataFilePath = data
f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
if err != nil {
return err
return nil, err
}
f.Close()
case string(config.AdvanceLocalTaskStoreStrategy):
dir, file := path.Split(req.Destination)
dirStat, err := os.Stat(dir)
if err != nil {
return err
return nil, err
}
t.DataFilePath = path.Join(dir, fmt.Sprintf(".%s.dfget.cache.%s", file, req.PeerID))
f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
if err != nil {
return err
return nil, err
}
f.Close()
@ -367,7 +369,7 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
// fallback to symbol link
if err := os.Symlink(t.DataFilePath, data); err != nil {
logger.Errorf("symbol link failed: %s", err)
return err
return nil, err
}
}
} else {
@ -375,7 +377,7 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
// make symbol link for reload error gc
if err := os.Symlink(t.DataFilePath, data); err != nil {
logger.Errorf("symbol link failed: %s", err)
return err
return nil, err
}
}
}
@ -393,7 +395,7 @@ func (s *storageManager) CreateTask(req RegisterTaskRequest) error {
s.indexTask2PeerTask[req.TaskID] = []*localTaskStore{t}
}
s.indexRWMutex.Unlock()
return nil
return t, nil
}
func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask {

View File

@ -1,10 +1,11 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ../../../peer/piece_downloader.go
// Source: piece_downloader.go
// Package mock_peer is a generated GoMock package.
package mock_peer
import (
context "context"
io "io"
reflect "reflect"
@ -12,40 +13,41 @@ import (
gomock "github.com/golang/mock/gomock"
)
// MockPieceDownloader is a mock of PieceDownloader interface
// MockPieceDownloader is a mock of PieceDownloader interface.
type MockPieceDownloader struct {
ctrl *gomock.Controller
recorder *MockPieceDownloaderMockRecorder
}
// MockPieceDownloaderMockRecorder is the mock recorder for MockPieceDownloader
// MockPieceDownloaderMockRecorder is the mock recorder for MockPieceDownloader.
type MockPieceDownloaderMockRecorder struct {
mock *MockPieceDownloader
}
// NewMockPieceDownloader creates a new mock instance
// NewMockPieceDownloader creates a new mock instance.
func NewMockPieceDownloader(ctrl *gomock.Controller) *MockPieceDownloader {
mock := &MockPieceDownloader{ctrl: ctrl}
mock.recorder = &MockPieceDownloaderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPieceDownloader) EXPECT() *MockPieceDownloaderMockRecorder {
return m.recorder
}
// DownloadPiece mocks base method
func (m *MockPieceDownloader) DownloadPiece(arg0 *peer.DownloadPieceRequest) (io.ReadCloser, error) {
// DownloadPiece mocks base method.
func (m *MockPieceDownloader) DownloadPiece(arg0 context.Context, arg1 *peer.DownloadPieceRequest) (io.Reader, io.Closer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DownloadPiece", arg0)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error)
return ret0, ret1
ret := m.ctrl.Call(m, "DownloadPiece", arg0, arg1)
ret0, _ := ret[0].(io.Reader)
ret1, _ := ret[1].(io.Closer)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// DownloadPiece indicates an expected call of DownloadPiece
func (mr *MockPieceDownloaderMockRecorder) DownloadPiece(arg0 interface{}) *gomock.Call {
// DownloadPiece indicates an expected call of DownloadPiece.
func (mr *MockPieceDownloaderMockRecorder) DownloadPiece(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockPieceDownloader)(nil).DownloadPiece), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockPieceDownloader)(nil).DownloadPiece), arg0, arg1)
}

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ../../../storage/storage_manager.go
// Source: storage_manager.go
// Package mock_storage is a generated GoMock package.
package mock_storage
@ -356,11 +356,12 @@ func (mr *MockManagerMockRecorder) ReadPiece(ctx, req interface{}) *gomock.Call
}
// RegisterTask mocks base method.
func (m *MockManager) RegisterTask(ctx context.Context, req storage.RegisterTaskRequest) error {
func (m *MockManager) RegisterTask(ctx context.Context, req storage.RegisterTaskRequest) (storage.TaskStorageDriver, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RegisterTask", ctx, req)
ret0, _ := ret[0].(error)
return ret0
ret0, _ := ret[0].(storage.TaskStorageDriver)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RegisterTask indicates an expected call of RegisterTask.