dragonfly/scheduler/storage/storage.go

/*
 * Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination mocks/storage_mock.go -source storage.go -package mocks

package storage

import (
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"sync"
	"time"

	"github.com/gocarina/gocsv"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	pkgio "d7y.io/dragonfly/v2/pkg/io"
)

const (
	// DefaultMaxSize is the default maximum size of a record file in megabytes.
	DefaultMaxSize = 100

	// DefaultMaxBackups is the default maximum count of backup files.
	DefaultMaxBackups = 10

	// DefaultBufferSize is the default size of the buffer container.
	DefaultBufferSize = 100
)

const (
	// RecordFilePrefix is the prefix of record file names.
	RecordFilePrefix = "record"

	// RecordFileExt is the extension of record file names.
	RecordFileExt = "csv"
)

const (
	// PeerStateSucceeded indicates the peer downloaded successfully.
	PeerStateSucceeded = iota

	// PeerStateFailed indicates the peer download failed.
	PeerStateFailed

	// PeerStateBackToSourceSucceeded indicates the peer downloaded
	// back-to-source successfully.
	PeerStateBackToSourceSucceeded

	// PeerStateBackToSourceFailed indicates the peer back-to-source
	// download failed.
	PeerStateBackToSourceFailed
)

const (
	// megabyte is the conversion factor between MaxSize and bytes.
	megabyte = 1024 * 1024

	// backupTimeFormat is the timestamp format of backup filenames.
	backupTimeFormat = "2006-01-02T15-04-05.000"
)

// Record is the download record of a peer.
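// Records are serialized with gocsv without a header row, so the CSV column
// order follows the field order below.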
type Record struct {
	// ID is the peer id.
	ID string `csv:"id"`

	// IP is the host ip.
	IP string `csv:"ip"`

	// Hostname is the host name.
	Hostname string `csv:"hostname"`

	// Tag is the peer tag.
	Tag string `csv:"tag"`

	// Cost is the task download time in milliseconds.
	Cost uint32 `csv:"cost"`

	// PieceCount is the downloaded piece count.
	PieceCount int32 `csv:"pieceCount"`

	// TotalPieceCount is the total piece count of the task.
	TotalPieceCount int32 `csv:"totalPieceCount"`

	// ContentLength is the total content length of the task.
	ContentLength int64 `csv:"contentLength"`

	// SecurityDomain is the security domain of the host.
	SecurityDomain string `csv:"securityDomain"`

	// IDC is the internet data center of the host.
	IDC string `csv:"idc"`

	// NetTopology is the network topology of the host.
	// Example: switch|router|...
	NetTopology string `csv:"netTopology"`

	// Location is the location of the host.
	// Example: country|province|...
	Location string `csv:"location"`

	// FreeUploadCount is the free upload count of the host.
	FreeUploadCount int32 `csv:"freeUploadCount"`

	// State is the download state of the peer,
	// one of the PeerState constants above.
	State int `csv:"state"`

	// HostType is the peer host type.
	HostType int `csv:"hostType"`

	// CreateAt is the peer creation time in nanoseconds.
	CreateAt int64 `csv:"createAt"`

	// UpdateAt is the peer update time in nanoseconds.
	UpdateAt int64 `csv:"updateAt"`

	// ParentID is the parent peer id.
	ParentID string `csv:"parentID"`

	// ParentIP is the parent host ip.
	ParentIP string `csv:"parentIP"`

	// ParentHostname is the parent host name.
	ParentHostname string `csv:"parentHostname"`

	// ParentTag is the parent peer tag.
	ParentTag string `csv:"parentTag"`

	// ParentPieceCount is the parent's total piece count.
	ParentPieceCount int32 `csv:"parentPieceCount"`

	// ParentSecurityDomain is the security domain of the parent host.
	ParentSecurityDomain string `csv:"parentSecurityDomain"`

	// ParentIDC is the internet data center of the parent host.
	ParentIDC string `csv:"parentIDC"`

	// ParentNetTopology is the network topology of the parent host.
	// Example: switch|router|...
	ParentNetTopology string `csv:"parentNetTopology"`

	// ParentLocation is the location of the parent host.
	// Example: country|province|...
	ParentLocation string `csv:"parentLocation"`

	// ParentFreeUploadCount is the free upload count of the parent host.
	ParentFreeUploadCount int32 `csv:"parentFreeUploadCount"`

	// ParentUploadCount is the parent's total upload count.
	ParentUploadCount int64 `csv:"parentUploadCount"`

	// ParentUploadFailedCount is the parent's failed upload count.
	ParentUploadFailedCount int64 `csv:"parentUploadFailedCount"`

	// ParentHostType is the parent host type.
	ParentHostType int `csv:"parentHostType"`

	// ParentCreateAt is the parent peer creation time in nanoseconds.
	ParentCreateAt int64 `csv:"parentCreateAt"`

	// ParentUpdateAt is the parent peer update time in nanoseconds.
	ParentUpdateAt int64 `csv:"parentUpdateAt"`
}

// Storage is the interface used for record storage.
type Storage interface {
	// Create inserts a record into the csv file.
	Create(Record) error

	// List returns all of the records in the csv files.
	List() ([]Record, error)

	// Count returns the count of records that have been flushed to disk.
	Count() int64

	// Open opens the storage for reading; it returns an io.ReadCloser over
	// the storage files.
	Open() (io.ReadCloser, error)

	// Clear removes all record files.
	Clear() error
}

// storage provides the storage function.
type storage struct {
	baseDir    string
	filename   string
	maxSize    int64
	maxBackups int
	buffer     []Record
	bufferSize int
	count      int64
	mu         *sync.RWMutex
}

// Option is a functional option for configuring the Storage.
type Option func(s *storage)

// WithMaxSize sets the maximum size in megabytes of the record file.
func WithMaxSize(maxSize int) Option {
	return func(s *storage) {
		s.maxSize = int64(maxSize) * megabyte
	}
}

// WithMaxBackups sets the maximum number of backup files to retain.
func WithMaxBackups(maxBackups int) Option {
	return func(s *storage) {
		s.maxBackups = maxBackups
	}
}

// WithBufferSize sets the size of the buffer container;
// when the buffer is full, all records in the buffer are written to file.
func WithBufferSize(bufferSize int) Option {
	return func(s *storage) {
		s.bufferSize = bufferSize
		s.buffer = make([]Record, 0, bufferSize)
	}
}

// New returns a new Storage instance.
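//
// A minimal usage sketch; the base directory and option values here are
// illustrative, not defaults:
//
//	s, err := New("/var/lib/dragonfly/scheduler", WithMaxSize(200), WithBufferSize(0))
//	if err != nil {
//		return err
//	}
//
//	if err := s.Create(Record{ID: "peer-id"}); err != nil {
//		return err
//	}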
func New(baseDir string, options ...Option) (Storage, error) {
	s := &storage{
		baseDir:    baseDir,
		filename:   filepath.Join(baseDir, fmt.Sprintf("%s.%s", RecordFilePrefix, RecordFileExt)),
		maxSize:    DefaultMaxSize * megabyte,
		maxBackups: DefaultMaxBackups,
		buffer:     make([]Record, 0, DefaultBufferSize),
		bufferSize: DefaultBufferSize,
		mu:         &sync.RWMutex{},
	}

	for _, opt := range options {
		opt(s)
	}

	file, err := os.OpenFile(s.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return nil, err
	}
	file.Close()

	return s, nil
}

// Create inserts a record into the csv file.
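// Records are buffered in memory and only written to disk once the buffer
// reaches bufferSize; a record still sitting in the buffer is not yet
// visible to List, Open, or Count. Constructing the storage with
// WithBufferSize(0) makes every Create write the record synchronously.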
func (s *storage) Create(record Record) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Write without buffer.
	if s.bufferSize == 0 {
		if err := s.create(record); err != nil {
			return err
		}

		// Update record count.
		s.count++
		return nil
	}

	// Write records to file.
	if len(s.buffer) >= s.bufferSize {
		if err := s.create(s.buffer...); err != nil {
			return err
		}

		// Update record count.
		s.count += int64(s.bufferSize)

		// Keep allocated memory.
		s.buffer = s.buffer[:0]
	}

	// Write record to buffer.
	s.buffer = append(s.buffer, record)
	return nil
}

// List returns all of the records in the csv files.
func (s *storage) List() ([]Record, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	fileInfos, err := s.backups()
	if err != nil {
		return nil, err
	}

	var readers []io.Reader
	var readClosers []io.ReadCloser
	defer func() {
		for _, readCloser := range readClosers {
			if err := readCloser.Close(); err != nil {
				logger.Error(err)
			}
		}
	}()
	for _, fileInfo := range fileInfos {
		file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name()))
		if err != nil {
			return nil, err
		}

		readers = append(readers, file)
		readClosers = append(readClosers, file)
	}

	var records []Record
	if err := gocsv.UnmarshalWithoutHeaders(io.MultiReader(readers...), &records); err != nil {
		return nil, err
	}

	return records, nil
}

// Count returns the count of records that have been flushed to disk.
func (s *storage) Count() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.count
}

// Open opens the storage for reading; it returns an io.ReadCloser over the
// storage files.
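//
// A sketch of streaming all record files to a hypothetical io.Writer dst:
//
//	readCloser, err := s.Open()
//	if err != nil {
//		return err
//	}
//	defer readCloser.Close()
//
//	if _, err := io.Copy(dst, readCloser); err != nil {
//		return err
//	}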
func (s *storage) Open() (io.ReadCloser, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	fileInfos, err := s.backups()
	if err != nil {
		return nil, err
	}

	var readClosers []io.ReadCloser
	for _, fileInfo := range fileInfos {
		file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name()))
		if err != nil {
			// Close any files opened so far before returning the error.
			for _, readCloser := range readClosers {
				readCloser.Close()
			}

			return nil, err
		}

		readClosers = append(readClosers, file)
	}

	return pkgio.MultiReadCloser(readClosers...), nil
}

// Clear removes all record files.
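// Note that the current record file also matches RecordFilePrefix, so it is
// removed as well; openFile stats the file before appending, which means a
// later buffer flush fails until New recreates the file.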
func (s *storage) Clear() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	fileInfos, err := s.backups()
	if err != nil {
		return err
	}

	for _, fileInfo := range fileInfos {
		filename := filepath.Join(s.baseDir, fileInfo.Name())
		if err := os.Remove(filename); err != nil {
			return err
		}
	}

	return nil
}

// create inserts the records into the csv file.
func (s *storage) create(records ...Record) error {
	file, err := s.openFile()
	if err != nil {
		return err
	}
	defer file.Close()

	if err := gocsv.MarshalWithoutHeaders(records, file); err != nil {
		return err
	}

	return nil
}

// openFile opens the record file for appending; when the file has reached
// maxSize it is first rotated to a backup, and the oldest backup is removed
// once the number of record files exceeds maxBackups.
func (s *storage) openFile() (*os.File, error) {
	fileInfo, err := os.Stat(s.filename)
	if err != nil {
		return nil, err
	}

	if s.maxSize <= fileInfo.Size() {
		if err := os.Rename(s.filename, s.backupFilename()); err != nil {
			return nil, err
		}
	}
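
	// Keep at most maxBackups record files on disk: backups() sorts the
	// files by modification time, so fileInfos[0] is the oldest one.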
	fileInfos, err := s.backups()
	if err != nil {
		return nil, err
	}

	if s.maxBackups < len(fileInfos)+1 {
		filename := filepath.Join(s.baseDir, fileInfos[0].Name())
		if err := os.Remove(filename); err != nil {
			return nil, err
		}
	}

	file, err := os.OpenFile(s.filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		return nil, err
	}

	return file, nil
}

// backupFilename generates the file name of a backup file.
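// For example: record-2023-05-01T10-30-00.000.csv (timestamp illustrative).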
func (s *storage) backupFilename() string {
	timestamp := time.Now().Format(backupTimeFormat)
	return filepath.Join(s.baseDir, fmt.Sprintf("%s-%s.%s", RecordFilePrefix, timestamp, RecordFileExt))
}

// backups returns information about the record files, sorted from oldest to
// newest by modification time.
func (s *storage) backups() ([]fs.FileInfo, error) {
	dirEntries, err := os.ReadDir(s.baseDir)
	if err != nil {
		return nil, err
	}

	var backups []fs.FileInfo
	regexp := regexp.MustCompile(RecordFilePrefix)
	for _, dirEntry := range dirEntries {
		if !dirEntry.IsDir() && regexp.MatchString(dirEntry.Name()) {
			fileInfo, err := dirEntry.Info()
			if err != nil {
				return nil, err
			}

			backups = append(backups, fileInfo)
		}
	}

	if len(backups) == 0 {
		return nil, errors.New("backup does not exist")
	}

	sort.Slice(backups, func(i, j int) bool {
		return backups[i].ModTime().Before(backups[j].ModTime())
	})

	return backups, nil
}