583 lines
14 KiB
Go
583 lines
14 KiB
Go
/*
|
|
* Copyright 2020 The Dragonfly Authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"os"
|
|
"path"
|
|
"testing"
|
|
"time"
|
|
|
|
testifyassert "github.com/stretchr/testify/assert"
|
|
"go.uber.org/atomic"
|
|
|
|
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
|
|
|
|
"d7y.io/dragonfly/v2/client/config"
|
|
"d7y.io/dragonfly/v2/client/daemon/test"
|
|
clientutil "d7y.io/dragonfly/v2/client/util"
|
|
logger "d7y.io/dragonfly/v2/internal/dflog"
|
|
"d7y.io/dragonfly/v2/internal/util"
|
|
"d7y.io/dragonfly/v2/pkg/digest"
|
|
"d7y.io/dragonfly/v2/pkg/net/http"
|
|
_ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
|
|
)
|
|
|
|
func TestLocalTaskStore_PutAndGetPiece(t *testing.T) {
|
|
assert := testifyassert.New(t)
|
|
testBytes, err := os.ReadFile(test.File)
|
|
assert.Nil(err, "load test file")
|
|
md5Test, _ := calcFileMd5(test.File, nil)
|
|
|
|
dst := path.Join(test.DataDir, taskData+".copy")
|
|
defer os.Remove(dst)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
strategy config.StoreStrategy
|
|
create func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error)
|
|
}{
|
|
{
|
|
name: "normal",
|
|
strategy: config.SimpleLocalTaskStoreStrategy,
|
|
create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
|
|
return s.CreateTask(
|
|
&RegisterTaskRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
PeerID: peerID,
|
|
TaskID: taskID,
|
|
},
|
|
DesiredLocation: dst,
|
|
ContentLength: int64(len(testBytes)),
|
|
})
|
|
},
|
|
},
|
|
{
|
|
name: "normal",
|
|
strategy: config.AdvanceLocalTaskStoreStrategy,
|
|
create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
|
|
return s.CreateTask(
|
|
&RegisterTaskRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
PeerID: peerID,
|
|
TaskID: taskID,
|
|
},
|
|
DesiredLocation: dst,
|
|
ContentLength: int64(len(testBytes)),
|
|
})
|
|
},
|
|
},
|
|
{
|
|
name: "subtask",
|
|
strategy: config.AdvanceLocalTaskStoreStrategy,
|
|
create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
|
|
var (
|
|
parentPeerID = peerID + "-parent"
|
|
parentTaskID = taskID + "-parent"
|
|
)
|
|
|
|
_, err := s.CreateTask(
|
|
&RegisterTaskRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
PeerID: parentPeerID,
|
|
TaskID: parentTaskID,
|
|
},
|
|
DesiredLocation: dst,
|
|
ContentLength: int64(len(testBytes)),
|
|
})
|
|
assert.Nil(err)
|
|
|
|
return s.RegisterSubTask(
|
|
context.Background(),
|
|
&RegisterSubTaskRequest{
|
|
Parent: PeerTaskMetadata{
|
|
PeerID: parentPeerID,
|
|
TaskID: parentTaskID,
|
|
},
|
|
SubTask: PeerTaskMetadata{
|
|
PeerID: peerID,
|
|
TaskID: taskID,
|
|
},
|
|
Range: &http.Range{
|
|
Start: 100,
|
|
Length: int64(len(testBytes)),
|
|
},
|
|
})
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name+"-"+string(tc.strategy), func(t *testing.T) {
|
|
var (
|
|
taskID = "task-d4bb1c273a9889fea14abd4651994fe8"
|
|
peerID = "peer-d4bb1c273a9889fea14abd4651994fe8"
|
|
pieceSize = 512
|
|
)
|
|
sm, err := NewStorageManager(config.SimpleLocalTaskStoreStrategy,
|
|
&config.StorageOption{
|
|
DataPath: path.Join(test.DataDir, "storage-test"),
|
|
TaskExpireTime: clientutil.Duration{
|
|
Duration: time.Minute,
|
|
},
|
|
}, func(request CommonTaskRequest) {
|
|
}, defaultDirectoryMode)
|
|
assert.Nil(err)
|
|
|
|
_, err = tc.create(sm.(*storageManager), taskID, peerID)
|
|
assert.Nil(err, "create task storage")
|
|
|
|
ts, ok := sm.(*storageManager).LoadTask(PeerTaskMetadata{
|
|
PeerID: peerID,
|
|
TaskID: taskID,
|
|
})
|
|
assert.True(ok, "load created task")
|
|
|
|
var pieces []struct {
|
|
index int
|
|
start int
|
|
end int // not contain in data
|
|
}
|
|
var piecesMd5 []string
|
|
for i := 0; i*pieceSize < len(testBytes); i++ {
|
|
start := i * pieceSize
|
|
end := start + pieceSize
|
|
end = min(end, len(testBytes))
|
|
pieces = append(pieces, struct {
|
|
index int
|
|
start int
|
|
end int
|
|
}{
|
|
index: i,
|
|
start: start,
|
|
end: end,
|
|
})
|
|
piecesMd5 = append(piecesMd5, calcPieceMd5(testBytes[start:end]))
|
|
}
|
|
rand.Seed(time.Now().UnixNano())
|
|
rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
|
|
|
|
// random put all pieces
|
|
for _, p := range pieces {
|
|
_, err = ts.WritePiece(context.Background(), &WritePieceRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
TaskID: taskID,
|
|
},
|
|
PieceMetadata: PieceMetadata{
|
|
Num: int32(p.index),
|
|
Md5: piecesMd5[p.index],
|
|
Offset: uint64(p.start),
|
|
Range: http.Range{
|
|
Start: int64(p.start),
|
|
Length: int64(p.end - p.start),
|
|
},
|
|
Style: commonv1.PieceStyle_PLAIN,
|
|
},
|
|
Reader: bytes.NewBuffer(testBytes[p.start:p.end]),
|
|
})
|
|
assert.Nil(err, "put piece")
|
|
}
|
|
|
|
if lts, ok := ts.(*localTaskStore); ok {
|
|
md5TaskData, _ := calcFileMd5(path.Join(lts.dataDir, taskData), nil)
|
|
assert.Equal(md5Test, md5TaskData, "md5 must match")
|
|
} else if lsts, ok := ts.(*localSubTaskStore); ok {
|
|
md5TaskData, _ := calcFileMd5(path.Join(lsts.parent.dataDir, taskData), lsts.Range)
|
|
assert.Equal(md5Test, md5TaskData, "md5 must match")
|
|
}
|
|
|
|
// shuffle again for get all pieces
|
|
rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
|
|
for _, p := range pieces {
|
|
rd, cl, err := ts.ReadPiece(context.Background(), &ReadPieceRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
TaskID: taskID,
|
|
},
|
|
PieceMetadata: PieceMetadata{
|
|
Num: int32(p.index),
|
|
Md5: piecesMd5[p.index],
|
|
Offset: uint64(p.start),
|
|
Range: http.Range{
|
|
Start: int64(p.start),
|
|
Length: int64(p.end - p.start),
|
|
},
|
|
Style: commonv1.PieceStyle_PLAIN,
|
|
},
|
|
})
|
|
assert.Nil(err, "get piece reader should be ok")
|
|
data, err := io.ReadAll(rd)
|
|
cl.Close()
|
|
assert.Nil(err, "read piece should be ok")
|
|
assert.Equal(p.end-p.start, len(data), "piece length should match")
|
|
assert.Equal(testBytes[p.start:p.end], data, "piece data should match")
|
|
}
|
|
|
|
rd, err := ts.ReadAllPieces(context.Background(), &ReadAllPiecesRequest{
|
|
PeerTaskMetadata: PeerTaskMetadata{
|
|
TaskID: taskID,
|
|
},
|
|
Range: nil,
|
|
})
|
|
assert.Nil(err, "get all pieces reader should be ok")
|
|
data, err := io.ReadAll(rd)
|
|
assert.Nil(err, "read all pieces should be ok")
|
|
rd.Close()
|
|
assert.Equal(testBytes, data, "all pieces data should match")
|
|
|
|
if lts, ok := ts.(*localTaskStore); ok {
|
|
lts.genMetadata(0, &WritePieceRequest{
|
|
NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
|
|
return int32(len(pieces)), int64(len(testBytes)), true
|
|
},
|
|
})
|
|
assert.Equal(digest.SHA256FromStrings(piecesMd5...), lts.PieceMd5Sign)
|
|
|
|
// clean up test data
|
|
lts.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
|
|
ok = lts.CanReclaim()
|
|
assert.True(ok, "task should gc")
|
|
err = lts.Reclaim()
|
|
assert.Nil(err, "task gc")
|
|
} else if lsts, ok := ts.(*localSubTaskStore); ok {
|
|
lsts.genMetadata(0, &WritePieceRequest{
|
|
NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
|
|
return int32(len(pieces)), int64(len(testBytes)), true
|
|
},
|
|
})
|
|
assert.Equal(digest.SHA256FromStrings(piecesMd5...), lsts.PieceMd5Sign)
|
|
|
|
// keep original offset
|
|
err = lsts.Store(context.Background(),
|
|
&StoreRequest{
|
|
CommonTaskRequest: CommonTaskRequest{
|
|
Destination: dst,
|
|
},
|
|
MetadataOnly: false,
|
|
StoreDataOnly: false,
|
|
TotalPieces: 0,
|
|
OriginalOffset: true,
|
|
})
|
|
assert.Nil(err)
|
|
md5Store, err := calcFileMd5(dst, lsts.Range)
|
|
assert.Nil(err)
|
|
assert.Equal(md5Test, md5Store)
|
|
|
|
// just ranged data
|
|
err = lsts.Store(context.Background(),
|
|
&StoreRequest{
|
|
CommonTaskRequest: CommonTaskRequest{
|
|
Destination: dst,
|
|
},
|
|
MetadataOnly: false,
|
|
StoreDataOnly: false,
|
|
TotalPieces: 0,
|
|
OriginalOffset: false,
|
|
})
|
|
assert.Nil(err)
|
|
md5Store, err = calcFileMd5(dst, nil)
|
|
assert.Nil(err)
|
|
assert.Equal(md5Test, md5Store)
|
|
|
|
// clean up test data
|
|
lsts.parent.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
|
|
lsts.parent.Done = true
|
|
|
|
ok = lsts.CanReclaim()
|
|
assert.True(ok, "sub task should gc")
|
|
err = lsts.Reclaim()
|
|
assert.Nil(err, "sub task gc")
|
|
|
|
ok = lsts.parent.CanReclaim()
|
|
assert.True(ok, "parent task should gc")
|
|
err = lsts.parent.Reclaim()
|
|
assert.Nil(err, "parent task gc")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLocalTaskStore_StoreTaskData_Simple(t *testing.T) {
|
|
assert := testifyassert.New(t)
|
|
src := path.Join(test.DataDir, taskData)
|
|
dst := path.Join(test.DataDir, taskData+".copy")
|
|
meta := path.Join(test.DataDir, taskData+".meta")
|
|
// prepare test data
|
|
testData := []byte("test data")
|
|
err := os.WriteFile(src, testData, defaultFileMode)
|
|
assert.Nil(err, "prepare test data")
|
|
defer os.Remove(src)
|
|
defer os.Remove(dst)
|
|
defer os.Remove(meta)
|
|
|
|
data, err := os.OpenFile(src, os.O_RDWR, defaultFileMode)
|
|
assert.Nil(err, "open test data")
|
|
defer data.Close()
|
|
|
|
matadata, err := os.OpenFile(meta, os.O_RDWR|os.O_CREATE, defaultFileMode)
|
|
assert.Nil(err, "open test meta data")
|
|
matadata.Close()
|
|
ts := localTaskStore{
|
|
SugaredLoggerOnWith: logger.With("test", "localTaskStore"),
|
|
persistentMetadata: persistentMetadata{
|
|
TaskID: "test",
|
|
DataFilePath: src,
|
|
},
|
|
dataDir: test.DataDir,
|
|
metadataFilePath: meta,
|
|
}
|
|
ts.lastAccess.Store(time.Now().UnixNano())
|
|
err = ts.Store(context.Background(), &StoreRequest{
|
|
CommonTaskRequest: CommonTaskRequest{
|
|
TaskID: ts.TaskID,
|
|
Destination: dst,
|
|
},
|
|
})
|
|
assert.Nil(err, "store test data")
|
|
bs, err := os.ReadFile(dst)
|
|
assert.Nil(err, "read output test data")
|
|
assert.Equal(testData, bs, "data must match")
|
|
}
|
|
|
|
func calcFileMd5(filePath string, rg *http.Range) (string, error) {
|
|
var md5String string
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return md5String, err
|
|
}
|
|
defer file.Close()
|
|
|
|
var rd io.Reader = file
|
|
if rg != nil {
|
|
rd = io.LimitReader(file, rg.Length)
|
|
_, err = file.Seek(rg.Start, io.SeekStart)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
hash := md5.New()
|
|
if _, err := io.Copy(hash, rd); err != nil {
|
|
return md5String, err
|
|
}
|
|
hashInBytes := hash.Sum(nil)[:16]
|
|
md5String = hex.EncodeToString(hashInBytes)
|
|
return md5String, nil
|
|
}
|
|
|
|
// calcPieceMd5 returns the hex encoded md5 digest of data.
func calcPieceMd5(data []byte) string {
	// md5.Sum is the stdlib one-shot form; the previous hash.Sum(nil)[:16]
	// re-slice was redundant since an md5 digest is always 16 bytes.
	sum := md5.Sum(data)
	return hex.EncodeToString(sum[:])
}
|
|
|
|
func Test_computePiecePosition(t *testing.T) {
|
|
var testCases = []struct {
|
|
name string
|
|
total int64
|
|
rg *http.Range
|
|
start int32
|
|
end int32
|
|
piece uint32
|
|
}{
|
|
{
|
|
name: "0",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 0,
|
|
Length: 10,
|
|
},
|
|
start: 0,
|
|
end: 0,
|
|
piece: 100,
|
|
},
|
|
{
|
|
name: "1",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 30,
|
|
Length: 60,
|
|
},
|
|
start: 0,
|
|
end: 0,
|
|
piece: 100,
|
|
},
|
|
{
|
|
name: "2",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 30,
|
|
Length: 130,
|
|
},
|
|
start: 0,
|
|
end: 1,
|
|
piece: 100,
|
|
},
|
|
{
|
|
name: "3",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 350,
|
|
Length: 100,
|
|
},
|
|
start: 3,
|
|
end: 4,
|
|
piece: 100,
|
|
},
|
|
{
|
|
name: "4",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 400,
|
|
Length: 100,
|
|
},
|
|
start: 4,
|
|
end: 4,
|
|
piece: 100,
|
|
},
|
|
{
|
|
name: "5",
|
|
total: 500,
|
|
rg: &http.Range{
|
|
Start: 0,
|
|
Length: 500,
|
|
},
|
|
start: 0,
|
|
end: 4,
|
|
piece: 100,
|
|
},
|
|
}
|
|
|
|
assert := testifyassert.New(t)
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
start, end := computePiecePosition(tc.total, tc.rg, func(length int64) uint32 {
|
|
return tc.piece
|
|
})
|
|
assert.Equal(tc.start, start)
|
|
assert.Equal(tc.end, end)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLocalTaskStore_partialCompleted(t *testing.T) {
|
|
var testCases = []struct {
|
|
name string
|
|
ContentLength int64
|
|
ReadyPieceCount int32
|
|
Range http.Range
|
|
Found bool
|
|
}{
|
|
{
|
|
name: "range bytes=x-y partial completed",
|
|
ContentLength: 1024,
|
|
ReadyPieceCount: 1,
|
|
Range: http.Range{
|
|
Start: 1,
|
|
Length: 1023,
|
|
},
|
|
Found: true,
|
|
},
|
|
{
|
|
name: "range bytes=x-y no partial completed",
|
|
ContentLength: util.DefaultPieceSize * 10,
|
|
ReadyPieceCount: 1,
|
|
Range: http.Range{
|
|
Start: 1,
|
|
Length: util.DefaultPieceSize * 2,
|
|
},
|
|
Found: false,
|
|
},
|
|
{
|
|
name: "range bytes=x- no partial completed",
|
|
ContentLength: util.DefaultPieceSizeLimit * 1,
|
|
ReadyPieceCount: 1,
|
|
Range: http.Range{
|
|
Start: 1,
|
|
Length: math.MaxInt - 1,
|
|
},
|
|
Found: false,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
assert := testifyassert.New(t)
|
|
lts := &localTaskStore{
|
|
persistentMetadata: persistentMetadata{
|
|
ContentLength: tc.ContentLength,
|
|
Pieces: map[int32]PieceMetadata{},
|
|
},
|
|
}
|
|
for i := int32(0); i < tc.ReadyPieceCount; i++ {
|
|
lts.Pieces[i] = PieceMetadata{}
|
|
}
|
|
ok := lts.partialCompleted(&tc.Range)
|
|
assert.Equal(tc.Found, ok)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLocalTaskStore_CanReclaim(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
lts *localTaskStore
|
|
expect bool
|
|
}{
|
|
{
|
|
name: "normal task",
|
|
lts: &localTaskStore{},
|
|
expect: false,
|
|
},
|
|
{
|
|
name: "invalid task",
|
|
lts: &localTaskStore{
|
|
invalid: *atomic.NewBool(true),
|
|
},
|
|
expect: true,
|
|
},
|
|
{
|
|
name: "never expire task",
|
|
lts: &localTaskStore{
|
|
expireTime: 0,
|
|
},
|
|
expect: false,
|
|
},
|
|
{
|
|
name: "expired task",
|
|
lts: &localTaskStore{
|
|
expireTime: time.Second,
|
|
lastAccess: *atomic.NewInt64(1),
|
|
},
|
|
expect: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
assert := testifyassert.New(t)
|
|
assert.Equal(tc.lts.CanReclaim(), tc.expect)
|
|
})
|
|
}
|
|
}
|