dragonfly/pkg/gc/gc.go

165 lines
3.1 KiB
Go

/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination gc_mock.go -source gc.go -package gc
package gc
import (
"context"
"fmt"
"sync"
"time"
)
type ContextKey string
func (ck ContextKey) String() string {
return string(ck)
}
var (
// ContextKeyUserID is the key for user ID in context.
ContextKeyUserID ContextKey = "user_id"
// ContextKeyTaskID is the key for task ID in context.
ContextKeyTaskID ContextKey = "task_id"
)
// GC is the interface used for release resource.
type GC interface {
// Add adds GC task.
Add(Task) error
// Run GC task.
Run(context.Context, string) error
// Run all registered GC tasks.
RunAll(context.Context)
// Start running the GC task.
Start(context.Context)
// Stop running the GC task.
Stop()
}
// GC provides task release function.
type gc struct {
tasks *sync.Map
logger Logger
done chan struct{}
}
// Option is a functional option for configuring the GC.
type Option func(g *gc)
// WithLogger set the logger for GC.
func WithLogger(logger Logger) Option {
return func(g *gc) {
g.logger = logger
}
}
// New returns a new GC instance.
func New(options ...Option) GC {
g := &gc{
tasks: &sync.Map{},
logger: &gcLogger{},
done: make(chan struct{}),
}
for _, opt := range options {
opt(g)
}
return g
}
func (g gc) Add(t Task) error {
if err := t.validate(); err != nil {
return err
}
g.tasks.Store(t.ID, t)
return nil
}
func (g gc) Run(ctx context.Context, id string) error {
v, ok := g.tasks.Load(id)
if !ok {
return fmt.Errorf("can not find task %s", id)
}
go g.run(ctx, v.(Task))
return nil
}
func (g gc) RunAll(ctx context.Context) {
g.runAll(ctx)
}
func (g gc) Start(ctx context.Context) {
g.tasks.Range(func(k, v any) bool {
go func() {
task := v.(Task)
tick := time.NewTicker(task.Interval)
for {
select {
case <-tick.C:
g.run(ctx, task)
case <-g.done:
g.logger.Infof("%s GC stop", k)
return
}
}
}()
return true
})
}
func (g gc) Stop() {
close(g.done)
}
func (g gc) runAll(ctx context.Context) {
g.tasks.Range(func(k, v any) bool {
go g.run(ctx, v.(Task))
return true
})
}
func (g gc) run(ctx context.Context, t Task) {
done := make(chan struct{})
go func() {
g.logger.Infof("%s GC start", t.ID)
defer close(done)
if err := t.Runner.RunGC(ctx); err != nil {
g.logger.Errorf("%s GC error: %v", t.ID, err)
return
}
}()
select {
case <-time.After(t.Timeout):
g.logger.Infof("%s GC timeout", t.ID)
case <-done:
g.logger.Infof("%s GC done", t.ID)
}
}