func/pkg/functions/job.go

134 lines
3.6 KiB
Go

package functions
import (
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
)
// runsDir is the name of the directory, beneath RunDataDir inside a
// function's root, which holds one subdirectory per job (see funcJobsDir and
// Job.Dir).
const runsDir = "runs"
// Job represents a running function job (presumably started by this process'
// Runner instance).
// In order for this to function along with the noop runner used by client,
// the zero value of the struct is intended to noop without errors.
type Job struct {
// Function is the function this job belongs to. Its Root determines where
// the job's run directory lives on disk (see Dir).
Function Function
// Host is the host value supplied to NewJob.
// NOTE(review): presumably the address the function is reachable on;
// confirm against the runner which creates the job.
Host string
// Port is the port on which the job was started. It is also used as the
// name of the job's run directory.
Port string
// Errors is the channel on which runtime errors can be received. NewJob
// substitutes a channel with a buffer of one when none is provided.
Errors chan error
// onStop is the stop delegate invoked by Stop. NewJob substitutes a
// delegate which does nothing when none is provided.
onStop func() error
// verbose enables printing of the filesystem operations performed on the
// job's run directory.
verbose bool
}
// NewJob creates a Job representing a running function task.
//
// The caller supplies the function, the host and port on which it was
// started, a channel on which runtime errors can be received, and a stop
// delegate. A nil errs channel is replaced by one with a buffer of one, and a
// nil onStop is replaced by a delegate which does nothing.
//
// Before returning, the run directories of orphaned jobs are cleaned up and
// the new job's own run directory is created.
//
// An error is returned when f is not initialized, when port is empty, or when
// the cleanup or directory creation fails. The Job populated so far is
// returned alongside any error.
func NewJob(f Function, host, port string, errs chan error, onStop func() error, verbose bool) (j *Job, err error) {
	j = &Job{Function: f, Host: host, Port: port, Errors: errs, onStop: onStop, verbose: verbose}

	// Validate required arguments before touching the filesystem.
	switch {
	case !f.Initialized():
		return j, errors.New("initialized function required to create job")
	case port == "":
		return j, errors.New("port required to create job")
	}

	// Fill in defaults for the optional arguments.
	if errs == nil {
		j.Errors = make(chan error, 1)
	}
	if onStop == nil {
		j.onStop = func() error { return nil }
	}

	// Clear out stale run directories before adding this job's own.
	if err = cleanupJobDirs(j); err != nil {
		return j, err
	}

	dir := j.Dir()
	if verbose {
		fmt.Printf("mkdir -p %v\n", dir)
	}
	err = os.MkdirAll(dir, os.ModePerm)
	return j, err
}
// Stop the Job, removing its runtime metadata from disk and then running the
// stop delegate, whose error (if any) is returned.
//
// Failure to remove the run directory is reported as a warning on stderr and
// does not prevent the stop delegate from running.
//
// Stop is safe to call on a zero-value Job: with no Port there is no run
// directory to remove, and with no stop delegate there is nothing to run, so
// it returns nil without touching the filesystem.
func (j *Job) Stop() error {
	// Only remove the run directory when a port is set. filepath.Join drops
	// empty elements, so with an empty Port Dir() would resolve to the runs
	// directory shared by all of the function's jobs.
	if j.Port != "" {
		dir := j.Dir()
		if j.verbose {
			fmt.Printf("rm %v\n", dir)
		}
		if err := os.RemoveAll(dir); err != nil {
			fmt.Fprintf(os.Stderr, "warning: unable to remove run directory. %v\n", err)
		}
	}

	// A Job which was not created via NewJob may have no stop delegate.
	if j.onStop == nil {
		return nil
	}
	return j.onStop()
}
// Dir returns the directory within which all data about this job is placed:
// the function's runs directory joined with the job's port.
// ${f.Root}/.func/runs/${j.Port}
func (j *Job) Dir() string {
	runs := funcJobsDir(j.Function)
	return filepath.Join(runs, j.Port)
}
// funcJobsDir returns the directory within which all runs (jobs) of the given
// function are held.
// ${f.Root}/.func/runs/
func funcJobsDir(f Function) string {
	segments := []string{f.Root, RunDataDir, runsDir}
	return filepath.Join(segments...)
}
// cleanupJobDirs removes the on-disk representation of every orphaned job of
// j's function.
//
// A job directory is considered orphaned when its name parses as an integer
// (the port) and that port can currently be bound, meaning nothing is
// listening on it. Files, directories with non-numeric names, and directories
// whose port cannot be bound are left untouched.
//
// All orphaned directories are removed in a single pass. The first removal
// failure aborts the pass and is returned.
func cleanupJobDirs(j *Job) error {
	jobsDir := funcJobsDir(j.Function)

	// Best effort: a read error (most commonly because the directory does not
	// exist yet) yields no entries, in which case there is nothing to clean.
	entries, _ := os.ReadDir(jobsDir)
	for _, entry := range entries {
		if !entry.IsDir() {
			continue // ignore files in the directory (like a readme)
		}
		name := entry.Name()
		if _, err := strconv.Atoi(name); err != nil {
			continue // ignore directories that aren't integers (ports)
		}
		// Probe the port: if it can be bound, no process is serving this job.
		ln, err := net.Listen("tcp", ":"+name)
		if err != nil {
			continue // ignore if we can't bind to the port (running or invalid port)
		}
		_ = ln.Close() // the listener was only a probe; a close error is not actionable

		orphanedJobDir := filepath.Join(jobsDir, name)
		if j.verbose {
			fmt.Printf("No process listening on port %v. Removing its job directory\n", name)
			fmt.Printf("rm %v\n", orphanedJobDir)
		}
		// Keep going after a successful removal so that every orphan is
		// cleaned up, not only the first one encountered.
		if err := os.RemoveAll(orphanedJobDir); err != nil {
			return err
		}
	}
	return nil
}
// jobPorts returns all the ports on which an instance of the given function is
// running. len is 0 when not running.
// Improperly initialized or nonexistent (zero value) functions are considered
// to not be running.
//
// Ports are read from the names of the subdirectories of the function's runs
// directory. Entries which are not directories, or whose names do not parse as
// integers, are skipped; this is the same filter cleanupJobDirs applies, so a
// stray file (like a readme) is never reported as a port.
//
// The returned slice is never nil. A runs directory which does not exist
// yields an empty result silently; any other read error is reported on stderr
// and also yields an empty result.
func jobPorts(f Function) []string {
	if f.Root == "" || !f.Initialized() {
		return []string{}
	}

	jobsDir := funcJobsDir(f)
	entries, err := os.ReadDir(jobsDir)
	if err != nil {
		// A missing directory just means the function was never started.
		if !os.IsNotExist(err) {
			fmt.Fprintf(os.Stderr, "error reading %v: %v\n", jobsDir, err)
		}
		return []string{}
	}

	ports := make([]string, 0, len(entries))
	for _, entry := range entries {
		if !entry.IsDir() {
			continue // ignore files in the directory (like a readme)
		}
		name := entry.Name()
		if _, err := strconv.Atoi(name); err != nil {
			continue // ignore directories that aren't integers (ports)
		}
		ports = append(ports, name)
	}
	return ports
}