refactor: modernize the code by new features provided by golang standard library (#3963)

refactor: modernize the code by new features provided by golang standard library

Signed-off-by: chlins <chlins.zhang@gmail.com>
Chlins Zhang authored 2025-04-17 15:48:42 +08:00, committed by GitHub
parent ea52a849cf
commit d0bfdd4e77
30 changed files with 74 additions and 98 deletions
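
The idioms adopted below have minimum toolchain requirements: range-over-int needs Go 1.22, the min/max builtins and the maps and slices packages need Go 1.21, and fmt.Appendf needs Go 1.19, so a module adopting all of them needs a go directive of at least 1.22. A go.mod sketch (module path illustrative; the version is inferred from the features used, not taken from this repository):

module example.com/dragonfly-sketch

go 1.22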

View File

@@ -875,7 +875,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 }
 }
-for i := 0; i < ptcCount; i++ {
+for i := range ptcCount {
 request := &schedulerv1.PeerTaskRequest{
 Url: ts.url,
 UrlMeta: urlMeta,
@@ -920,7 +920,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 }
 assert.True(success, "task should success")
-for i := 0; i < 3; i++ {
+for range 3 {
 ptm.runningPeerTasks.Range(func(key, value any) bool {
 noRunningTask = false
 return false
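
The two rewrites above use Go 1.22's range-over-integer loop: for i := range n iterates i from 0 through n-1, and the loop variable can be dropped entirely when unused. A minimal standalone sketch (names illustrative, not repository code):

package main

import "fmt"

func main() {
	n := 4
	// Equivalent to: for i := 0; i < n; i++ { ... }
	for i := range n {
		fmt.Println("worker", i)
	}
	// When the loop variable is unused, omit it entirely.
	for range 3 {
		fmt.Println("tick")
	}
}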

View File

@@ -20,6 +20,7 @@ import (
 "context"
 "fmt"
 "io"
+"maps"
 "os"
 "strings"
 "time"
@@ -323,9 +324,7 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, taskID s
 attr[headers.ContentLength] = fmt.Sprintf("%d", length)
 if exa != nil {
-for k, v := range exa.Header {
-attr[k] = v
-}
+maps.Copy(attr, exa.Header)
 }
 if reuseRange != nil {
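
maps.Copy (Go 1.21) replaces the hand-written header-copy loop here and in the next file: it inserts every key/value pair of the source map into the destination, overwriting keys that already exist, which matches the old loop's behavior exactly. A small sketch with illustrative data:

package main

import (
	"fmt"
	"maps"
)

func main() {
	attr := map[string]string{"Content-Length": "1024"}
	header := map[string]string{"Content-Type": "text/plain", "Content-Length": "2048"}

	// Copies every entry of header into attr; the existing
	// Content-Length key is overwritten, as with the old loop.
	maps.Copy(attr, header)
	fmt.Println(attr) // map[Content-Length:2048 Content-Type:text/plain]
}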

View File

@@ -20,6 +20,7 @@ import (
 "context"
 "fmt"
 "io"
+"maps"
 "github.com/go-http-utils/headers"
 "go.opentelemetry.io/otel/trace"
@@ -177,9 +178,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 return nil, attr, err
 }
 if exa != nil {
-for k, v := range exa.Header {
-attr[k] = v
-}
+maps.Copy(attr, exa.Header)
 }
 rc, err := s.peerTaskConductor.StorageManager.ReadAllPieces(
 ctx,
@@ -197,9 +196,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 return nil, attr, err
 }
 if exa != nil {
-for k, v := range exa.Header {
-attr[k] = v
-}
+maps.Copy(attr, exa.Header)
 }
 }
@@ -373,9 +370,7 @@ pieceReady:
 return nil, attr, err
 }
 if exa != nil {
-for k, v := range exa.Header {
-attr[k] = v
-}
+maps.Copy(attr, exa.Header)
 }
 attr[headers.ContentLength] = fmt.Sprintf("%d", s.peerTaskConductor.GetContentLength()-s.skipBytes)

View File

@@ -107,9 +107,7 @@ func (i *intervalSleepReader) Read(p []byte) (n int, err error) {
 return 0, io.EOF
 }
 end := i.offset + i.size
-if end > len(i.data) {
-end = len(i.data)
-}
+end = min(end, len(i.data))
 n = copy(p, i.data[i.offset:end])
 time.Sleep(i.interval)
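
min and max became predeclared builtins in Go 1.21, so the clamp-with-if pattern collapses to a single expression; the same rewrite recurs below for endByte, reloadGoroutineCount, elementLen, and start. A sketch of the equivalence (values illustrative):

package main

import "fmt"

func main() {
	end, limit := 12, 10
	// Old form:
	// if end > limit {
	//     end = limit
	// }
	end = min(end, limit) // clamp from above

	start := int64(-3)
	start = max(start, 0) // clamp from below

	fmt.Println(end, start) // 10 0
}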

View File

@@ -66,9 +66,9 @@ func (pc *pieceTestManager) Run() {
 // producer
 go func() {
 slice := make([]*DownloadPieceRequest, 0)
-for i := 0; i < 4; i++ {
+for range 4 {
 for _, peer := range pc.peers {
-for j := 0; j < pc.pieceNum; j++ {
+for j := range pc.pieceNum {
 slice = append(slice, &DownloadPieceRequest{
 piece: &commonv1.PieceInfo{PieceNum: int32(j)},
 DstPid: peer.id,

View File

@@ -902,9 +902,7 @@ func newPieceGroup(i int32, reminderPieces int32, startPieceNum int32, minPieceC
 // calculate piece group first and last range byte with parsedRange.Start
 startByte := int64(start) * int64(pieceSize)
 endByte := int64(end+1)*int64(pieceSize) - 1
-if endByte > parsedRange.Length-1 {
-endByte = parsedRange.Length - 1
-}
+endByte = min(endByte, parsedRange.Length-1)
 // adjust by range start
 startByte += parsedRange.Start
@@ -935,7 +933,7 @@ func (pm *pieceManager) concurrentDownloadSourceByPiece(
 downloadedPieceCount := atomic.NewInt32(startPieceNum)
-for i := 0; i < con; i++ {
+for i := range con {
 go func(i int) {
 for {
 select {

View File

@@ -365,7 +365,7 @@ func setupMembers(assert *assert.Assertions, memberCount int) []*peerExchange {
 ports, err := freeport.GetFreePorts(2 * memberCount)
 assert.Nil(err)
-for i := 0; i < memberCount; i++ {
+for i := range memberCount {
 rpcPort, gossipPort := ports[2*i], ports[2*i+1]
 testMembers = append(testMembers, &testMember{
 idx: i,
@@ -378,7 +378,7 @@ func setupMembers(assert *assert.Assertions, memberCount int) []*peerExchange {
 })
 }
-for i := 0; i < memberCount; i++ {
+for i := range memberCount {
 peerExchangeServers = append(peerExchangeServers, setupMember(assert, testMembers[i], members))
 }
 return peerExchangeServers

View File

@@ -573,7 +573,7 @@ func (s *server) startDownloadWorkers(
 lock := sync.Mutex{}
 sender := &sequentialResultSender{realSender: stream}
-for i := 0; i < s.recursiveConcurrent; i++ {
+for i := range s.recursiveConcurrent {
 go func(i int) {
 logKV := []any{
 "recursiveDownloader", fmt.Sprintf("%d", i),

View File

@@ -163,9 +163,7 @@ func TestLocalTaskStore_PutAndGetPiece(t *testing.T) {
 for i := 0; i*pieceSize < len(testBytes); i++ {
 start := i * pieceSize
 end := start + pieceSize
-if end > len(testBytes) {
-end = len(testBytes)
-}
+end = min(end, len(testBytes))
 pieces = append(pieces, struct {
 index int
 start int

View File

@@ -29,6 +29,7 @@ import (
 "os"
 "path"
 "path/filepath"
+"slices"
 "sort"
 "strings"
 "sync"
@@ -726,11 +727,9 @@ func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) {
 done := make(chan struct{})
 reloadGoroutineCount := s.storeOption.ReloadGoroutineCount
-if count < reloadGoroutineCount {
-reloadGoroutineCount = count
-}
+reloadGoroutineCount = min(reloadGoroutineCount, count)
-for i := 0; i < reloadGoroutineCount; i++ {
+for range reloadGoroutineCount {
 go func() {
 for {
 select {
@@ -981,7 +980,7 @@ func (s *storageManager) TryGC() (bool, error) {
 // remove reclaimed task in markedTasks
 for i, k := range markedTasks {
 if k.TaskID == key.TaskID && k.PeerID == key.PeerID {
-markedTasks = append(markedTasks[:i], markedTasks[i+1:]...)
+markedTasks = slices.Delete(markedTasks, i, i+1)
 break
 }
 }
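
slices.Delete (Go 1.21) removes the elements s[i:j] and returns the shortened slice, making the intent of the old append(s[:i], s[i+1:]...) trick explicit; the same rewrite appears again in the slices helper package below. A sketch removing a single element:

package main

import (
	"fmt"
	"slices"
)

func main() {
	tasks := []string{"a", "b", "c", "d"}

	// Delete the element at index 1 (the half-open range [1, 2)),
	// equivalent to append(tasks[:1], tasks[2:]...).
	tasks = slices.Delete(tasks, 1, 2)
	fmt.Println(tasks) // [a c d]
}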

View File

@@ -533,9 +533,9 @@ func benchmarkCacheGetConcurrent(b *testing.B, exp time.Duration) {
 each := b.N / workers
 wg.Add(workers)
 b.StartTimer()
-for i := 0; i < workers; i++ {
+for range workers {
 go func() {
-for j := 0; j < each; j++ {
+for range each {
 tc.Get(v1)
 }
 wg.Done()
@@ -555,9 +555,9 @@ func BenchmarkRWMutexMapGetConcurrent(b *testing.B) {
 each := b.N / workers
 wg.Add(workers)
 b.StartTimer()
-for i := 0; i < workers; i++ {
+for range workers {
 go func() {
-for j := 0; j < each; j++ {
+for range each {
 mu.RLock()
 _, _ = m[v1]
 mu.RUnlock()
@@ -584,7 +584,7 @@ func benchmarkCacheGetManyConcurrent(b *testing.B, exp time.Duration) {
 n := 10000
 tc := New(exp, 0)
 keys := make([]string, n)
-for i := 0; i < n; i++ {
+for i := range n {
 k := v1 + strconv.Itoa(i)
 keys[i] = k
 tc.Set(k, v2, DefaultExpiration)
@@ -594,7 +594,7 @@ func benchmarkCacheGetManyConcurrent(b *testing.B, exp time.Duration) {
 wg.Add(n)
 for _, v := range keys {
 go func(k string) {
-for j := 0; j < each; j++ {
+for range each {
 tc.Get(k)
 }
 wg.Done()

View File

@@ -74,7 +74,7 @@ func TestSequence(t *testing.T) {
 }
 }()
 var values []int
-for i := 0; i < len(tc.values); i++ {
+for range len(tc.values) {
 val, ok := q.Dequeue()
 assert.True(ok, "dequeue should be ok")
 values = append(values, *val)
@@ -139,7 +139,7 @@ func TestRandom(t *testing.T) {
 }
 }()
 var values []int
-for i := 0; i < len(tc.values); i++ {
+for range len(tc.values) {
 val, ok := q.Dequeue()
 assert.True(ok, "dequeue should be ok")
 values = append(values, *val)
@@ -158,7 +158,7 @@ func TestRandom(t *testing.T) {
 func benchmarkRandom(b *testing.B, exponent int, input, output int) {
 queue := NewRandom[int](exponent)
 done := false
-for i := 0; i < input; i++ {
+for i := range input {
 go func(i int) {
 for {
 if done {

View File

@@ -70,7 +70,7 @@ func TestSafeSetAdd_Concurrent(t *testing.T) {
 var wg sync.WaitGroup
 wg.Add(len(nums))
-for i := 0; i < len(nums); i++ {
+for i := range len(nums) {
 go func(i int) {
 s.Add(i)
 wg.Done()
@@ -238,7 +238,7 @@ func TestSafeSetLen_Concurrent(t *testing.T) {
 wg.Add(1)
 go func() {
 elems := s.Len()
-for i := 0; i < N; i++ {
+for range N {
 newElems := s.Len()
 if newElems < elems {
 t.Errorf("Len shrunk from %v to %v", elems, newElems)
@@ -247,7 +247,7 @@ func TestSafeSetLen_Concurrent(t *testing.T) {
 wg.Done()
 }()
-for i := 0; i < N; i++ {
+for range N {
 s.Add(rand.Int())
 }
 wg.Wait()
@@ -302,7 +302,7 @@ func TestSafeSetValues_Concurrent(t *testing.T) {
 wg.Add(1)
 go func() {
 elems := s.Values()
-for i := 0; i < N; i++ {
+for range N {
 newElems := s.Values()
 if len(newElems) < len(elems) {
 t.Errorf("Values shrunk from %v to %v", elems, newElems)
@@ -311,7 +311,7 @@ func TestSafeSetValues_Concurrent(t *testing.T) {
 wg.Done()
 }()
-for i := 0; i < N; i++ {
+for i := range N {
 s.Add(i)
 }
 wg.Wait()
@@ -359,7 +359,7 @@ func TestSafeSetClear_Concurrent(t *testing.T) {
 var wg sync.WaitGroup
 wg.Add(len(nums))
-for i := 0; i < len(nums); i++ {
+for i := range len(nums) {
 go func(i int) {
 s.Add(i)
 s.Clear()

View File

@@ -33,7 +33,7 @@ func Run(ctx context.Context,
 cancel bool
 cause error
 )
-for i := 0; i < maxAttempts; i++ {
+for i := range maxAttempts {
 if i > 0 {
 time.Sleep(math.RandBackoffSeconds(initBackoff, maxBackoff, 2.0, i))
 }

View File

@@ -16,6 +16,8 @@
 package slices
+import "slices"
 // Contains returns true if elements is present in a collection.
 func Contains[T comparable](s []T, els ...T) bool {
 ss := make(map[T]struct{}, len(s))
@@ -63,7 +65,7 @@ func RemoveDuplicates[T comparable](s []T) []T {
 // Remove removes an element from a collection.
 func Remove[T comparable](s []T, i int) []T {
-return append(s[:i], s[i+1:]...)
+return slices.Delete(s, i, i+1)
 }
 // Reverse reverses elements in a collection.

View File

@@ -21,6 +21,7 @@ import (
 "io"
 "net/http"
 "regexp"
+"slices"
 "strconv"
 "time"
@@ -285,10 +286,5 @@ func exportPassThroughHeader(header http.Header) map[string]string {
 }
 func detectTemporary(statusCode int) bool {
-for _, code := range notTemporaryStatusCode {
-if code == statusCode {
-return false
-}
-}
-return true
+return !slices.Contains(notTemporaryStatusCode, statusCode)
 }
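
slices.Contains (Go 1.21) replaces the manual membership scan here and in the next two files; note the negation, since detectTemporary returns true only when the status code is absent from the not-temporary list. A sketch with an assumed status-code slice (the real notTemporaryStatusCode contents are not shown in this diff):

package main

import (
	"fmt"
	"net/http"
	"slices"
)

// Illustrative stand-in for notTemporaryStatusCode.
var notTemporary = []int{http.StatusNotFound, http.StatusForbidden}

func detectTemporary(statusCode int) bool {
	return !slices.Contains(notTemporary, statusCode)
}

func main() {
	fmt.Println(detectTemporary(http.StatusNotFound))           // false
	fmt.Println(detectTemporary(http.StatusServiceUnavailable)) // true
}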

View File

@@ -23,6 +23,7 @@ import (
 "errors"
 "fmt"
 "net/url"
+"slices"
 "strconv"
 "strings"
 "sync"
@@ -78,11 +79,10 @@ func (e UnexpectedStatusCodeError) Got() int {
 // CheckResponseCode returns UnexpectedStatusError if the given response code is not
 // one of the allowed status codes; otherwise nil.
 func CheckResponseCode(respCode int, allowed []int) error {
-for _, v := range allowed {
-if respCode == v {
-return nil
-}
+if slices.Contains(allowed, respCode) {
+return nil
 }
 return UnexpectedStatusCodeError{allowed, respCode}
 }

View File

@@ -17,6 +17,7 @@
 package strings
 import (
+"slices"
 "strings"
 )
@@ -27,13 +28,7 @@ func IsBlank(s string) bool {
 // Contains reports whether the string contains the element.
 func Contains(slice []string, ele string) bool {
-for _, one := range slice {
-if one == ele {
-return true
-}
-}
-return false
+return slices.Contains(slice, ele)
 }
 // Remove the duplicate elements in the string slice.

View File

@@ -554,7 +554,7 @@ func TestPeerManager_RunGC(t *testing.T) {
 assert := assert.New(t)
 peerManager.Store(mockPeer)
 mockPeer.FSM.SetState(PeerStateSucceeded)
-for i := 0; i < PeerCountLimitForTask+1; i++ {
+for range PeerCountLimitForTask + 1 {
 peer := NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost)
 mockPeer.Task.StorePeer(peer)
 }

View File

@@ -192,11 +192,9 @@ func (e *evaluatorBase) calculateMultiElementAffinityScore(dst, src string) floa
 elementLen = math.Min(len(dstElements), len(srcElements))
 // Maximum element length is 5.
-if elementLen > maxElementLen {
-elementLen = maxElementLen
-}
+elementLen = min(elementLen, maxElementLen)
-for i := 0; i < elementLen; i++ {
+for i := range elementLen {
 if !strings.EqualFold(dstElements[i], srcElements[i]) {
 break
 }

View File

@@ -993,7 +993,7 @@ func TestEvaluatorBase_IsBadParent(t *testing.T) {
 totalPieceCount: 1,
 mock: func(peer *standard.Peer) {
 peer.FSM.SetState(standard.PeerStateRunning)
-for i := 0; i < 30; i++ {
+for i := range 30 {
 peer.AppendPieceCost(time.Duration(i))
 }
 peer.AppendPieceCost(50)
@@ -1009,7 +1009,7 @@ func TestEvaluatorBase_IsBadParent(t *testing.T) {
 totalPieceCount: 1,
 mock: func(peer *standard.Peer) {
 peer.FSM.SetState(standard.PeerStateRunning)
-for i := 0; i < 30; i++ {
+for i := range 30 {
 peer.AppendPieceCost(time.Duration(i))
 }
 peer.AppendPieceCost(18)

View File

@@ -1043,7 +1043,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
 peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
 var mockPeers []*standard.Peer
-for i := 0; i < 11; i++ {
+for i := range 11 {
 mockHost := standard.NewHost(
 idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
 mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@@ -1361,7 +1361,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) {
 peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
 var mockPeers []*standard.Peer
-for i := 0; i < 11; i++ {
+for i := range 11 {
 mockHost := standard.NewHost(
 idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
 mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@@ -1623,7 +1623,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
 peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
 var mockPeers []*standard.Peer
-for i := 0; i < 11; i++ {
+for i := range 11 {
 mockHost := standard.NewHost(
 idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
 mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)

View File

@@ -35,7 +35,7 @@ var _ = Describe("Download with dfget and proxy", func() {
 singleDfgetTest("dfget daemon download should be ok",
 dragonflyNamespace, "component=dfdaemon",
 "dragonfly-dfdaemon-", "dfdaemon")
-for i := 0; i < 3; i++ {
+for i := range 3 {
 singleDfgetTest(
 fmt.Sprintf("dfget daemon proxy-%d should be ok", i),
 dragonflyE2ENamespace,

View File

@@ -40,7 +40,7 @@ func init() {
 var _ = AfterSuite(func() {
 for _, server := range servers {
-for i := 0; i < server.replicas; i++ {
+for i := range server.replicas {
 out, err := util.KubeCtlCommand("-n", server.namespace, "get", "pod", "-l", fmt.Sprintf("component=%s", server.name),
 "-o", fmt.Sprintf("jsonpath='{.items[%d].metadata.name}'", i)).CombinedOutput()
 if err != nil {

View File

@@ -47,7 +47,7 @@ var _ = Describe("Preheat with manager", func() {
 Context("preheat", func() {
 It("preheat files should be ok", Label("preheat", "file"), func() {
 seedPeerPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedPeerPods[i] = getSeedPeerExec(i)
 }
 fsPod := getFileServerExec()
@@ -110,7 +110,7 @@ var _ = Describe("Preheat with manager", func() {
 }
 seedPeerPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedPeerPods[i] = getSeedPeerExec(i)
 }
 fsPod := getFileServerExec()
@@ -177,7 +177,7 @@ var _ = Describe("Preheat with manager", func() {
 }
 seedPeerPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedPeerPods[i] = getSeedPeerExec(i)
 }
 fsPod := getFileServerExec()
@@ -245,7 +245,7 @@ var _ = Describe("Preheat with manager", func() {
 }
 seedPeerPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedPeerPods[i] = getSeedPeerExec(i)
 }
 fsPod := getFileServerExec()

View File

@@ -57,7 +57,7 @@ var _ = Describe("Download Concurrency", func() {
 Expect(testFile.GetSha256()).To(Equal(sha256sum))
 seedClientPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedClientPods[i], err = util.SeedClientExec(i)
 fmt.Println(err)
 Expect(err).NotTo(HaveOccurred())
@@ -82,7 +82,7 @@ var _ = Describe("Download Concurrency", func() {
 Expect(testFile.GetSha256()).To(Equal(sha256sum))
 seedClientPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedClientPods[i], err = util.SeedClientExec(i)
 fmt.Println(err)
 Expect(err).NotTo(HaveOccurred())

View File

@@ -71,7 +71,7 @@ var _ = Describe("Containerd with CRI support", func() {
 time.Sleep(1 * time.Second)
 seedClientPods := make([]*util.PodExec, 3)
-for i := 0; i < 3; i++ {
+for i := range 3 {
 seedClientPods[i], err = util.SeedClientExec(i)
 fmt.Println(err)
 Expect(err).NotTo(HaveOccurred())

View File

@@ -140,9 +140,7 @@ func parseRangeHeader(rangeHeader string, fileSize int64) (start, end int64) {
 end = fileSize - 1
 bytes, _ := strconv.ParseInt(parts[1], 10, 64)
 start = fileSize - bytes
-if start < 0 {
-start = 0
-}
+start = max(start, 0)
 case parts[1] == "": // N-: from N to end
 start, _ = strconv.ParseInt(parts[0], 10, 64)

View File

@@ -59,7 +59,7 @@ func (f *fileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 if err != nil {
 log.Printf("wrong X-Dragonfly-E2E-Status-Code format %s, error: %s", str, err)
 w.WriteHeader(http.StatusBadRequest)
-_, _ = w.Write([]byte(fmt.Sprintf("wrong X-Dragonfly-E2E-Status-Code format")))
+_, _ = w.Write(fmt.Appendf([]byte{}, "wrong X-Dragonfly-E2E-Status-Code format"))
 return
 }
 w.WriteHeader(code)
@@ -70,12 +70,12 @@ func (f *fileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 rgs, err := parseRange(s, math.MaxInt)
 if err != nil {
 w.WriteHeader(http.StatusBadRequest)
-_, _ = w.Write([]byte(fmt.Sprintf("wrong range format")))
+_, _ = w.Write(fmt.Appendf([]byte{}, "wrong range format"))
 return
 }
 if len(rgs) > 1 || len(rgs) == 0 {
 w.WriteHeader(http.StatusBadRequest)
-_, _ = w.Write([]byte(fmt.Sprintf("unsupport range format")))
+_, _ = w.Write(fmt.Appendf([]byte{}, "unsupport range format"))
 return
 }
 rg = &rgs[0]
@@ -89,7 +89,7 @@ func (f *fileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 filePath := path.Join(f.dir, upath)
 if !strings.HasPrefix(filePath, f.dir) {
 w.WriteHeader(http.StatusBadRequest)
-_, _ = w.Write([]byte(fmt.Sprintf("target is not in correct dir")))
+_, _ = w.Write(fmt.Appendf([]byte{}, "target is not in correct dir"))
 return
 }
 fileInfo, err := os.Stat(filePath)
@@ -99,19 +99,19 @@ func (f *fileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 } else {
 w.WriteHeader(http.StatusInternalServerError)
 }
-_, _ = w.Write([]byte(fmt.Sprintf("%s", err)))
+_, _ = w.Write(fmt.Appendf([]byte{}, "%s", err))
 return
 }
 if fileInfo.IsDir() {
 // todo list files
 w.WriteHeader(http.StatusBadRequest)
-_, _ = w.Write([]byte(fmt.Sprintf("target is dir not file")))
+_, _ = w.Write(fmt.Appendf([]byte{}, "target is dir not file"))
 return
 }
 file, err := os.Open(filePath)
 if err != nil {
 w.WriteHeader(http.StatusInternalServerError)
-_, _ = w.Write([]byte(fmt.Sprintf("%s", err)))
+_, _ = w.Write(fmt.Appendf([]byte{}, "%s", err))
 return
 }
 defer file.Close()
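
fmt.Appendf (Go 1.19) formats straight into a byte slice, so []byte(fmt.Sprintf(...)) becomes fmt.Appendf([]byte{}, ...) without the intermediate string allocation; it also silences linters that flag fmt.Sprintf calls with a constant format string and no further arguments. A sketch (names illustrative):

package main

import (
	"fmt"
	"os"
)

func main() {
	err := fmt.Errorf("boom")

	// Old: w.Write([]byte(fmt.Sprintf("%s", err))) formats to a string,
	// then copies it into a fresh []byte. Appendf formats directly
	// into the provided slice instead.
	buf := fmt.Appendf([]byte{}, "%s", err)
	_, _ = os.Stdout.Write(buf)
}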

View File

@@ -102,7 +102,7 @@ func main() {
 wgCollect.Add(1)
 go collect(wgCollect, resultCh)
-for i := 0; i < con; i++ {
+for range con {
 wgProcess.Add(1)
 go process(ctx, wgProcess, resultCh)
 }