diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d39c619f..bdb9a0333 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,7 +25,7 @@ jobs: uses: actions/checkout@v2 - name: Golangci lint - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v2.5.2 with: version: latest diff --git a/.gitignore b/.gitignore index 9e3464c2d..831aff192 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,32 @@ mysql !.vscode/launch.json !.vscode/extensions.json *.code-workspace + +### macOS ### +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk diff --git a/.golangci.yml b/.golangci.yml index c836c2555..b85097cbb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,7 +4,7 @@ run: linters-settings: gocyclo: - min-complexity: 20 + min-complexity: 40 linters: disable-all: true diff --git a/cdnsystem/daemon/mgr/cdn/cache_data_mgr.go b/cdnsystem/daemon/mgr/cdn/cache_data_mgr.go index 83f38e1e4..d72d17676 100644 --- a/cdnsystem/daemon/mgr/cdn/cache_data_mgr.go +++ b/cdnsystem/daemon/mgr/cdn/cache_data_mgr.go @@ -48,11 +48,11 @@ func newCacheDataManager(storeMgr storage.Manager) *cacheDataManager { // writeFileMetaDataByTask stores the metadata of task by task to storage. func (mm *cacheDataManager) writeFileMetaDataByTask(ctx context.Context, task *types.SeedTask) (*storage.FileMetaData, error) { - mm.cacheLocker.Lock(task.TaskId, false) - defer mm.cacheLocker.UnLock(task.TaskId, false) + mm.cacheLocker.Lock(task.TaskID, false) + defer mm.cacheLocker.UnLock(task.TaskID, false) metaData := &storage.FileMetaData{ - TaskId: task.TaskId, - TaskURL: task.TaskUrl, + TaskID: task.TaskID, + TaskURL: task.TaskURL, PieceSize: task.PieceSize, SourceFileLen: task.SourceFileLength, AccessTime: getCurrentTimeMillisFunc(), @@ -60,7 +60,7 @@ func (mm *cacheDataManager) writeFileMetaDataByTask(ctx context.Context, task *t TotalPieceCount: task.PieceTotal, } - if err := mm.storage.WriteFileMetaData(ctx, task.TaskId, metaData); err != nil { + if err := mm.storage.WriteFileMetaData(ctx, task.TaskID, metaData); err != nil { return nil, errors.Wrapf(err, "failed to write file metadata to storage") } @@ -197,8 +197,8 @@ func (mm *cacheDataManager) readDownloadFile(ctx context.Context, taskID string) } func (mm *cacheDataManager) resetRepo(ctx context.Context, task *types.SeedTask) error { - mm.cacheLocker.Lock(task.TaskId, false) - defer mm.cacheLocker.UnLock(task.TaskId, false) + mm.cacheLocker.Lock(task.TaskID, false) + defer mm.cacheLocker.UnLock(task.TaskID, false) return mm.storage.ResetRepo(ctx, task) } diff --git a/cdnsystem/daemon/mgr/cdn/cache_detector.go b/cdnsystem/daemon/mgr/cdn/cache_detector.go index aa0599e37..165b9f811 100644 --- a/cdnsystem/daemon/mgr/cdn/cache_detector.go +++ b/cdnsystem/daemon/mgr/cdn/cache_detector.go @@ -65,7 +65,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask) //} result, err := cd.doDetect(ctx, task) if err != nil { - logger.WithTaskID(task.TaskId).Infof("failed to detect cache, reset cache: %v", err) + logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err) metaData, err := cd.resetCache(ctx, task) if err == nil { result = &cacheResult{ @@ -75,46 +75,46 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask) } return result, err } - if err := cd.cacheDataManager.updateAccessTime(ctx, task.TaskId, getCurrentTimeMillisFunc()); err != nil { - logger.WithTaskID(task.TaskId).Warnf("failed to update task access time ") + if err := cd.cacheDataManager.updateAccessTime(ctx, task.TaskID, getCurrentTimeMillisFunc()); err != nil { + logger.WithTaskID(task.TaskID).Warnf("failed to update task access time ") } return result, nil } // detectCache the actual detect action which detects file metaData and pieces metaData of specific task func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask) (result *cacheResult, err error) { - fileMetaData, err := cd.cacheDataManager.readFileMetaData(ctx, task.TaskId) + fileMetaData, err := cd.cacheDataManager.readFileMetaData(ctx, task.TaskID) if err != nil { return nil, errors.Wrapf(err, "failed to read file meta data") } if err := checkSameFile(task, fileMetaData); err != nil { return nil, errors.Wrapf(err, "task does not match meta information of task file") } - expired, err := cd.resourceClient.IsExpired(task.Url, task.Header, fileMetaData.ExpireInfo) + expired, err := cd.resourceClient.IsExpired(task.URL, task.Header, fileMetaData.ExpireInfo) if err != nil { // 如果获取失败,则认为没有过期,防止打爆源 - logger.WithTaskID(task.TaskId).Errorf("failed to check if the task expired: %v", err) + logger.WithTaskID(task.TaskID).Errorf("failed to check if the task expired: %v", err) } - logger.WithTaskID(task.TaskId).Debugf("task expired result: %t", expired) + logger.WithTaskID(task.TaskID).Debugf("task expired result: %t", expired) if expired { - return nil, errors.Wrapf(cdnerrors.ErrResourceExpired, "url:%s, expireInfo:%+v", task.Url, + return nil, errors.Wrapf(cdnerrors.ErrResourceExpired, "url:%s, expireInfo:%+v", task.URL, fileMetaData.ExpireInfo) } // not expired if fileMetaData.Finish { // quickly detect the cache situation through the meta data - return cd.parseByReadMetaFile(ctx, task.TaskId, fileMetaData) + return cd.parseByReadMetaFile(ctx, task.TaskID, fileMetaData) } // check if the resource supports range request. if so, // detect the cache situation by reading piece meta and data file - supportRange, err := cd.resourceClient.IsSupportRange(task.Url, task.Header) + supportRange, err := cd.resourceClient.IsSupportRange(task.URL, task.Header) if err != nil { - return nil, errors.Wrapf(err, "failed to check if url(%s) supports range request", task.Url) + return nil, errors.Wrapf(err, "failed to check if url(%s) supports range request", task.URL) } if !supportRange { - return nil, errors.Wrapf(cdnerrors.ErrResourceNotSupportRangeRequest, "url:%s", task.Url) + return nil, errors.Wrapf(cdnerrors.ErrResourceNotSupportRangeRequest, "url:%s", task.URL) } - return cd.parseByReadFile(ctx, task.TaskId, fileMetaData) + return cd.parseByReadFile(ctx, task.TaskID, fileMetaData) } // parseByReadMetaFile detect cache by read meta and pieceMeta files of task diff --git a/cdnsystem/daemon/mgr/cdn/cache_detector_util.go b/cdnsystem/daemon/mgr/cdn/cache_detector_util.go index 591c97d28..5b983ebfd 100644 --- a/cdnsystem/daemon/mgr/cdn/cache_detector_util.go +++ b/cdnsystem/daemon/mgr/cdn/cache_detector_util.go @@ -42,12 +42,12 @@ func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error { task.PieceSize) } - if metaData.TaskId != task.TaskId { - return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metaData.TaskId, task.TaskId) + if metaData.TaskID != task.TaskID { + return errors.Errorf("meta task TaskId(%s) is not equals with task TaskId(%s)", metaData.TaskID, task.TaskID) } - if metaData.TaskURL != task.TaskUrl { - return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.Url) + if metaData.TaskURL != task.TaskURL { + return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.URL) } if !stringutils.IsBlank(metaData.SourceRealMd5) && !stringutils.IsBlank(task.RequestMd5) && metaData.SourceRealMd5 != task.RequestMd5 { diff --git a/cdnsystem/daemon/mgr/cdn/cache_writer.go b/cdnsystem/daemon/mgr/cdn/cache_writer.go index 003acf97e..3649c84c8 100644 --- a/cdnsystem/daemon/mgr/cdn/cache_writer.go +++ b/cdnsystem/daemon/mgr/cdn/cache_writer.go @@ -28,7 +28,7 @@ import ( ) type protocolContent struct { - TaskId string + TaskID string pieceNum int32 pieceSize int32 pieceContent *bytes.Buffer @@ -80,13 +80,13 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task * if int(pieceContLeft) <= n { bb.Write(buf[:pieceContLeft]) pc := &protocolContent{ - TaskId: task.TaskId, + TaskID: task.TaskID, pieceNum: curPieceNum, pieceSize: task.PieceSize, pieceContent: bb, } jobCh <- pc - logger.WithTaskID(task.TaskId).Debugf("send protocolContent to jobCh, pieceNum: %d", curPieceNum) + logger.WithTaskID(task.TaskID).Debugf("send protocolContent to jobCh, pieceNum: %d", curPieceNum) curPieceNum++ // write the data left to a new buffer @@ -105,16 +105,16 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task * if err == io.EOF { if bb.Len() > 0 { pc := &protocolContent{ - TaskId: task.TaskId, + TaskID: task.TaskID, pieceNum: curPieceNum, pieceSize: task.PieceSize, pieceContent: bb, } jobCh <- pc curPieceNum++ - logger.WithTaskID(task.TaskId).Debugf("send the last protocolContent, pieceNum: %d", curPieceNum) + logger.WithTaskID(task.TaskID).Debugf("send the last protocolContent, pieceNum: %d", curPieceNum) } - logger.WithTaskID(task.TaskId).Info("send all protocolContents and wait for cdnWriter") + logger.WithTaskID(task.TaskID).Info("send all protocolContents and wait for cdnWriter") break } if err != nil { @@ -127,12 +127,12 @@ func (cw *cacheWriter) startWriter(ctx context.Context, reader io.Reader, task * close(jobCh) wg.Wait() - storageInfo, err := cw.cacheDataManager.statDownloadFile(ctx, task.TaskId) + storageInfo, err := cw.cacheDataManager.statDownloadFile(ctx, task.TaskID) if err != nil { return &downloadMetadata{backSourceLength: backSourceFileLength}, errors.Wrapf(err, "failed to get cdn file length") } - pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(ctx, task.TaskId) + pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(ctx, task.TaskID) if err != nil { return &downloadMetadata{backSourceLength: backSourceFileLength}, errors.Wrapf(err, "failed to get piece md5 sign") } diff --git a/cdnsystem/daemon/mgr/cdn/cache_writer_util.go b/cdnsystem/daemon/mgr/cdn/cache_writer_util.go index 7c47a01b4..b0112a77f 100644 --- a/cdnsystem/daemon/mgr/cdn/cache_writer_util.go +++ b/cdnsystem/daemon/mgr/cdn/cache_writer_util.go @@ -69,8 +69,8 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, write pieceLen := originPieceLen // 经过处理后写到存储介质的真实长度 pieceStyle := types.PlainUnspecified - if err := cw.writeToFile(ctx, job.TaskId, waitToWriteContent, int64(job.pieceNum)*int64(job.pieceSize), pieceMd5); err != nil { - logger.WithTaskID(job.TaskId).Errorf("failed to write file, pieceNum %d: %v", job.pieceNum, err) + if err := cw.writeToFile(ctx, job.TaskID, waitToWriteContent, int64(job.pieceNum)*int64(job.pieceSize), pieceMd5); err != nil { + logger.WithTaskID(job.TaskID).Errorf("failed to write file, pieceNum %d: %v", job.pieceNum, err) // todo redo the job? continue } @@ -95,16 +95,16 @@ func (cw *cacheWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, write go func(record *storage.PieceMetaRecord) { defer wg.Done() // todo 可以先塞入channel,然后启动单独goroutine顺序写文件 - if err := cw.cacheDataManager.appendPieceMetaData(ctx, job.TaskId, record); err != nil { - logger.WithTaskID(job.TaskId).Errorf("failed to append piece meta data to file:%v", err) + if err := cw.cacheDataManager.appendPieceMetaData(ctx, job.TaskID, record); err != nil { + logger.WithTaskID(job.TaskID).Errorf("failed to append piece meta data to file:%v", err) } }(pieceRecord) if cw.cdnReporter != nil { - if err := cw.cdnReporter.reportPieceMetaRecord(ctx, job.TaskId, pieceRecord, + if err := cw.cdnReporter.reportPieceMetaRecord(ctx, job.TaskID, pieceRecord, DownloaderReport); err != nil { // NOTE: should we do this job again? - logger.WithTaskID(job.TaskId).Errorf("failed to report piece status, pieceNum %d pieceMetaRecord %s: %v", job.pieceNum, pieceRecord, err) + logger.WithTaskID(job.TaskID).Errorf("failed to report piece status, pieceNum %d pieceMetaRecord %s: %v", job.pieceNum, pieceRecord, err) continue } } diff --git a/cdnsystem/daemon/mgr/cdn/downloader.go b/cdnsystem/daemon/mgr/cdn/downloader.go index afeeeb2f4..ceff6c93c 100644 --- a/cdnsystem/daemon/mgr/cdn/downloader.go +++ b/cdnsystem/daemon/mgr/cdn/downloader.go @@ -41,7 +41,7 @@ func (cm *Manager) download(task *types.SeedTask, detectResult *cacheResult) (io headers[RangeHeaderName] = fmt.Sprintf("bytes=%s", breakRange) } } - logger.WithTaskID(task.TaskId).Infof("start download url %s at range:%d-%d: with header: %+v", task.Url, detectResult.breakPoint, + logger.WithTaskID(task.TaskID).Infof("start download url %s at range:%d-%d: with header: %+v", task.URL, detectResult.breakPoint, task.SourceFileLength, task.Header) - return cm.resourceClient.Download(task.Url, headers) + return cm.resourceClient.Download(task.URL, headers) } diff --git a/cdnsystem/daemon/mgr/cdn/manager.go b/cdnsystem/daemon/mgr/cdn/manager.go index ca29de285..cc2a8aef4 100644 --- a/cdnsystem/daemon/mgr/cdn/manager.go +++ b/cdnsystem/daemon/mgr/cdn/manager.go @@ -81,38 +81,38 @@ func NewManager(cfg *config.Config, cacheStore storage.Manager, progressMgr mgr. func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTask *types.SeedTask, err error) { // obtain taskId write lock - cm.cdnLocker.Lock(task.TaskId, false) - defer cm.cdnLocker.UnLock(task.TaskId, false) + cm.cdnLocker.Lock(task.TaskID, false) + defer cm.cdnLocker.UnLock(task.TaskID, false) // first: detect Cache detectResult, err := cm.detector.detectCache(ctx, task) if err != nil { return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusFailed), errors.Wrapf(err, "failed to detect cache") } - logger.WithTaskID(task.TaskId).Debugf("detects cache result: %+v", detectResult) + logger.WithTaskID(task.TaskID).Debugf("detects cache result: %+v", detectResult) // second: report detect result - err = cm.cdnReporter.reportCache(ctx, task.TaskId, detectResult) + err = cm.cdnReporter.reportCache(ctx, task.TaskID, detectResult) if err != nil { - logger.WithTaskID(task.TaskId).Errorf("failed to report cache, reset detectResult:%v", err) + logger.WithTaskID(task.TaskID).Errorf("failed to report cache, reset detectResult:%v", err) } // full cache if detectResult.breakPoint == -1 { - logger.WithTaskID(task.TaskId).Infof("cache full hit on local") + logger.WithTaskID(task.TaskID).Infof("cache full hit on local") return getUpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealMd5, detectResult.fileMetaData.PieceMd5Sign, detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength), nil } - server.StatSeedStart(task.TaskId, task.Url) + server.StatSeedStart(task.TaskID, task.URL) start := time.Now() // third: start to download the source file body, expireInfo, err := cm.download(task, detectResult) // download fail if err != nil { - server.StatSeedFinish(task.TaskId, task.Url, false, err, start.Nanosecond(), time.Now().Nanosecond(), 0, 0) + server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), 0, 0) return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusSourceError), err } defer body.Close() //update Expire info - cm.updateExpireInfo(ctx, task.TaskId, expireInfo) + cm.updateExpireInfo(ctx, task.TaskID, expireInfo) fileMd5 := md5.New() if detectResult.fileMd5 != nil { fileMd5 = detectResult.fileMd5 @@ -121,12 +121,12 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // forth: write to storage downloadMetadata, err := cm.writer.startWriter(ctx, reader, task, detectResult) if err != nil { - server.StatSeedFinish(task.TaskId, task.Url, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, + server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, downloadMetadata.realSourceFileLength) - logger.WithTaskID(task.TaskId).Errorf("failed to write for task: %v", err) + logger.WithTaskID(task.TaskID).Errorf("failed to write for task: %v", err) return getUpdateTaskInfoWithStatusOnly(types.TaskInfoCdnStatusFailed), err } - server.StatSeedFinish(task.TaskId, task.Url, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, + server.StatSeedFinish(task.TaskID, task.URL, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, downloadMetadata.realSourceFileLength) sourceMD5 := reader.Md5() // fifth: handle CDN result @@ -147,7 +147,7 @@ func (cm *Manager) Delete(ctx context.Context, taskID string) error { } func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, sourceMd5 string, downloadMetadata *downloadMetadata) (bool, error) { - logger.WithTaskID(task.TaskId).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata) + logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata) var isSuccess = true var errorMsg string // check md5 @@ -165,7 +165,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so isSuccess = false } if !stringutils.IsBlank(errorMsg) { - logger.WithTaskID(task.TaskId).Error(errorMsg) + logger.WithTaskID(task.TaskID).Error(errorMsg) } sourceFileLen := task.SourceFileLength if isSuccess && task.SourceFileLength <= 0 { @@ -177,7 +177,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so if !isSuccess { cdnFileLength = 0 } - if err := cm.cacheDataManager.updateStatusAndResult(ctx, task.TaskId, &storage.FileMetaData{ + if err := cm.cacheDataManager.updateStatusAndResult(ctx, task.TaskID, &storage.FileMetaData{ Finish: true, Success: isSuccess, SourceRealMd5: sourceMd5, @@ -193,7 +193,7 @@ func (cm *Manager) handleCDNResult(ctx context.Context, task *types.SeedTask, so return false, errors.New(errorMsg) } - logger.WithTaskID(task.TaskId).Infof("success to get task, downloadMetadata:%+v realMd5: %s", downloadMetadata, sourceMd5) + logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata:%+v realMd5: %s", downloadMetadata, sourceMd5) return true, nil } diff --git a/cdnsystem/daemon/mgr/cdn/storage/disk/disk.go b/cdnsystem/daemon/mgr/cdn/storage/disk/disk.go index 64fd0a6af..d32f1e902 100644 --- a/cdnsystem/daemon/mgr/cdn/storage/disk/disk.go +++ b/cdnsystem/daemon/mgr/cdn/storage/disk/disk.go @@ -243,7 +243,7 @@ func (s *diskStorageMgr) DeleteTask(ctx context.Context, taskID string) error { } func (s *diskStorageMgr) ResetRepo(ctx context.Context, task *types.SeedTask) error { - return s.DeleteTask(ctx, task.TaskId) + return s.DeleteTask(ctx, task.TaskID) } func init() { diff --git a/cdnsystem/daemon/mgr/cdn/storage/hybrid/hybrid.go b/cdnsystem/daemon/mgr/cdn/storage/hybrid/hybrid.go index ac67ddcaf..cb39c9bb3 100644 --- a/cdnsystem/daemon/mgr/cdn/storage/hybrid/hybrid.go +++ b/cdnsystem/daemon/mgr/cdn/storage/hybrid/hybrid.go @@ -251,13 +251,13 @@ func (h *hybridStorageMgr) CreateUploadLink(ctx context.Context, taskID string) } func (h *hybridStorageMgr) ResetRepo(ctx context.Context, task *types.SeedTask) error { - if err := h.deleteTaskFiles(ctx, task.TaskId, false, true); err != nil { - logger.WithTaskID(task.TaskId).Errorf("reset repo: failed to delete task files: %v", err) + if err := h.deleteTaskFiles(ctx, task.TaskID, false, true); err != nil { + logger.WithTaskID(task.TaskID).Errorf("reset repo: failed to delete task files: %v", err) } // 判断是否有足够空间存放 - shmPath, err := h.tryShmSpace(ctx, task.Url, task.TaskId, task.SourceFileLength) + shmPath, err := h.tryShmSpace(ctx, task.URL, task.TaskID, task.SourceFileLength) if err == nil { - return fileutils.SymbolicLink(shmPath, h.diskStore.GetPath(storage.GetDownloadRaw(task.TaskId))) + return fileutils.SymbolicLink(shmPath, h.diskStore.GetPath(storage.GetDownloadRaw(task.TaskID))) } return nil } diff --git a/cdnsystem/daemon/mgr/cdn/storage/storage_gc.go b/cdnsystem/daemon/mgr/cdn/storage/storage_gc.go index 5d93921dd..92cdbb79c 100644 --- a/cdnsystem/daemon/mgr/cdn/storage/storage_gc.go +++ b/cdnsystem/daemon/mgr/cdn/storage/storage_gc.go @@ -144,7 +144,7 @@ func (cleaner *Cleaner) sortInert(ctx context.Context, gapTasks, intervalTasks * if metaData.Interval > 0 && gap <= metaData.Interval+(int64(cleaner.Cfg.IntervalThreshold.Seconds())*int64(time.Millisecond)) { - info, err := cleaner.StorageMgr.StatDownloadFile(ctx, metaData.TaskId) + info, err := cleaner.StorageMgr.StatDownloadFile(ctx, metaData.TaskID) if err != nil { return err } @@ -154,7 +154,7 @@ func (cleaner *Cleaner) sortInert(ctx context.Context, gapTasks, intervalTasks * v = make([]string, 0) } tasks := v.([]string) - tasks = append(tasks, metaData.TaskId) + tasks = append(tasks, metaData.TaskID) intervalTasks.Put(info.Size, tasks) return nil } @@ -164,7 +164,7 @@ func (cleaner *Cleaner) sortInert(ctx context.Context, gapTasks, intervalTasks * v = make([]string, 0) } tasks := v.([]string) - tasks = append(tasks, metaData.TaskId) + tasks = append(tasks, metaData.TaskID) gapTasks.Put(gap, tasks) return nil } diff --git a/cdnsystem/daemon/mgr/cdn/storage/storage_mgr.go b/cdnsystem/daemon/mgr/cdn/storage/storage_mgr.go index 4d6757a3d..61e9d8439 100644 --- a/cdnsystem/daemon/mgr/cdn/storage/storage_mgr.go +++ b/cdnsystem/daemon/mgr/cdn/storage/storage_mgr.go @@ -65,7 +65,7 @@ type BuildOptions interface { // fileMetaData type FileMetaData struct { - TaskId string `json:"taskId"` + TaskID string `json:"taskId"` TaskURL string `json:"taskUrl"` PieceSize int32 `json:"pieceSize"` SourceFileLen int64 `json:"sourceFileLen"` diff --git a/cdnsystem/daemon/mgr/task/manager.go b/cdnsystem/daemon/mgr/task/manager.go index fffc70bac..c242d22b2 100644 --- a/cdnsystem/daemon/mgr/task/manager.go +++ b/cdnsystem/daemon/mgr/task/manager.go @@ -72,48 +72,48 @@ func NewManager(cfg *config.Config, cdnMgr mgr.CDNMgr, progressMgr mgr.SeedProgr func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest) (pieceChan <-chan *types.SeedPiece, err error) { task, err := tm.addOrUpdateTask(ctx, req) if err != nil { - logger.WithTaskID(req.TaskId).Infof("failed to add or update task with req: %+v: %v", req, err) + logger.WithTaskID(req.TaskID).Infof("failed to add or update task with req: %+v: %v", req, err) return nil, err } - logger.WithTaskID(task.TaskId).Debugf("success get task info: %+v", task) + logger.WithTaskID(task.TaskID).Debugf("success get task info: %+v", task) // update accessTime for taskId - if err := tm.accessTimeMap.Add(task.TaskId, time.Now()); err != nil { - logger.WithTaskID(task.TaskId).Warnf("failed to update accessTime: %v", err) + if err := tm.accessTimeMap.Add(task.TaskID, time.Now()); err != nil { + logger.WithTaskID(task.TaskID).Warnf("failed to update accessTime: %v", err) } // trigger CDN if err := tm.triggerCdnSyncAction(ctx, task); err != nil { return nil, errors.Wrapf(err, "failed to trigger cdn") } - logger.WithTaskID(task.TaskId).Infof("successfully trigger cdn sync action") + logger.WithTaskID(task.TaskID).Infof("successfully trigger cdn sync action") // watch seed progress - return tm.progressMgr.WatchSeedProgress(ctx, task.TaskId) + return tm.progressMgr.WatchSeedProgress(ctx, task.TaskID) } // triggerCdnSyncAction func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTask) error { - synclock.Lock(task.TaskId, true) + synclock.Lock(task.TaskID, true) if !task.IsFrozen() { - logger.WithTaskID(task.TaskId).Infof("seedTask is running or has been downloaded successfully, status:%s", task.CdnStatus) - synclock.UnLock(task.TaskId, true) + logger.WithTaskID(task.TaskID).Infof("seedTask is running or has been downloaded successfully, status:%s", task.CdnStatus) + synclock.UnLock(task.TaskID, true) return nil } - synclock.UnLock(task.TaskId, true) + synclock.UnLock(task.TaskID, true) - synclock.Lock(task.TaskId, false) - defer synclock.UnLock(task.TaskId, false) + synclock.Lock(task.TaskID, false) + defer synclock.UnLock(task.TaskID, false) // reconfirm if !task.IsFrozen() { - logger.WithTaskID(task.TaskId).Infof("reconfirm find seedTask is running or has been downloaded successfully, status:%s", task.CdnStatus) + logger.WithTaskID(task.TaskID).Infof("reconfirm find seedTask is running or has been downloaded successfully, status:%s", task.CdnStatus) return nil } if task.IsWait() { - tm.progressMgr.InitSeedProgress(ctx, task.TaskId) - logger.WithTaskID(task.TaskId).Infof("successfully init seed progress for task") + tm.progressMgr.InitSeedProgress(ctx, task.TaskID) + logger.WithTaskID(task.TaskID).Infof("successfully init seed progress for task") } - updatedTask, err := tm.updateTask(task.TaskId, &types.SeedTask{ + updatedTask, err := tm.updateTask(task.TaskID, &types.SeedTask{ CdnStatus: types.TaskInfoCdnStatusRunning, }) if err != nil { @@ -123,15 +123,14 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas go func() { updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task) if err != nil { - logger.WithTaskID(task.TaskId).Errorf("trigger cdn get error: %v", err) + logger.WithTaskID(task.TaskID).Errorf("trigger cdn get error: %v", err) } - go tm.progressMgr.PublishTask(ctx, task.TaskId, updateTaskInfo) - updatedTask, err = tm.updateTask(task.TaskId, updateTaskInfo) + go tm.progressMgr.PublishTask(ctx, task.TaskID, updateTaskInfo) + updatedTask, err = tm.updateTask(task.TaskID, updateTaskInfo) if err != nil { - logger.WithTaskID(task.TaskId).Errorf("failed to update task:%v", err) - } else { - logger.WithTaskID(task.TaskId).Infof("successfully update task cdn updatedTask:%+v", updatedTask) + logger.WithTaskID(task.TaskID).Errorf("failed to update task:%v", err) } + logger.WithTaskID(task.TaskID).Infof("successfully update task cdn updatedTask:%+v", updatedTask) }() return nil } diff --git a/cdnsystem/daemon/mgr/task/manager_util.go b/cdnsystem/daemon/mgr/task/manager_util.go index a2a3c2484..8b5162321 100644 --- a/cdnsystem/daemon/mgr/task/manager_util.go +++ b/cdnsystem/daemon/mgr/task/manager_util.go @@ -40,7 +40,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis if request.Filter != nil { taskURL = urlutils.FilterURLParam(request.URL, request.Filter) } - taskID := request.TaskId + taskID := request.TaskID synclock.Lock(taskID, false) defer synclock.UnLock(taskID, false) if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil { @@ -54,11 +54,11 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis } var task *types.SeedTask newTask := &types.SeedTask{ - TaskId: taskID, + TaskID: taskID, Header: request.Header, RequestMd5: request.Md5, - Url: request.URL, - TaskUrl: taskURL, + URL: request.URL, + TaskURL: taskURL, CdnStatus: types.TaskInfoCdnStatusWaiting, SourceFileLength: IllegalSourceFileLen, } @@ -80,9 +80,9 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis } // get sourceContentLength with req.Header - sourceFileLength, err := tm.resourceClient.GetContentLength(task.Url, request.Header) + sourceFileLength, err := tm.resourceClient.GetContentLength(task.URL, request.Header) if err != nil { - logger.WithTaskID(task.TaskId).Errorf("failed to get url (%s) content length: %v", task.Url, err) + logger.WithTaskID(task.TaskID).Errorf("failed to get url (%s) content length: %v", task.URL, err) if cdnerrors.IsURLNotReachable(err) { tm.taskURLUnReachableStore.Add(taskID, time.Now()) @@ -103,7 +103,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis pieceSize := computePieceSize(task.SourceFileLength) task.PieceSize = pieceSize } - tm.taskStore.Add(task.TaskId, task) + tm.taskStore.Add(task.TaskID, task) logger.Debugf("success add task:%+v into taskStore", task) return task, nil } @@ -169,7 +169,7 @@ func isSameTask(task1, task2 *types.SeedTask) bool { if task1 == task2 { return true } - if task1.TaskUrl != task2.TaskUrl { + if task1.TaskURL != task2.TaskURL { return false } diff --git a/cdnsystem/server/server.go b/cdnsystem/server/server.go index dba51f694..23f00dc33 100644 --- a/cdnsystem/server/server.go +++ b/cdnsystem/server/server.go @@ -30,11 +30,17 @@ import ( "d7y.io/dragonfly/v2/cdnsystem/plugins" "d7y.io/dragonfly/v2/cdnsystem/server/service" "d7y.io/dragonfly/v2/cdnsystem/source" + + // Init http client _ "d7y.io/dragonfly/v2/cdnsystem/source/httpprotocol" + + // Init OSS client _ "d7y.io/dragonfly/v2/cdnsystem/source/ossprotocol" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" + + // Server registered to grpc _ "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" "d7y.io/dragonfly/v2/pkg/rpc/manager" configServer "d7y.io/dragonfly/v2/pkg/rpc/manager/client" diff --git a/cdnsystem/server/service/cdn_seed_server.go b/cdnsystem/server/service/cdn_seed_server.go index e9e7d0814..acf3b6afb 100644 --- a/cdnsystem/server/service/cdn_seed_server.go +++ b/cdnsystem/server/service/cdn_seed_server.go @@ -72,7 +72,7 @@ func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRe Header: header, URL: req.Url, Md5: header["md5"], - TaskId: req.TaskId, + TaskID: req.TaskId, Filter: strings.Split(req.Filter, "&"), }, nil } @@ -168,11 +168,11 @@ func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTask return nil, dferrors.Newf(dfcodes.CdnError, "failed to get task(%s) from cdn: %v", req.TaskId, err) } if task.IsError() { - return nil, dferrors.Newf(dfcodes.CdnTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskId, task.CdnStatus) + return nil, dferrors.Newf(dfcodes.CdnTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskID, task.CdnStatus) } pieces, err := css.taskMgr.GetPieces(ctx, req.TaskId) if err != nil { - return nil, dferrors.Newf(dfcodes.CdnError, "failed to get pieces of task(%s) from cdn: %v", task.TaskId, err) + return nil, dferrors.Newf(dfcodes.CdnError, "failed to get pieces of task(%s) from cdn: %v", task.TaskID, err) } pieceInfos := make([]*base.PieceInfo, 0) var count int32 = 0 diff --git a/cdnsystem/source/ossprotocol/oss_source_client.go b/cdnsystem/source/ossprotocol/oss_source_client.go index 22ed71d9d..3addf71af 100644 --- a/cdnsystem/source/ossprotocol/oss_source_client.go +++ b/cdnsystem/source/ossprotocol/oss_source_client.go @@ -96,7 +96,7 @@ func (osc *ossSourceClient) IsExpired(url string, header, expireInfo map[string] } func (osc *ossSourceClient) Download(url string, header map[string]string) (io.ReadCloser, map[string]string, error) { - ossObject, err := ParseOssObject(url) + ossObject, err := ParseOSSObject(url) if err != nil { return nil, nil, errors.Wrapf(err, "failed to parse oss object from url:%s", url) } @@ -153,7 +153,7 @@ func (osc *ossSourceClient) getMeta(url string, header map[string]string) (http. if err != nil { return nil, errors.Wrapf(err, "failed to get oss client") } - ossObject, err := ParseOssObject(url) + ossObject, err := ParseOSSObject(url) if err != nil { return nil, errors.Wrapf(cdnerrors.ErrURLNotReachable, "failed to parse oss object: %v", err) } @@ -183,12 +183,12 @@ func getOptions(header map[string]string) []oss.Option { return opts } -type ossObject struct { +type OSSObject struct { bucket string object string } -func ParseOssObject(ossURL string) (*ossObject, error) { +func ParseOSSObject(ossURL string) (*OSSObject, error) { url, err := url.Parse(ossURL) if url.Scheme != "oss" { return nil, fmt.Errorf("url:%s is not oss object", ossURL) @@ -196,7 +196,7 @@ func ParseOssObject(ossURL string) (*ossObject, error) { if err != nil { return nil, err } - return &ossObject{ + return &OSSObject{ bucket: url.Host, object: url.Path[1:], }, nil diff --git a/cdnsystem/types/seed_task_info.go b/cdnsystem/types/seed_task_info.go index d3283d757..31b3fa187 100644 --- a/cdnsystem/types/seed_task_info.go +++ b/cdnsystem/types/seed_task_info.go @@ -17,9 +17,9 @@ package types type SeedTask struct { - TaskId string `json:"taskId,omitempty"` - Url string `json:"url,omitempty"` - TaskUrl string `json:"taskUrl,omitempty"` + TaskID string `json:"taskId,omitempty"` + URL string `json:"url,omitempty"` + TaskURL string `json:"taskUrl,omitempty"` SourceFileLength int64 `json:"sourceFileLength,omitempty"` CdnFileLength int64 `json:"cdnFileLength,omitempty"` PieceSize int32 `json:"pieceSize,omitempty"` diff --git a/cdnsystem/types/task_register_request.go b/cdnsystem/types/task_register_request.go index 2b4ec78c6..5cdd31c21 100644 --- a/cdnsystem/types/task_register_request.go +++ b/cdnsystem/types/task_register_request.go @@ -19,7 +19,7 @@ package types // TaskRegisterRequest type TaskRegisterRequest struct { URL string `json:"rawURL,omitempty"` - TaskId string `json:"taskId,omitempty"` + TaskID string `json:"taskId,omitempty"` Md5 string `json:"md5,omitempty"` Filter []string `json:"filter,omitempty"` Header map[string]string `json:"header,omitempty"` diff --git a/client/config/dfget.go b/client/config/dfget.go index 1afde18a2..4a35ac41b 100644 --- a/client/config/dfget.go +++ b/client/config/dfget.go @@ -173,7 +173,7 @@ func (cfg *ClientOption) checkOutput() error { } dir, _ := path.Split(cfg.Output) - if err := MkdirAll(dir, 0777, basic.UserId, basic.UserGroup); err != nil { + if err := MkdirAll(dir, 0777, basic.UserID, basic.UserGroup); err != nil { return err } diff --git a/client/config/dfget_darwin.go b/client/config/dfget_darwin.go index 656f25418..1a969238d 100644 --- a/client/config/dfget_darwin.go +++ b/client/config/dfget_darwin.go @@ -21,11 +21,11 @@ package config import "d7y.io/dragonfly/v2/pkg/unit" var dfgetConfig = ClientOption{ - URL: "", - LockFile: "/tmp/dfget.lock", - Output: "", - Timeout: 0, - BenchmarkRate: 128 * unit.KB, + URL: "", + LockFile: "/tmp/dfget.lock", + Output: "", + Timeout: 0, + BenchmarkRate: 128 * unit.KB, RateLimit: 0, Md5: "", DigestMethod: "", diff --git a/client/config/dfget_linux.go b/client/config/dfget_linux.go index fdf68d2e0..56330f972 100644 --- a/client/config/dfget_linux.go +++ b/client/config/dfget_linux.go @@ -19,9 +19,9 @@ package config var dfgetConfig = ClientOption{ - URL: "", - LockFile: "/var/run/dfget.lock", - Output: "", + URL: "", + LockFile: "/var/run/dfget.lock", + Output: "", Timeout: 0, Md5: "", DigestMethod: "", diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index b3e2fc602..bdcd62922 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -67,14 +67,14 @@ type peerTask struct { // host info about current host host *scheduler.PeerHost // callback holds some actions, like init, done, fail actions - callback PeerTaskCallback + callback TaskCallback // schedule options schedulerOption config.SchedulerOption // peer task meta info - peerId string - taskId string + peerID string + taskID string contentLength int64 totalPiece int32 completedLength int64 @@ -124,16 +124,16 @@ func (pt *peerTask) ReportPieceResult(pieceTask *base.PieceInfo, pieceResult *sc panic("implement me") } -func (pt *peerTask) SetCallback(callback PeerTaskCallback) { +func (pt *peerTask) SetCallback(callback TaskCallback) { pt.callback = callback } func (pt *peerTask) GetPeerID() string { - return pt.peerId + return pt.peerID } func (pt *peerTask) GetTaskID() string { - return pt.taskId + return pt.taskID } func (pt *peerTask) GetContentLength() int64 { @@ -164,7 +164,7 @@ func (pt *peerTask) Log() *logger.SugaredLoggerOnWith { return pt.SugaredLoggerOnWith } -func (pt *peerTask) pullPieces(pti PeerTask, cleanUnfinishedFunc func()) { +func (pt *peerTask) pullPieces(pti Task, cleanUnfinishedFunc func()) { // when there is a single piece, try to download first if pt.singlePiece != nil { go pt.pullSinglePiece(pti, cleanUnfinishedFunc) @@ -292,7 +292,7 @@ func (pt *peerTask) isExitPeerPacketCode(pp *scheduler.PeerPacket) bool { return false } -func (pt *peerTask) pullSinglePiece(pti PeerTask, cleanUnfinishedFunc func()) { +func (pt *peerTask) pullSinglePiece(pti Task, cleanUnfinishedFunc func()) { pt.Infof("single piece, dest peer id: %s, piece num: %d, size: %d", pt.singlePiece.DstPid, pt.singlePiece.PieceInfo.PieceNum, pt.singlePiece.PieceInfo.RangeSize) @@ -332,7 +332,7 @@ func (pt *peerTask) pullSinglePiece(pti PeerTask, cleanUnfinishedFunc func()) { // TODO when main peer is not available, switch to steel peers // piece manager need peer task interface, pti make it compatibility for stream peer task -func (pt *peerTask) pullPiecesFromPeers(pti PeerTask, cleanUnfinishedFunc func()) { +func (pt *peerTask) pullPiecesFromPeers(pti Task, cleanUnfinishedFunc func()) { defer func() { close(pt.failedPieceCh) cleanUnfinishedFunc() @@ -387,8 +387,8 @@ loop: pt.Debugf("try to get pieces, number: %d, limit: %d", num, limit) piecePacket, err := pt.preparePieceTasks( &base.PieceTaskRequest{ - TaskId: pt.taskId, - SrcPid: pt.peerId, + TaskId: pt.taskID, + SrcPid: pt.peerID, StartNum: num, Limit: limit, }) @@ -507,7 +507,7 @@ loop: } } -func (pt *peerTask) downloadPieceWorker(id int32, pti PeerTask, requests chan *DownloadPieceRequest) { +func (pt *peerTask) downloadPieceWorker(id int32, pti Task, requests chan *DownloadPieceRequest) { for { select { case request := <-requests: @@ -630,8 +630,8 @@ retry: } pt.Errorf("get piece task from peer(%s) error: %s, code: %d", peer.PeerId, err, code) perr := pt.peerPacketStream.Send(&scheduler.PieceResult{ - TaskId: pt.taskId, - SrcPid: pt.peerId, + TaskId: pt.taskID, + SrcPid: pt.peerID, DstPid: peer.PeerId, Success: false, Code: code, @@ -672,8 +672,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer if len(pp.PieceInfos) == 0 { count++ er := pt.peerPacketStream.Send(&scheduler.PieceResult{ - TaskId: pt.taskId, - SrcPid: pt.peerId, + TaskId: pt.taskID, + SrcPid: pt.peerID, DstPid: peer.PeerId, Success: false, Code: dfcodes.ClientWaitPieceReady, diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 2472d0ae9..0d2890343 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -42,7 +42,7 @@ type FilePeerTaskRequest struct { // FilePeerTask represents a peer task to download a file type FilePeerTask interface { - PeerTask + Task // Start start the special peer task, return a *FilePeerTaskProgress channel for updating download progress Start(ctx context.Context) (chan *FilePeerTaskProgress, error) } @@ -62,7 +62,7 @@ type ProgressState struct { type FilePeerTaskProgress struct { State *ProgressState - TaskId string + TaskID string PeerID string ContentLength int64 CompletedLength int64 @@ -130,8 +130,8 @@ func newFilePeerTask(ctx context.Context, if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok { return ctx, nil, &TinyData{ span: span, - TaskId: result.TaskId, - PeerId: request.PeerId, + TaskID: result.TaskId, + PeerID: request.PeerId, Content: piece.PieceContent, }, nil } @@ -165,8 +165,8 @@ func newFilePeerTask(ctx context.Context, peerPacketStream: peerPacketStream, pieceManager: pieceManager, peerPacketReady: make(chan bool), - peerId: request.PeerId, - taskId: result.TaskId, + peerID: request.PeerId, + taskID: result.TaskId, singlePiece: singlePiece, done: make(chan struct{}), span: span, @@ -245,8 +245,8 @@ func (pt *filePeerTask) ReportPieceResult(piece *base.PieceInfo, pieceResult *sc Code: pieceResult.Code, Msg: "downloading", }, - TaskId: pt.taskId, - PeerID: pt.peerId, + TaskID: pt.taskID, + PeerID: pt.peerID, ContentLength: pt.contentLength, CompletedLength: pt.completedLength, PeerTaskDone: false, @@ -275,7 +275,7 @@ func (pt *filePeerTask) finish() error { defer pt.recoverFromPanic() // send EOF piece result to scheduler _ = pt.peerPacketStream.Send( - scheduler.NewEndPieceResult(pt.taskId, pt.peerId, pt.readyPieces.Settled())) + scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled())) pt.Debugf("finish end piece result sent") var ( @@ -299,8 +299,8 @@ func (pt *filePeerTask) finish() error { Code: code, Msg: message, }, - TaskId: pt.taskId, - PeerID: pt.peerId, + TaskID: pt.taskID, + PeerID: pt.peerID, ContentLength: pt.contentLength, CompletedLength: pt.completedLength, PeerTaskDone: true, @@ -345,7 +345,7 @@ func (pt *filePeerTask) cleanUnfinished() { defer pt.recoverFromPanic() // send EOF piece result to scheduler _ = pt.peerPacketStream.Send( - scheduler.NewEndPieceResult(pt.taskId, pt.peerId, pt.readyPieces.Settled())) + scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled())) pt.Debugf("clean up end piece result sent") pg := &FilePeerTaskProgress{ @@ -354,8 +354,8 @@ func (pt *filePeerTask) cleanUnfinished() { Code: pt.failedCode, Msg: pt.failedReason, }, - TaskId: pt.taskId, - PeerID: pt.peerId, + TaskID: pt.taskID, + PeerID: pt.peerID, ContentLength: pt.contentLength, CompletedLength: pt.completedLength, PeerTaskDone: true, diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index 0a36a95e0..7a14a8041 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -37,7 +37,7 @@ func (p *filePeerTaskCallback) GetStartTime() time.Time { return p.start } -func (p *filePeerTaskCallback) Init(pt PeerTask) error { +func (p *filePeerTaskCallback) Init(pt Task) error { // prepare storage err := p.ptm.storageManager.RegisterTask(p.ctx, storage.RegisterTaskRequest{ @@ -55,7 +55,7 @@ func (p *filePeerTaskCallback) Init(pt PeerTask) error { return err } -func (p *filePeerTaskCallback) Update(pt PeerTask) error { +func (p *filePeerTaskCallback) Update(pt Task) error { // update storage err := p.ptm.storageManager.UpdateTask(p.ctx, &storage.UpdateTaskRequest{ @@ -72,7 +72,7 @@ func (p *filePeerTaskCallback) Update(pt PeerTask) error { return err } -func (p *filePeerTaskCallback) Done(pt PeerTask) error { +func (p *filePeerTaskCallback) Done(pt Task) error { var cost = time.Now().Sub(p.start).Milliseconds() pt.Log().Infof("file peer task done, cost: %dms", cost) e := p.ptm.storageManager.Store( @@ -111,7 +111,7 @@ func (p *filePeerTaskCallback) Done(pt PeerTask) error { return nil } -func (p *filePeerTaskCallback) Fail(pt PeerTask, code base.Code, reason string) error { +func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) error { p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 83361bbf9..e4621407c 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -38,8 +38,8 @@ import ( schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" ) -// PeerTaskManager processes all peer tasks request -type PeerTaskManager interface { +// TaskManager processes all peer tasks request +type TaskManager interface { // StartFilePeerTask starts a peer task to download a file // return a progress channel for request download progress // tiny stands task file is tiny and task is done @@ -56,8 +56,8 @@ type PeerTaskManager interface { Stop(ctx context.Context) error } -// PeerTask represents common interface to operate a peer task -type PeerTask interface { +// Task represents common interface to operate a peer task +type Task interface { Context() context.Context Log() *logger.SugaredLoggerOnWith ReportPieceResult(pieceTask *base.PieceInfo, pieceResult *scheduler.PieceResult) error @@ -67,25 +67,25 @@ type PeerTask interface { GetContentLength() int64 // SetContentLength will called after download completed, when download from source without content length SetContentLength(int64) error - SetCallback(PeerTaskCallback) + SetCallback(TaskCallback) AddTraffic(int64) GetTraffic() int64 } -// PeerTaskCallback inserts some operations for peer task download lifecycle -type PeerTaskCallback interface { - Init(pt PeerTask) error - Done(pt PeerTask) error - Update(pt PeerTask) error - Fail(pt PeerTask, code base.Code, reason string) error +// TaskCallback inserts some operations for peer task download lifecycle +type TaskCallback interface { + Init(pt Task) error + Done(pt Task) error + Update(pt Task) error + Fail(pt Task, code base.Code, reason string) error GetStartTime() time.Time } type TinyData struct { // span is used by peer task manager to record events without peer task span trace.Span - TaskId string - PeerId string + TaskID string + PeerID string Content []byte } @@ -113,7 +113,7 @@ func NewPeerTaskManager( storageManager storage.Manager, schedulerClient schedulerclient.SchedulerClient, schedulerOption config.SchedulerOption, - perPeerRateLimit rate.Limit) (PeerTaskManager, error) { + perPeerRateLimit rate.Limit) (TaskManager, error) { ptm := &peerTaskManager{ host: host, @@ -138,7 +138,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer // tiny file content is returned by scheduler, just write to output if tiny != nil { defer tiny.span.End() - log := logger.With("peer", tiny.PeerId, "task", tiny.TaskId, "component", "peerTaskManager") + log := logger.With("peer", tiny.PeerID, "task", tiny.TaskID, "component", "peerTaskManager") _, err = os.Stat(req.Output) if err == nil { // remove exist file diff --git a/client/daemon/peer/peertask_manager_mock_test.go b/client/daemon/peer/peertask_manager_mock_test.go index 7bd0442d0..596c4d70b 100644 --- a/client/daemon/peer/peertask_manager_mock_test.go +++ b/client/daemon/peer/peertask_manager_mock_test.go @@ -190,7 +190,7 @@ func (mr *MockPeerTaskMockRecorder) SetContentLength(arg0 interface{}) *gomock.C } // SetCallback mocks base method -func (m *MockPeerTask) SetCallback(arg0 PeerTaskCallback) { +func (m *MockPeerTask) SetCallback(arg0 TaskCallback) { m.ctrl.Call(m, "SetCallback", arg0) } @@ -269,7 +269,7 @@ func (m *MockPeerTaskCallback) EXPECT() *MockPeerTaskCallbackMockRecorder { } // Init mocks base method -func (m *MockPeerTaskCallback) Init(pt PeerTask) error { +func (m *MockPeerTaskCallback) Init(pt Task) error { ret := m.ctrl.Call(m, "Init", pt) ret0, _ := ret[0].(error) return ret0 @@ -281,7 +281,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) Init(pt interface{}) *gomock.Call { } // Done mocks base method -func (m *MockPeerTaskCallback) Done(pt PeerTask) error { +func (m *MockPeerTaskCallback) Done(pt Task) error { ret := m.ctrl.Call(m, "Done", pt) ret0, _ := ret[0].(error) return ret0 @@ -293,7 +293,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) Done(pt interface{}) *gomock.Call { } // Update mocks base method -func (m *MockPeerTaskCallback) Update(pt PeerTask) error { +func (m *MockPeerTaskCallback) Update(pt Task) error { ret := m.ctrl.Call(m, "Update", pt) ret0, _ := ret[0].(error) return ret0 @@ -305,7 +305,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) Update(pt interface{}) *gomock.Call } // Fail mocks base method -func (m *MockPeerTaskCallback) Fail(pt PeerTask, reason string) error { +func (m *MockPeerTaskCallback) Fail(pt Task, reason string) error { ret := m.ctrl.Call(m, "Fail", pt, reason) ret0, _ := ret[0].(error) return ret0 diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index d09f1045a..f7a1c2ee5 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -41,7 +41,7 @@ import ( // StreamPeerTask represents a peer task with stream io for reading directly without once more disk io type StreamPeerTask interface { - PeerTask + Task // Start start the special peer task, return a io.Reader for stream io // when all data transferred, reader return a io.EOF // attribute stands some extra data, like HTTP response Header @@ -111,8 +111,8 @@ func newStreamPeerTask(ctx context.Context, if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok { return ctx, nil, &TinyData{ span: span, - TaskId: result.TaskId, - PeerId: request.PeerId, + TaskID: result.TaskId, + PeerID: request.PeerId, Content: piece.PieceContent, }, nil } @@ -144,8 +144,8 @@ func newStreamPeerTask(ctx context.Context, peerPacketStream: peerPacketStream, pieceManager: pieceManager, peerPacketReady: make(chan bool), - peerId: request.PeerId, - taskId: result.TaskId, + peerID: request.PeerId, + taskID: result.TaskId, singlePiece: singlePiece, done: make(chan struct{}), span: span, @@ -266,8 +266,8 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin } else { attr[headers.TransferEncoding] = "chunked" } - attr[config.HeaderDragonflyTask] = s.taskId - attr[config.HeaderDragonflyPeer] = s.peerId + attr[config.HeaderDragonflyTask] = s.taskID + attr[config.HeaderDragonflyPeer] = s.peerID go func(first int32) { defer func() { @@ -364,7 +364,7 @@ func (s *streamPeerTask) finish() error { s.once.Do(func() { // send EOF piece result to scheduler _ = s.peerPacketStream.Send( - scheduler.NewEndPieceResult(s.taskId, s.peerId, s.readyPieces.Settled())) + scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled())) s.Debugf("end piece result sent") close(s.done) //close(s.successPieceCh) @@ -382,7 +382,7 @@ func (s *streamPeerTask) cleanUnfinished() { s.once.Do(func() { // send EOF piece result to scheduler _ = s.peerPacketStream.Send( - scheduler.NewEndPieceResult(s.taskId, s.peerId, s.readyPieces.Settled())) + scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled())) s.Debugf("end piece result sent") close(s.done) //close(s.successPieceCh) @@ -407,8 +407,8 @@ func (s *streamPeerTask) SetContentLength(i int64) error { func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) { pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{ PeerTaskMetaData: storage.PeerTaskMetaData{ - PeerID: s.peerId, - TaskID: s.taskId, + PeerID: s.peerID, + TaskID: s.taskID, }, PieceMetaData: storage.PieceMetaData{ Num: pieceNum, diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 06f1be330..2008faa71 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -37,7 +37,7 @@ func (p *streamPeerTaskCallback) GetStartTime() time.Time { return p.start } -func (p *streamPeerTaskCallback) Init(pt PeerTask) error { +func (p *streamPeerTaskCallback) Init(pt Task) error { // prepare storage err := p.ptm.storageManager.RegisterTask(p.ctx, storage.RegisterTaskRequest{ @@ -55,7 +55,7 @@ func (p *streamPeerTaskCallback) Init(pt PeerTask) error { return err } -func (p *streamPeerTaskCallback) Update(pt PeerTask) error { +func (p *streamPeerTaskCallback) Update(pt Task) error { // update storage err := p.ptm.storageManager.UpdateTask(p.ctx, &storage.UpdateTaskRequest{ @@ -72,7 +72,7 @@ func (p *streamPeerTaskCallback) Update(pt PeerTask) error { return err } -func (p *streamPeerTaskCallback) Done(pt PeerTask) error { +func (p *streamPeerTaskCallback) Done(pt Task) error { var cost = time.Now().Sub(p.start).Milliseconds() pt.Log().Infof("stream peer task done, cost: %dms", cost) e := p.ptm.storageManager.Store( @@ -110,7 +110,7 @@ func (p *streamPeerTaskCallback) Done(pt PeerTask) error { return nil } -func (p *streamPeerTaskCallback) Fail(pt PeerTask, code base.Code, reason string) error { +func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) error { p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason) diff --git a/client/daemon/peer/piece_downloader_optimized.go b/client/daemon/peer/piece_downloader_optimized.go index bd2cc2738..55ada53b1 100644 --- a/client/daemon/peer/piece_downloader_optimized.go +++ b/client/daemon/peer/piece_downloader_optimized.go @@ -62,6 +62,7 @@ func (o optimizedPieceDownloader) DownloadPiece(ctx context.Context, request *Do panic(err) } // TODO refactor httputil.NewClientConn + //nolint client := httputil.NewClientConn(conn, nil) // add default timeout ctx, cancel := context.WithTimeout(ctx, 30*time.Second) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 3e94b52b6..a1200019a 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -26,6 +26,8 @@ import ( cdnconfig "d7y.io/dragonfly/v2/cdnsystem/config" "d7y.io/dragonfly/v2/cdnsystem/source" + + // Init http client _ "d7y.io/dragonfly/v2/cdnsystem/source/httpprotocol" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" @@ -38,8 +40,8 @@ import ( ) type PieceManager interface { - DownloadSource(ctx context.Context, pt PeerTask, request *scheduler.PeerTaskRequest) error - DownloadPiece(ctx context.Context, peerTask PeerTask, request *DownloadPieceRequest) bool + DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest) error + DownloadPiece(ctx context.Context, peerTask Task, request *DownloadPieceRequest) bool ReadPiece(ctx context.Context, req *storage.ReadPieceRequest) (io.Reader, io.Closer, error) } @@ -95,7 +97,7 @@ func WithLimiter(limiter *rate.Limiter) func(*pieceManager) { } } -func (pm *pieceManager) DownloadPiece(ctx context.Context, pt PeerTask, request *DownloadPieceRequest) (success bool) { +func (pm *pieceManager) DownloadPiece(ctx context.Context, pt Task, request *DownloadPieceRequest) (success bool) { var ( start = time.Now().UnixNano() end int64 @@ -163,7 +165,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, pt PeerTask, request return } -func (pm *pieceManager) pushSuccessResult(peerTask PeerTask, dstPid string, piece *base.PieceInfo, start int64, end int64) { +func (pm *pieceManager) pushSuccessResult(peerTask Task, dstPid string, piece *base.PieceInfo, start int64, end int64) { err := peerTask.ReportPieceResult( piece, &scheduler.PieceResult{ @@ -183,7 +185,7 @@ func (pm *pieceManager) pushSuccessResult(peerTask PeerTask, dstPid string, piec } } -func (pm *pieceManager) pushFailResult(peerTask PeerTask, dstPid string, piece *base.PieceInfo, start int64, end int64) { +func (pm *pieceManager) pushFailResult(peerTask Task, dstPid string, piece *base.PieceInfo, start int64, end int64) { err := peerTask.ReportPieceResult( piece, &scheduler.PieceResult{ @@ -207,7 +209,7 @@ func (pm *pieceManager) ReadPiece(ctx context.Context, req *storage.ReadPieceReq return pm.storageManager.ReadPiece(ctx, req) } -func (pm *pieceManager) processPieceFromSource(pt PeerTask, +func (pm *pieceManager) processPieceFromSource(pt Task, reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize int32) (int64, error) { var ( success bool @@ -290,7 +292,7 @@ func (pm *pieceManager) processPieceFromSource(pt PeerTask, return n, nil } -func (pm *pieceManager) DownloadSource(ctx context.Context, pt PeerTask, request *scheduler.PeerTaskRequest) error { +func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest) error { if request.UrlMata == nil { request.UrlMata = &base.UrlMeta{ Header: map[string]string{}, diff --git a/client/daemon/peerhost.go b/client/daemon/peerhost.go index a3e06994a..a7999e939 100644 --- a/client/daemon/peerhost.go +++ b/client/daemon/peerhost.go @@ -71,7 +71,7 @@ type peerHost struct { StorageManager storage.Manager GCManager gc.Manager - PeerTaskManager peer.PeerTaskManager + PeerTaskManager peer.TaskManager PieceManager peer.PieceManager } diff --git a/client/daemon/proxy/proxy.go b/client/daemon/proxy/proxy.go index 8bf9f5910..f4107d908 100644 --- a/client/daemon/proxy/proxy.go +++ b/client/daemon/proxy/proxy.go @@ -72,7 +72,7 @@ type Proxy struct { directHandler http.Handler // peerTaskManager is the peer task manager - peerTaskManager peer.PeerTaskManager + peerTaskManager peer.TaskManager // peerHost is the peer host info peerHost *scheduler.PeerHost @@ -104,7 +104,7 @@ func WithPeerHost(peerHost *scheduler.PeerHost) Option { } // WithPeerTaskManager sets the peer.PeerTaskManager -func WithPeerTaskManager(peerTaskManager peer.PeerTaskManager) Option { +func WithPeerTaskManager(peerTaskManager peer.TaskManager) Option { return func(p *Proxy) *Proxy { p.peerTaskManager = peerTaskManager return p diff --git a/client/daemon/proxy/proxy_manager.go b/client/daemon/proxy/proxy_manager.go index ac3b7cdf4..86ad7ac68 100644 --- a/client/daemon/proxy/proxy_manager.go +++ b/client/daemon/proxy/proxy_manager.go @@ -48,7 +48,7 @@ type proxyManager struct { config.ListenOption } -func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.PeerTaskManager, opts *config.ProxyOption) (Manager, error) { +func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, opts *config.ProxyOption) (Manager, error) { registry := opts.RegistryMirror proxies := opts.Proxies hijackHTTPS := opts.HijackHTTPS diff --git a/client/daemon/service/manager.go b/client/daemon/service/manager.go index c4d7af574..710212c48 100644 --- a/client/daemon/service/manager.go +++ b/client/daemon/service/manager.go @@ -50,7 +50,7 @@ type Manager interface { type manager struct { clientutil.KeepAlive peerHost *scheduler.PeerHost - peerTaskManager peer.PeerTaskManager + peerTaskManager peer.TaskManager storageManager storage.Manager downloadServer rpc.Server @@ -60,7 +60,7 @@ type manager struct { var _ dfdaemonserver.DaemonServer = &manager{} -func NewManager(peerHost *scheduler.PeerHost, peerTaskManager peer.PeerTaskManager, storageManager storage.Manager, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Manager, error) { +func NewManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storageManager storage.Manager, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Manager, error) { mgr := &manager{ KeepAlive: clientutil.NewKeepAlive("service manager"), peerHost: peerHost, @@ -155,8 +155,8 @@ func (m *manager) Download(ctx context.Context, } if tiny != nil { results <- &dfdaemongrpc.DownResult{ - TaskId: tiny.TaskId, - PeerId: tiny.PeerId, + TaskId: tiny.TaskID, + PeerId: tiny.PeerID, CompletedLength: uint64(len(tiny.Content)), Done: true, } @@ -179,11 +179,11 @@ func (m *manager) Download(ctx context.Context, return dferrors.New(dfcodes.UnknownError, err.Error()) } if !p.State.Success { - log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskId, p.State.Code, p.State.Msg) + log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskID, p.State.Code, p.State.Msg) return dferrors.New(p.State.Code, p.State.Msg) } results <- &dfdaemongrpc.DownResult{ - TaskId: p.TaskId, + TaskId: p.TaskID, PeerId: p.PeerID, CompletedLength: uint64(p.CompletedLength), Done: p.PeerTaskDone, @@ -191,7 +191,7 @@ func (m *manager) Download(ctx context.Context, // peer task sets PeerTaskDone to true only once if p.PeerTaskDone { p.DoneCallback() - log.Infof("task %s/%s done", p.PeerID, p.TaskId) + log.Infof("task %s/%s done", p.PeerID, p.TaskID) if req.Uid != 0 && req.Gid != 0 { log.Infof("change own to uid %d gid %d", req.Uid, req.Gid) if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil { diff --git a/client/daemon/service/manager_test.go b/client/daemon/service/manager_test.go index 116f99316..653dab1be 100644 --- a/client/daemon/service/manager_test.go +++ b/client/daemon/service/manager_test.go @@ -61,7 +61,7 @@ func TestDownloadManager_ServeDownload(t *testing.T) { State: &peer.ProgressState{ Success: true, }, - TaskId: "", + TaskID: "", PeerID: "", ContentLength: 100, CompletedLength: int64(i), diff --git a/client/daemon/test/mock/peer/peertask_manager.go b/client/daemon/test/mock/peer/peertask_manager.go index d7b4f0788..0cdb61b75 100644 --- a/client/daemon/test/mock/peer/peertask_manager.go +++ b/client/daemon/test/mock/peer/peertask_manager.go @@ -249,7 +249,7 @@ func (mr *MockPeerTaskMockRecorder) ReportPieceResult(pieceTask, pieceResult int } // SetCallback mocks base method. -func (m *MockPeerTask) SetCallback(arg0 peer.PeerTaskCallback) { +func (m *MockPeerTask) SetCallback(arg0 peer.TaskCallback) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetCallback", arg0) } @@ -298,7 +298,7 @@ func (m *MockPeerTaskCallback) EXPECT() *MockPeerTaskCallbackMockRecorder { } // Done mocks base method. -func (m *MockPeerTaskCallback) Done(pt peer.PeerTask) error { +func (m *MockPeerTaskCallback) Done(pt peer.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Done", pt) ret0, _ := ret[0].(error) @@ -312,7 +312,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) Done(pt interface{}) *gomock.Call { } // Fail mocks base method. -func (m *MockPeerTaskCallback) Fail(pt peer.PeerTask, code base.Code, reason string) error { +func (m *MockPeerTaskCallback) Fail(pt peer.Task, code base.Code, reason string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Fail", pt, code, reason) ret0, _ := ret[0].(error) @@ -340,7 +340,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) GetStartTime() *gomock.Call { } // Init mocks base method. -func (m *MockPeerTaskCallback) Init(pt peer.PeerTask) error { +func (m *MockPeerTaskCallback) Init(pt peer.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Init", pt) ret0, _ := ret[0].(error) @@ -354,7 +354,7 @@ func (mr *MockPeerTaskCallbackMockRecorder) Init(pt interface{}) *gomock.Call { } // Update mocks base method. -func (m *MockPeerTaskCallback) Update(pt peer.PeerTask) error { +func (m *MockPeerTaskCallback) Update(pt peer.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Update", pt) ret0, _ := ret[0].(error) diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index 5913578f0..129bf795b 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -48,7 +48,7 @@ type transport struct { shouldUseDragonfly func(req *http.Request) bool // peerTaskManager is the peer task manager - peerTaskManager peer.PeerTaskManager + peerTaskManager peer.TaskManager // peerHost is the peer host info peerHost *scheduler.PeerHost @@ -72,7 +72,7 @@ func WithPeerHost(peerHost *scheduler.PeerHost) Option { } // WithHTTPSHosts sets the rules for hijacking https requests -func WithPeerTaskManager(peerTaskManager peer.PeerTaskManager) Option { +func WithPeerTaskManager(peerTaskManager peer.TaskManager) Option { return func(rt *transport) *transport { rt.peerTaskManager = peerTaskManager return rt diff --git a/client/dfget/dfget.go b/client/dfget/dfget.go index 91369c18a..3e0cabf7b 100644 --- a/client/dfget/dfget.go +++ b/client/dfget/dfget.go @@ -33,6 +33,8 @@ import ( logger "d7y.io/dragonfly/v2/pkg/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" dfdaemongrpc "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" + + // Init daemon rpc client _ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" dfclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" "github.com/go-http-utils/headers" @@ -74,7 +76,7 @@ func Download(cfg *config.DfgetConfig, client dfclient.DaemonClient) error { Output: output, BizId: cfg.CallSystem, Filter: filter, - Uid: int64(basic.UserId), + Uid: int64(basic.UserID), Gid: int64(basic.UserGroup), } var ( @@ -165,8 +167,8 @@ func downloadFromSource(cfg *config.DfgetConfig, hdr map[string]string) (err err end = time.Now() fmt.Printf("Download from source success, time cost: %dms\n", end.Sub(start).Milliseconds()) // change permission - logger.Infof("change own to uid %d gid %d", basic.UserId, basic.UserGroup) - if err = os.Chown(cfg.Output, basic.UserId, basic.UserGroup); err != nil { + logger.Infof("change own to uid %d gid %d", basic.UserID, basic.UserGroup) + if err = os.Chown(cfg.Output, basic.UserID, basic.UserGroup); err != nil { logger.Errorf("change own failed: %s", err) return err } diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 35fb72f22..f894f1f39 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -109,18 +109,18 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { vm := statsview.New() if err := vm.Start(); err != nil { logger.Warnf("serve pprof error:%v", err) - } else { - fc <- func() { vm.Stop() } } + fc <- func() { vm.Stop() } }() } if jaeger != "" { - if ff, err := initJaegerTracer(jaeger); err != nil { + ff, err := initJaegerTracer(jaeger) + if err != nil { logger.Warnf("init jaeger tracer error:%v", err) - } else { - fc <- ff } + + fc <- ff } return func() { diff --git a/cmd/dfget/cmd/root.go b/cmd/dfget/cmd/root.go index e7d032c64..1d398c179 100644 --- a/cmd/dfget/cmd/root.go +++ b/cmd/dfget/cmd/root.go @@ -160,10 +160,9 @@ func runDfget() error { daemonClient, err := checkAndSpawnDaemon() if err != nil { logger.Errorf("check and spawn daemon error:%v", err) - } else { - logger.Info("check and spawn daemon success") } + logger.Info("check and spawn daemon success") return dfget.Download(dfgetConfig, daemonClient) } diff --git a/docs/en/development/local.md b/docs/en/development/local.md index abcd267b4..6bb26a702 100644 --- a/docs/en/development/local.md +++ b/docs/en/development/local.md @@ -45,8 +45,8 @@ $ tail -f log/**/*.log ==> log/dragonfly/scheduler/core.log <== {"level":"info","ts":"2021-02-26 05:43:37.332","caller":"cmd/root.go:57","msg":"start to run scheduler"} {"level":"info","ts":"2021-02-26 05:43:37.338","caller":"server/server.go:35","msg":"start server at port %!s(int=8002)"} -{"level":"info","ts":"2021-02-26 05:43:37.342","caller":"schedule_worker/sender.go:49","msg":"start sender worker : 50"} -{"level":"info","ts":"2021-02-26 05:43:37.343","caller":"schedule_worker/worker_group.go:64","msg":"start scheduler worker number:6"} +{"level":"info","ts":"2021-02-26 05:43:37.342","caller":"worker/sender.go:49","msg":"start sender worker : 50"} +{"level":"info","ts":"2021-02-26 05:43:37.343","caller":"worker/worker_group.go:64","msg":"start scheduler worker number:6"} ``` ### Step 4: Stop dragonfly diff --git a/go.mod b/go.mod index 09d6f9dee..a58732e40 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( go.opentelemetry.io/otel/trace v0.20.0 go.uber.org/atomic v1.6.0 go.uber.org/zap v1.16.0 - golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 + golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf diff --git a/hack/wait-for.sh b/hack/wait-for.sh index cf7f78cb6..c388ab3ff 100755 --- a/hack/wait-for.sh +++ b/hack/wait-for.sh @@ -181,4 +181,4 @@ case "$PROTOCOL" in ;; esac -wait_for "$@" \ No newline at end of file +wait_for "$@" diff --git a/pkg/.DS_Store b/pkg/.DS_Store deleted file mode 100644 index d6dc01aa8..000000000 Binary files a/pkg/.DS_Store and /dev/null differ diff --git a/pkg/basic/user.go b/pkg/basic/user.go index 0104fc9da..69e949f67 100644 --- a/pkg/basic/user.go +++ b/pkg/basic/user.go @@ -29,7 +29,7 @@ var ( HomeDir string TmpDir string Username string - UserId int + UserID int UserGroup int ) @@ -40,7 +40,7 @@ func init() { } Username = u.Username - UserId, _ = strconv.Atoi(u.Uid) + UserID, _ = strconv.Atoi(u.Uid) UserGroup, _ = strconv.Atoi(u.Gid) HomeDir = u.HomeDir diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index ed0688205..7cc8b921d 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -124,7 +124,7 @@ func (sc *schedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler. func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, exclusiveNodes []string, opts []grpc.CallOption) (rr *scheduler.RegisterResult, err error) { var ( - taskId string + taskID string suc bool code base.Code schedulerNode string @@ -142,12 +142,12 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule return client.RegisterPeerTask(ctx, ptr, opts...) }, 0.5, 5.0, 5, nil); err == nil { rr = res.(*scheduler.RegisterResult) - taskId = rr.TaskId + taskID = rr.TaskId suc = true code = dfcodes.Success - if taskId != key { - logger.WithPeerID(ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskId) - sc.Connection.CorrectKey2NodeRelation(key, taskId) + if taskID != key { + logger.WithPeerID(ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID) + sc.Connection.CorrectKey2NodeRelation(key, taskID) } } else { if de, ok := err.(*dferrors.DfError); ok { @@ -156,7 +156,7 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule } logger.With("peerId", ptr.PeerId, "errMsg", err). Infof("register peer task result:%t[%d] for taskId:%s,url:%s,peerIp:%s,securityDomain:%s,idc:%s,scheduler:%s", - suc, int32(code), taskId, ptr.Url, ptr.PeerHost.Ip, ptr.PeerHost.SecurityDomain, ptr.PeerHost.Idc, schedulerNode) + suc, int32(code), taskID, ptr.Url, ptr.PeerHost.Ip, ptr.PeerHost.SecurityDomain, ptr.PeerHost.Idc, schedulerNode) if err != nil { var preNode string diff --git a/pkg/util/statutils/stat_linux.go b/pkg/util/statutils/stat_linux.go index ec90fd008..226d09f51 100644 --- a/pkg/util/statutils/stat_linux.go +++ b/pkg/util/statutils/stat_linux.go @@ -52,9 +52,8 @@ func CtimeSec(info os.FileInfo) int64 { func GetSysStat(info os.FileInfo) *syscall.Stat_t { if stat, ok := info.Sys().(*syscall.Stat_t); ok { return stat - } else { - return nil } + return nil } func FreeSpace(diskPath string) (unit.Bytes, error) { diff --git a/scheduler/manager/cdn_manager.go b/scheduler/manager/cdn_manager.go index 6a90a55a9..5b27c4fc3 100644 --- a/scheduler/manager/cdn_manager.go +++ b/scheduler/manager/cdn_manager.go @@ -134,13 +134,13 @@ func (cm *CDNManager) TriggerTask(task *types.Task, callback func(peerTask *type go safe.Call(func() { stream, err := cm.client.ObtainSeeds(context.TODO(), &cdnsystem.SeedRequest{ - TaskId: task.TaskId, - Url: task.Url, + TaskId: task.TaskID, + Url: task.URL, Filter: task.Filter, - UrlMeta: task.UrlMata, + UrlMeta: task.URLMata, }) if err != nil { - logger.Warnf("receive a failure state from cdn: taskId[%s] error:%v", task.TaskId, err) + logger.Warnf("receive a failure state from cdn: taskId[%s] error:%v", task.TaskID, err) e, ok := err.(*dferrors.DfError) if !ok { e = dferrors.New(dfcodes.CdnError, err.Error()) @@ -176,7 +176,7 @@ func (cm *CDNManager) doCallback(task *types.Task, err *dferrors.DfError) { if err != nil { time.Sleep(time.Second * 5) - cm.taskManager.Delete(task.TaskId) + cm.taskManager.Delete(task.TaskID) cm.taskManager.PeerTask.DeleteTask(task) } }) @@ -219,20 +219,20 @@ func (cm *CDNManager) Work(task *types.Task, stream *client.PieceSeedStream) { if !ok { dferr = dferrors.New(dfcodes.CdnError, err.Error()) } - logger.Warnf("receive a failure state from cdn: taskId[%s] error:%v", task.TaskId, err) + logger.Warnf("receive a failure state from cdn: taskId[%s] error:%v", task.TaskID, err) cm.doCallback(task, dferr) return } if ps == nil { - logger.Warnf("receive a nil pieceSeed or state from cdn: taskId[%s]", task.TaskId) + logger.Warnf("receive a nil pieceSeed or state from cdn: taskId[%s]", task.TaskID) } else { pieceNum := int32(-1) if ps.PieceInfo != nil { pieceNum = ps.PieceInfo.PieceNum } cm.processPieceSeed(task, ps) - logger.Debugf("receive a pieceSeed from cdn: taskId[%s]-%d done [%v]", task.TaskId, pieceNum, ps.Done) + logger.Debugf("receive a pieceSeed from cdn: taskId[%s]-%d done [%v]", task.TaskID, pieceNum, ps.Done) if waitCallback { waitCallback = false @@ -322,13 +322,20 @@ func (cm *CDNManager) getHostUUID(ps *cdnsystem.PieceSeed) string { func (cm *CDNManager) createPiece(task *types.Task, ps *cdnsystem.PieceSeed, pt *types.PeerTask) *types.Piece { p := task.GetOrCreatePiece(ps.PieceInfo.PieceNum) - p.PieceInfo = *ps.PieceInfo + p.PieceInfo = base.PieceInfo{ + PieceNum: ps.PieceInfo.PieceNum, + RangeStart: ps.PieceInfo.RangeStart, + RangeSize: ps.PieceInfo.RangeSize, + PieceMd5: ps.PieceInfo.PieceMd5, + PieceOffset: ps.PieceInfo.PieceOffset, + PieceStyle: ps.PieceInfo.PieceStyle, + } return p } func (cm *CDNManager) getTinyFileContent(task *types.Task, cdnHost *types.Host) (content []byte, err error) { resp, err := cm.client.GetPieceTasks(context.TODO(), dfnet.NetAddr{Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", cdnHost.Ip, cdnHost.RpcPort)}, &base.PieceTaskRequest{ - TaskId: task.TaskId, + TaskId: task.TaskID, SrcPid: "scheduler", StartNum: 0, Limit: 2, @@ -343,7 +350,7 @@ func (cm *CDNManager) getTinyFileContent(task *types.Task, cdnHost *types.Host) // TODO download the tiny file // http://host:port/download/{taskId 前3位}/{taskId}?peerId={peerId}; url := fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler", - cdnHost.Ip, cdnHost.DownPort, task.TaskId[:3], task.TaskId) + cdnHost.Ip, cdnHost.DownPort, task.TaskID[:3], task.TaskID) client := &http.Client{ Timeout: time.Second * 5, } diff --git a/scheduler/manager/cdn_manager_test.go b/scheduler/manager/cdn_manager_test.go index 17a7d615a..8cc73f200 100644 --- a/scheduler/manager/cdn_manager_test.go +++ b/scheduler/manager/cdn_manager_test.go @@ -25,6 +25,14 @@ import ( ) func TestCDNHostsToServers(t *testing.T) { + mockServerInfo := &manager.ServerInfo{ + HostInfo: &manager.HostInfo{ + HostName: "foo", + }, + RpcPort: 8002, + DownPort: 8001, + } + tests := []struct { name string hosts []*manager.ServerInfo @@ -44,13 +52,7 @@ func TestCDNHostsToServers(t *testing.T) { expect: func(t *testing.T, data interface{}) { assert := testifyassert.New(t) assert.EqualValues(map[string]*manager.ServerInfo{ - "foo": &manager.ServerInfo{ - HostInfo: &manager.HostInfo{ - HostName: "foo", - }, - RpcPort: 8002, - DownPort: 8001, - }, + "foo": mockServerInfo, }, data) }, }, diff --git a/scheduler/manager/host_manager.go b/scheduler/manager/host_manager.go index fbff8297b..71a3e1ec1 100644 --- a/scheduler/manager/host_manager.go +++ b/scheduler/manager/host_manager.go @@ -43,11 +43,11 @@ func (m *HostManager) Add(host *types.Host) *types.Host { return v.(*types.Host) } - copyHost := types.CopyHost(host) - m.CalculateLoad(copyHost) - m.data.Store(host.Uuid, copyHost) + h := types.Init(host) + m.CalculateLoad(h) + m.data.Store(host.Uuid, h) - return copyHost + return h } func (m *HostManager) Delete(uuid string) { @@ -72,8 +72,7 @@ func (m *HostManager) CalculateLoad(host *types.Host) { if host.Type == types.HostTypePeer { host.SetTotalUploadLoad(HostLoadPeer) host.SetTotalDownloadLoad(HostLoadPeer) - } else { - host.SetTotalUploadLoad(HostLoadCDN) - host.SetTotalDownloadLoad(HostLoadCDN) } + host.SetTotalUploadLoad(HostLoadCDN) + host.SetTotalDownloadLoad(HostLoadCDN) } diff --git a/scheduler/manager/peer_task.go b/scheduler/manager/peer_task.go index 38c5bc02b..30c770d40 100644 --- a/scheduler/manager/peer_task.go +++ b/scheduler/manager/peer_task.go @@ -86,7 +86,7 @@ func (m *PeerTask) Add(pid string, task *types.Task, host *types.Host) *types.Pe pt := types.NewPeerTask(pid, task, host, m.addToGCQueue) m.data.Store(pid, pt) - m.taskManager.Touch(task.TaskId) + m.taskManager.Touch(task.TaskID) r, ok := m.dataRanger.Load(pt.Task) if ok { @@ -345,7 +345,7 @@ func (m *PeerTask) printDebugInfo() string { } func (m *PeerTask) RefreshDownloadMonitor(pt *types.PeerTask) { - logger.Debugf("[%s][%s] downloadMonitorWorkingLoop refresh ", pt.Task.TaskId, pt.Pid) + logger.Debugf("[%s][%s] downloadMonitorWorkingLoop refresh ", pt.Task.TaskID, pt.Pid) status := pt.GetNodeStatus() if status != types.PeerTaskStatusHealth { m.downloadMonitorQueue.AddAfter(pt, time.Second*2) @@ -363,9 +363,8 @@ func (m *PeerTask) RefreshDownloadMonitor(pt *types.PeerTask) { func (m *PeerTask) CDNCallback(pt *types.PeerTask, err *dferrors.DfError) { if err != nil { pt.SendError(err) - } else { - m.downloadMonitorQueue.Add(pt) } + m.downloadMonitorQueue.Add(pt) } func (m *PeerTask) SetDownloadingMonitorCallBack(callback func(*types.PeerTask)) { @@ -381,7 +380,7 @@ func (m *PeerTask) downloadMonitorWorkingLoop() { if m.downloadMonitorCallBack != nil { pt, _ := v.(*types.PeerTask) if pt != nil { - logger.Debugf("[%s][%s] downloadMonitorWorkingLoop status[%d]", pt.Task.TaskId, pt.Pid, pt.GetNodeStatus()) + logger.Debugf("[%s][%s] downloadMonitorWorkingLoop status[%d]", pt.Task.TaskID, pt.Pid, pt.GetNodeStatus()) if pt.Success || (pt.Host != nil && pt.Host.Type == types.HostTypeCdn) { // clear from monitor } else { diff --git a/scheduler/manager/task_manager_test.go b/scheduler/manager/task_manager_test.go index e6a01ebd9..07434405d 100644 --- a/scheduler/manager/task_manager_test.go +++ b/scheduler/manager/task_manager_test.go @@ -40,7 +40,7 @@ func TestTaskManager_Set(t *testing.T) { }, key: "foo", task: &types.Task{ - TaskId: "bar", + TaskID: "bar", }, expect: func(t *testing.T, d interface{}) { assert := assert.New(t) @@ -68,7 +68,7 @@ func TestTaskManager_Set(t *testing.T) { }, key: "", task: &types.Task{ - TaskId: "bar", + TaskID: "bar", }, expect: func(t *testing.T, d interface{}) { assert := assert.New(t) @@ -101,7 +101,7 @@ func TestTaskManager_Add(t *testing.T) { }, key: "foo", task: &types.Task{ - TaskId: "bar", + TaskID: "bar", }, expect: func(t *testing.T, d interface{}, err error) { assert := assert.New(t) @@ -129,7 +129,7 @@ func TestTaskManager_Add(t *testing.T) { }, key: "", task: &types.Task{ - TaskId: "bar", + TaskID: "bar", }, expect: func(t *testing.T, d interface{}, err error) { assert := assert.New(t) @@ -144,7 +144,7 @@ func TestTaskManager_Add(t *testing.T) { }, key: "foo", task: &types.Task{ - TaskId: "bar", + TaskID: "bar", }, expect: func(t *testing.T, d interface{}, err error) { assert := assert.New(t) @@ -162,6 +162,10 @@ func TestTaskManager_Add(t *testing.T) { } func TestTaskManager_Get(t *testing.T) { + mockTask := &types.Task{ + TaskID: "bar", + } + tests := []struct { name string taskManager *TaskManager @@ -172,15 +176,13 @@ func TestTaskManager_Get(t *testing.T) { name: "get existing task", taskManager: &TaskManager{ lock: new(sync.RWMutex), - data: map[string]*types.Task{"foo": &types.Task{ - TaskId: "bar", - }}, + data: map[string]*types.Task{"foo": mockTask}, }, key: "foo", expect: func(t *testing.T, task *types.Task, found bool) { assert := assert.New(t) assert.Equal(true, found) - assert.Equal("bar", task.TaskId) + assert.Equal("bar", task.TaskID) }, }, { @@ -250,6 +252,10 @@ func TestTaskManager_Delete(t *testing.T) { } func TestTaskManager_Touch(t *testing.T) { + mockTask := &types.Task{ + TaskID: "bar", + } + tests := []struct { name string taskManager *TaskManager @@ -261,9 +267,7 @@ func TestTaskManager_Touch(t *testing.T) { name: "touch existing task", taskManager: &TaskManager{ lock: new(sync.RWMutex), - data: map[string]*types.Task{"foo": &types.Task{ - TaskId: "bar", - }}, + data: map[string]*types.Task{"foo": mockTask}, }, key: "foo", expect: func(t *testing.T, task *types.Task, found bool) { diff --git a/scheduler/scheduler/evaluator.go b/scheduler/scheduler/evaluator.go index c99e12d01..d6501a187 100644 --- a/scheduler/scheduler/evaluator.go +++ b/scheduler/scheduler/evaluator.go @@ -183,7 +183,7 @@ func (e *evaluator) selectParentCandidates(peer *types.PeerTask) (list []*types. if pt == nil { return true } else if peer.Task != pt.Task { - msg = append(msg, fmt.Sprintf("%s task[%s] not same", pt.Pid, pt.Task.TaskId)) + msg = append(msg, fmt.Sprintf("%s task[%s] not same", pt.Pid, pt.Task.TaskID)) return true } else if pt.IsDown() { msg = append(msg, fmt.Sprintf("%s is down", pt.Pid)) @@ -213,7 +213,7 @@ func (e *evaluator) selectParentCandidates(peer *types.PeerTask) (list []*types. return true }) if len(list) == 0 { - logger.Debugf("[%s][%s] scheduler failed: \n%s", peer.Task.TaskId, peer.Pid, strings.Join(msg, "\n")) + logger.Debugf("[%s][%s] scheduler failed: \n%s", peer.Task.TaskID, peer.Pid, strings.Join(msg, "\n")) } return diff --git a/scheduler/scheduler/evaluator_factory.go b/scheduler/scheduler/evaluator_factory.go index c5399585c..00477f889 100644 --- a/scheduler/scheduler/evaluator_factory.go +++ b/scheduler/scheduler/evaluator_factory.go @@ -68,7 +68,7 @@ func (ef *evaluatorFactory) get(task *types.Task) Evaluator { if ef.abtest { name := "" - if strings.HasSuffix(task.TaskId, idgen.TwinsBSuffix) { + if strings.HasSuffix(task.TaskID, idgen.TwinsBSuffix) { if ef.bscheduler != "" { name = ef.bscheduler } diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 103892c99..fba8534f6 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -125,15 +125,14 @@ func (s *Scheduler) ScheduleParent(peer *types.PeerTask) (primary *types.PeerTas peer.AddParent(primary, 1) s.taskManager.PeerTask.Update(primary) s.taskManager.PeerTask.Update(oldParent) - } else { - logger.Debugf("[%s][%s]SchedulerParent scheduler a empty parent", peer.Task.TaskId, peer.Pid) } + logger.Debugf("[%s][%s]SchedulerParent scheduler a empty parent", peer.Task.TaskID, peer.Pid) return } func (s *Scheduler) ScheduleBadNode(peer *types.PeerTask) (adjustNodes []*types.PeerTask, err error) { - logger.Debugf("[%s][%s]SchedulerBadNode scheduler node is bad", peer.Task.TaskId, peer.Pid) + logger.Debugf("[%s][%s]SchedulerBadNode scheduler node is bad", peer.Task.TaskID, peer.Pid) parent := peer.GetParent() if parent != nil && parent.DstPeerTask != nil { pNode := parent.DstPeerTask @@ -155,7 +154,7 @@ func (s *Scheduler) ScheduleBadNode(peer *types.PeerTask) (adjustNodes []*types. if node.GetParent() != nil { parentID = node.GetParent().DstPeerTask.Pid } - logger.Debugf("[%s][%s]SchedulerBadNode [%s] scheduler a new parent [%s]", peer.Task.TaskId, peer.Pid, + logger.Debugf("[%s][%s]SchedulerBadNode [%s] scheduler a new parent [%s]", peer.Task.TaskID, peer.Pid, node.Pid, parentID) } diff --git a/scheduler/server/scheduler_server.go b/scheduler/server/scheduler_server.go index 26b0ab547..c5e8fe755 100644 --- a/scheduler/server/scheduler_server.go +++ b/scheduler/server/scheduler_server.go @@ -28,13 +28,13 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/service" - "d7y.io/dragonfly/v2/scheduler/service/schedule_worker" + "d7y.io/dragonfly/v2/scheduler/service/worker" "d7y.io/dragonfly/v2/scheduler/types" ) type SchedulerServer struct { service *service.SchedulerService - worker schedule_worker.IWorker + worker worker.IWorker config config.SchedulerConfig } @@ -50,8 +50,8 @@ func WithSchedulerService(service *service.SchedulerService) Option { } } -// WithWorker sets the schedule_worker.IWorker -func WithWorker(worker schedule_worker.IWorker) Option { +// WithWorker sets the worker.IWorker +func WithWorker(worker worker.IWorker) Option { return func(p *SchedulerServer) *SchedulerServer { p.worker = worker @@ -101,11 +101,11 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul task, ok := s.service.GetTask(pkg.TaskId) if !ok { task, err = s.service.AddTask(&types.Task{ - TaskId: pkg.TaskId, - Url: request.Url, + TaskID: pkg.TaskId, + URL: request.Url, Filter: request.Filter, - BizId: request.BizId, - UrlMata: request.UrlMata, + BizID: request.BizId, + URLMata: request.UrlMata, }) if err != nil { dferror, _ := err.(*dferrors.DfError) @@ -122,7 +122,7 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul return } - pkg.TaskId = task.TaskId + pkg.TaskId = task.TaskID pkg.SizeScope = task.SizeScope // case base.SizeScope_TINY @@ -134,10 +134,22 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul // get or create host hostID := request.PeerHost.Uuid host, _ := s.service.GetHost(hostID) + + peerHost := request.PeerHost if host == nil { host = &types.Host{ - Type: types.HostTypePeer, - PeerHost: *request.PeerHost, + Type: types.HostTypePeer, + PeerHost: scheduler.PeerHost{ + Uuid: peerHost.Uuid, + Ip: peerHost.Ip, + RpcPort: peerHost.RpcPort, + DownPort: peerHost.DownPort, + HostName: peerHost.HostName, + SecurityDomain: peerHost.SecurityDomain, + Location: peerHost.Location, + Idc: peerHost.Idc, + NetTopology: peerHost.NetTopology, + }, } if isCdn { host.Type = types.HostTypeCdn @@ -212,7 +224,7 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie } return }() - err = schedule_worker.NewClient(stream, s.worker, s.service).Serve() + err = worker.NewClient(stream, s.worker, s.service).Serve() return } @@ -233,7 +245,7 @@ func (s *SchedulerServer) ReportPeerResult(ctx context.Context, result *schedule return }() - logger.Infof("[%s][%s]: receive a peer result [%+v]", result.TaskId, result.PeerId, *result) + logger.Infof("[%s][%s]: receive a peer result [%+v]", result.TaskId, result.PeerId, result) pid := result.PeerId peerTask, err := s.service.GetPeerTask(pid) diff --git a/scheduler/server/server.go b/scheduler/server/server.go index 2fa9babf4..109e81ce7 100644 --- a/scheduler/server/server.go +++ b/scheduler/server/server.go @@ -24,16 +24,18 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/manager" "d7y.io/dragonfly/v2/pkg/rpc/manager/client" + + // Server registered to grpc _ "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/pkg/util/net/iputils" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/service" - "d7y.io/dragonfly/v2/scheduler/service/schedule_worker" + "d7y.io/dragonfly/v2/scheduler/service/worker" ) type Server struct { service *service.SchedulerService - worker schedule_worker.IWorker + worker worker.IWorker server *SchedulerServer config config.ServerConfig managerClient client.ManagerClient @@ -90,7 +92,7 @@ func New(cfg *config.Config) (*Server, error) { return nil, err } - s.worker = schedule_worker.NewWorkerGroup(cfg, s.service) + s.worker = worker.NewGroup(cfg, s.service) s.server = NewSchedulerServer(cfg, WithSchedulerService(s.service), WithWorker(s.worker)) diff --git a/scheduler/service/service.go b/scheduler/service/service.go index d5dab154b..d8a7f32c8 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -64,13 +64,13 @@ func (s *SchedulerService) GetTask(taskID string) (*types.Task, bool) { func (s *SchedulerService) AddTask(task *types.Task) (*types.Task, error) { // Task already exists - if ret, ok := s.TaskManager.Get(task.TaskId); ok { + if ret, ok := s.TaskManager.Get(task.TaskID); ok { s.TaskManager.PeerTask.AddTask(ret) return ret, nil } // Task does not exist - ret := s.TaskManager.Set(task.TaskId, task) + ret := s.TaskManager.Set(task.TaskID, task) if err := s.CDNManager.TriggerTask(ret, s.TaskManager.PeerTask.CDNCallback); err != nil { return nil, err } diff --git a/scheduler/service/schedule_worker/client.go b/scheduler/service/worker/client.go similarity index 99% rename from scheduler/service/schedule_worker/client.go rename to scheduler/service/worker/client.go index e432712e8..c9c811edf 100644 --- a/scheduler/service/schedule_worker/client.go +++ b/scheduler/service/worker/client.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package schedule_worker +package worker import ( "io" diff --git a/scheduler/service/schedule_worker/sender.go b/scheduler/service/worker/sender.go similarity index 96% rename from scheduler/service/schedule_worker/sender.go rename to scheduler/service/worker/sender.go index 7f5481c24..b44dd7dd8 100644 --- a/scheduler/service/schedule_worker/sender.go +++ b/scheduler/service/worker/sender.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package schedule_worker +package worker import ( "fmt" @@ -107,14 +107,14 @@ func (s *Sender) doSend() { err = peerTask.Send() }() if err != nil && err.Error() == "empty client" { - logger.Warnf("[%s][%s]: client is empty : %v", peerTask.Task.TaskId, peerTask.Pid, err.Error()) + logger.Warnf("[%s][%s]: client is empty : %v", peerTask.Task.TaskID, peerTask.Pid, err.Error()) break } else if err != nil { //TODO error - logger.Warnf("[%s][%s]: send result failed : %v", peerTask.Task.TaskId, peerTask.Pid, err.Error()) + logger.Warnf("[%s][%s]: send result failed : %v", peerTask.Task.TaskID, peerTask.Pid, err.Error()) break } else { - logger.Debugf("[%s][%s]: send result success", peerTask.Task.TaskId, peerTask.Pid) + logger.Debugf("[%s][%s]: send result success", peerTask.Task.TaskID, peerTask.Pid) } if peerTask.Success { break diff --git a/scheduler/service/schedule_worker/woker.go b/scheduler/service/worker/woker.go similarity index 95% rename from scheduler/service/schedule_worker/woker.go rename to scheduler/service/worker/woker.go index a8a220285..c39c926ac 100644 --- a/scheduler/service/schedule_worker/woker.go +++ b/scheduler/service/worker/woker.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package schedule_worker +package worker import ( "fmt" @@ -187,13 +187,13 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { startTm := time.Now() status := peerTask.GetNodeStatus() - logger.Debugf("[%s][%s]: begin do schedule [%d]", peerTask.Task.TaskId, peerTask.Pid, status) + logger.Debugf("[%s][%s]: begin do schedule [%d]", peerTask.Task.TaskID, peerTask.Pid, status) defer func() { err := recover() if err != nil { - logger.Errorf("[%s][%s]: do schedule panic: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Errorf("[%s][%s]: do schedule panic: %v", peerTask.Task.TaskID, peerTask.Pid, err) } - logger.Infof("[%s][%s]: end do schedule [%d] cost: %d", peerTask.Task.TaskId, peerTask.Pid, status, time.Now().Sub(startTm).Nanoseconds()) + logger.Infof("[%s][%s]: end do schedule [%d] cost: %d", peerTask.Task.TaskID, peerTask.Pid, status, time.Now().Sub(startTm).Nanoseconds()) }() switch status { @@ -209,7 +209,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusNeedParent: parent, _, err := w.schedulerService.Scheduler.ScheduleParent(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule parent failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule parent failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) } // retry scheduler parent later when this is no parent if parent == nil || err != nil { @@ -222,7 +222,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusNeedChildren: children, err := w.schedulerService.Scheduler.ScheduleChildren(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule children failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule children failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) return } for i := range children { @@ -238,7 +238,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusBadNode: adjustNodes, err := w.schedulerService.Scheduler.ScheduleBadNode(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule bad node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule bad node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) w.sendJobLater(peerTask) return } @@ -258,7 +258,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusNeedAdjustNode: _, _, err := w.schedulerService.Scheduler.ScheduleAdjustParentNode(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) w.sendJobLater(peerTask) return } @@ -269,7 +269,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { if w.schedulerService.Scheduler.IsNodeBad(peerTask) && peerTask.GetSubTreeNodesNum() > 1 { adjustNodes, err := w.schedulerService.Scheduler.ScheduleBadNode(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule bad node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule bad node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) peerTask.SetNodeStatus(types.PeerTaskStatusBadNode) w.sendJobLater(peerTask) return @@ -286,7 +286,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { } else if w.schedulerService.Scheduler.NeedAdjustParent(peerTask) { _, _, err := w.schedulerService.Scheduler.ScheduleAdjustParentNode(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) return } w.sendScheduleResult(peerTask) @@ -296,7 +296,7 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusDone: parent, err := w.schedulerService.Scheduler.ScheduleDone(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) w.sendJobLater(peerTask) return } @@ -308,12 +308,12 @@ func (w *Worker) doSchedule(peerTask *types.PeerTask) { case types.PeerTaskStatusLeaveNode, types.PeerTaskStatusNodeGone: adjustNodes, err := w.schedulerService.Scheduler.ScheduleLeaveNode(peerTask) if err != nil { - logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskId, peerTask.Pid, err) + logger.Debugf("[%s][%s]: schedule adjust node failed: %v", peerTask.Task.TaskID, peerTask.Pid, err) w.sendJobLater(peerTask) return } w.schedulerService.TaskManager.PeerTask.Delete(peerTask.Pid) - logger.Debugf("[%s][%s]: PeerTaskStatusLeaveNode", peerTask.Task.TaskId, peerTask.Pid) + logger.Debugf("[%s][%s]: PeerTaskStatusLeaveNode", peerTask.Task.TaskID, peerTask.Pid) for i := range adjustNodes { if adjustNodes[i].GetParent() != nil { w.sendScheduleResult(adjustNodes[i]) @@ -339,7 +339,7 @@ func (w *Worker) sendScheduleResult(peerTask *types.PeerTask) { if peerTask.GetParent() != nil && peerTask.GetParent().DstPeerTask != nil { parent = peerTask.GetParent().DstPeerTask.Pid } - logger.Infof("[%s][%s]: sendScheduleResult parent[%s] active time[%d] deep [%d]", peerTask.Task.TaskId, peerTask.Pid, parent, time.Now().UnixNano()-peerTask.GetStartTime(), peerTask.GetDeep()) + logger.Infof("[%s][%s]: sendScheduleResult parent[%s] active time[%d] deep [%d]", peerTask.Task.TaskID, peerTask.Pid, parent, time.Now().UnixNano()-peerTask.GetStartTime(), peerTask.GetDeep()) w.sender.Send(peerTask) return } diff --git a/scheduler/service/schedule_worker/worker_group.go b/scheduler/service/worker/worker_group.go similarity index 85% rename from scheduler/service/schedule_worker/worker_group.go rename to scheduler/service/worker/worker_group.go index 6b1e5bc33..7688a3aa2 100644 --- a/scheduler/service/schedule_worker/worker_group.go +++ b/scheduler/service/worker/worker_group.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package schedule_worker +package worker import ( "hash/crc32" @@ -34,7 +34,7 @@ type IWorker interface { ReceiveUpdatePieceResult(pr *scheduler2.PieceResult) } -type WorkerGroup struct { +type Group struct { workerNum int chanSize int workerList []*Worker @@ -46,8 +46,8 @@ type WorkerGroup struct { schedulerService *service.SchedulerService } -func NewWorkerGroup(cfg *config.Config, schedulerService *service.SchedulerService) *WorkerGroup { - return &WorkerGroup{ +func NewGroup(cfg *config.Config, schedulerService *service.SchedulerService) *Group { + return &Group{ workerNum: cfg.Worker.WorkerNum, chanSize: cfg.Worker.WorkerJobPoolSize, sender: NewSender(cfg.Worker, schedulerService), @@ -56,7 +56,7 @@ func NewWorkerGroup(cfg *config.Config, schedulerService *service.SchedulerServi } } -func (wg *WorkerGroup) Serve() { +func (wg *Group) Serve() { wg.stopCh = make(chan struct{}) wg.schedulerService.TaskManager.PeerTask.SetDownloadingMonitorCallBack(func(pt *types.PeerTask) { @@ -85,22 +85,22 @@ func (wg *WorkerGroup) Serve() { logger.Infof("start scheduler worker number:%d", wg.workerNum) } -func (wg *WorkerGroup) Stop() { +func (wg *Group) Stop() { close(wg.stopCh) wg.sender.Stop() wg.triggerLoadQueue.ShutDown() logger.Infof("stop scheduler worker") } -func (wg *WorkerGroup) ReceiveJob(job *types.PeerTask) { +func (wg *Group) ReceiveJob(job *types.PeerTask) { if job == nil { return } - choiceWorkerID := crc32.ChecksumIEEE([]byte(job.Task.TaskId)) % uint32(wg.workerNum) + choiceWorkerID := crc32.ChecksumIEEE([]byte(job.Task.TaskID)) % uint32(wg.workerNum) wg.workerList[choiceWorkerID].ReceiveJob(job) } -func (wg *WorkerGroup) ReceiveUpdatePieceResult(pr *scheduler2.PieceResult) { +func (wg *Group) ReceiveUpdatePieceResult(pr *scheduler2.PieceResult) { if pr == nil { return } diff --git a/scheduler/types/host.go b/scheduler/types/host.go index 183d82df5..b120f70e6 100644 --- a/scheduler/types/host.go +++ b/scheduler/types/host.go @@ -45,11 +45,10 @@ type Host struct { ServiceDownTime int64 } -func CopyHost(h *Host) *Host { - copyHost := *h - copyHost.peerTaskMap = new(sync.Map) - copyHost.loadLock = new(sync.Mutex) - return ©Host +func Init(h *Host) *Host { + h.loadLock = &sync.Mutex{} + h.peerTaskMap = &sync.Map{} + return h } func (h *Host) AddPeerTask(peerTask *PeerTask) { diff --git a/scheduler/types/peer_task.go b/scheduler/types/peer_task.go index 4de344fd6..02ac390de 100644 --- a/scheduler/types/peer_task.go +++ b/scheduler/types/peer_task.go @@ -299,7 +299,7 @@ func (pt *PeerTask) GetSendPkg() (pkg *scheduler.PeerPacket) { // if pt != nil && pt.client != nil { pkg = &scheduler.PeerPacket{ Code: dfcodes.Success, - TaskId: pt.Task.TaskId, + TaskId: pt.Task.TaskID, // source peer id SrcPid: pt.Pid, // concurrent downloading count from main peer @@ -308,10 +308,9 @@ func (pt *PeerTask) GetSendPkg() (pkg *scheduler.PeerPacket) { defer pt.lock.Unlock() if pt.parent != nil && pt.parent.DstPeerTask != nil && pt.parent.DstPeerTask.Host != nil { pkg.ParallelCount = int32(pt.parent.Concurrency) - peerHost := pt.parent.DstPeerTask.Host.PeerHost pkg.MainPeer = &scheduler.PeerPacket_DestPeer{ - Ip: peerHost.Ip, - RpcPort: peerHost.RpcPort, + Ip: pt.parent.DstPeerTask.Host.PeerHost.Ip, + RpcPort: pt.parent.DstPeerTask.Host.PeerHost.RpcPort, PeerId: pt.parent.DstPeerTask.Pid, } } diff --git a/scheduler/types/task.go b/scheduler/types/task.go index 77a161303..e441c8d77 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -27,14 +27,14 @@ import ( ) type Task struct { - TaskId string `json:"task_id,omitempty"` - Url string `json:"url,omitempty"` + TaskID string `json:"task_id,omitempty"` + URL string `json:"url,omitempty"` // regex format, used for task id generator, assimilating different urls Filter string `json:"filter,omitempty"` // biz_id and md5 are used for task id generator to distinguish the same urls // md5 is also used to check consistency about file content - BizId string `json:"biz_id,omitempty"` // caller's biz id that can be any string - UrlMata *base.UrlMeta `json:"url_mata,omitempty"` // downloaded file content md5 + BizID string `json:"biz_id,omitempty"` // caller's biz id that can be any string + URLMata *base.UrlMeta `json:"url_mata,omitempty"` // downloaded file content md5 SizeScope base.SizeScope DirectPiece *scheduler.RegisterResult_PieceContent