storage: take lock files into consideration while garbage collecting

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
This commit is contained in:
Sanskar Jaiswal 2022-12-19 19:18:49 +05:30
parent 61e9123691
commit bdd08bcb72
2 changed files with 25 additions and 16 deletions

View File

@ -159,18 +159,17 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err
// getGarbageFiles returns all files that need to be garbage collected for the given artifact. // getGarbageFiles returns all files that need to be garbage collected for the given artifact.
// Garbage files are determined based on the below flow: // Garbage files are determined based on the below flow:
// 1. collect all files with an expired ttl // 1. collect all artifact files with an expired ttl
// 2. if we satisfy maxItemsToBeRetained, then return // 2. if we satisfy maxItemsToBeRetained, then return
// 3. else, remove all files till the latest n files remain, where n=maxItemsToBeRetained // 3. else, collect all artifact files till the latest n files remain, where n=maxItemsToBeRetained
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) { func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) (garbageFiles []string, _ error) {
localPath := s.LocalPath(artifact) localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath) dir := filepath.Dir(localPath)
garbageFiles := []string{} artifactFilesWithCreatedTs := make(map[time.Time]string)
filesWithCreatedTs := make(map[time.Time]string)
// sortedPaths contains all files sorted according to their created ts. // sortedPaths contains all files sorted according to their created ts.
sortedPaths := []string{} sortedPaths := []string{}
now := time.Now().UTC() now := time.Now().UTC()
totalFiles := 0 totalArtifactFiles := 0
var errors []string var errors []string
creationTimestamps := []time.Time{} creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error { _ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
@ -178,8 +177,8 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
errors = append(errors, err.Error()) errors = append(errors, err.Error())
return nil return nil
} }
if totalFiles >= totalCountLimit { if totalArtifactFiles >= totalCountLimit {
return fmt.Errorf("reached file walking limit, already walked over: %d", totalFiles) return fmt.Errorf("reached file walking limit, already walked over: %d", totalArtifactFiles)
} }
info, err := d.Info() info, err := d.Info()
if err != nil { if err != nil {
@ -189,14 +188,16 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
createdAt := info.ModTime().UTC() createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt) diff := now.Sub(createdAt)
// Compare the time difference between now and the time at which the file was created // Compare the time difference between now and the time at which the file was created
// with the provided TTL. Delete if the difference is greater than the TTL. // with the provided TTL. Delete if the difference is greater than the TTL. Since the
// below logic just deals with determining if an artifact needs to be garbage collected,
// we avoid all lock files, adding them at the end to the list of garbage files.
expired := diff > ttl expired := diff > ttl
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink { if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink && filepath.Ext(path) != ".lock" {
if path != localPath && expired { if path != localPath && expired {
garbageFiles = append(garbageFiles, path) garbageFiles = append(garbageFiles, path)
} }
totalFiles += 1 totalArtifactFiles += 1
filesWithCreatedTs[createdAt] = path artifactFilesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt) creationTimestamps = append(creationTimestamps, createdAt)
} }
return nil return nil
@ -208,14 +209,14 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
// We already collected enough garbage files to satisfy the no. of max // We already collected enough garbage files to satisfy the no. of max
// items that are supposed to be retained, so exit early. // items that are supposed to be retained, so exit early.
if totalFiles-len(garbageFiles) < maxItemsToBeRetained { if totalArtifactFiles-len(garbageFiles) < maxItemsToBeRetained {
return garbageFiles, nil return garbageFiles, nil
} }
// sort all timestamps in an ascending order. // sort all timestamps in an ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) }) sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps { for _, ts := range creationTimestamps {
path, ok := filesWithCreatedTs[ts] path, ok := artifactFilesWithCreatedTs[ts]
if !ok { if !ok {
return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts) return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts)
} }
@ -225,7 +226,7 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
var collected int var collected int
noOfGarbageFiles := len(garbageFiles) noOfGarbageFiles := len(garbageFiles)
for _, path := range sortedPaths { for _, path := range sortedPaths {
if path != localPath && !stringInSlice(path, garbageFiles) { if path != localPath && filepath.Ext(path) != ".lock" && !stringInSlice(path, garbageFiles) {
// If we previously collected a few garbage files with an expired ttl, then take that into account // If we previously collected a few garbage files with an expired ttl, then take that into account
// when checking whether we need to remove more files to satisfy the max no. of items allowed // when checking whether we need to remove more files to satisfy the max no. of items allowed
// in the filesystem, along with the no. of files already removed in this loop. // in the filesystem, along with the no. of files already removed in this loop.
@ -271,6 +272,14 @@ func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact
} else { } else {
deleted = append(deleted, file) deleted = append(deleted, file)
} }
// If a lock file exists for this garbage artifact, remove that too.
lockFile := file + ".lock"
if _, err = os.Lstat(lockFile); err == nil {
err = os.Remove(lockFile)
if err != nil {
errors = append(errors, err)
}
}
} }
} }
if len(errors) > 0 { if len(errors) > 0 {

View File

@ -135,7 +135,7 @@ func main() {
flag.StringSliceVar(&git.HostKeyAlgos, "ssh-hostkey-algos", []string{}, flag.StringSliceVar(&git.HostKeyAlgos, "ssh-hostkey-algos", []string{},
"The list of hostkey algorithms to use for ssh connections, arranged from most preferred to the least.") "The list of hostkey algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second, flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second,
"The duration of time that artifacts will be kept in storage before being garbage collected.") "The duration of time that artifacts from previous reconcilations will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2, flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.") "The maximum number of artifacts to be kept in storage after a garbage collection.")