libimage: add an events system
Add an event system to libimage. Callers can opt-in to using events by requesting an event channel via `(*Runtime).EventChannel()`. The returned channel has a buffer of size 100 which should be sufficient even under high loads. But, to be on the safe side, writing an event will time out after 2 seconds to prevent operations from blocking. Currently, the only user of such an event system is Podman which will need to convert the `Event` type to what's used internally in libpod. Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
This commit is contained in:
parent
bd198b4d92
commit
bb4c4ab9c0
|
|
@ -1,6 +1,10 @@
|
|||
package libimage
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// EventType indicates the type of an event. Currrently, there is only one
|
||||
// supported type for container image but we may add more (e.g., for manifest
|
||||
|
|
@ -41,3 +45,18 @@ type Event struct {
|
|||
// Type of the event.
|
||||
Type EventType
|
||||
}
|
||||
|
||||
// writeEvent writes the specified event to the Runtime's event channel. The
|
||||
// event is discarded if no event channel has been registered (yet).
|
||||
func (r *Runtime) writeEvent(event *Event) {
|
||||
select {
|
||||
case r.eventChannel <- event:
|
||||
// Done
|
||||
case <-time.After(2 * time.Second):
|
||||
// The Runtime's event channel has a buffer of size 100 which
|
||||
// should be enough even under high load. However, we
|
||||
// shouldn't block too long in case the buffer runs full (could
|
||||
// be an honest user error or bug).
|
||||
logrus.Warnf("Discarding libimage event which was not read within 2 seconds: %v", event)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -277,6 +277,10 @@ func (i *Image) remove(ctx context.Context, rmMap map[string]*RemoveImageReport,
|
|||
return errors.Errorf("cannot remove read-only image %q", i.ID())
|
||||
}
|
||||
|
||||
if i.runtime.eventChannel != nil {
|
||||
i.runtime.writeEvent(&Event{ID: i.ID(), Name: referencedBy, Time: time.Now(), Type: EventTypeImageRemove})
|
||||
}
|
||||
|
||||
// Check if already visisted this image.
|
||||
report, exists := rmMap[i.ID()]
|
||||
if exists {
|
||||
|
|
@ -423,6 +427,9 @@ func (i *Image) Tag(name string) error {
|
|||
}
|
||||
|
||||
logrus.Debugf("Tagging image %s with %q", i.ID(), ref.String())
|
||||
if i.runtime.eventChannel != nil {
|
||||
i.runtime.writeEvent(&Event{ID: i.ID(), Name: name, Time: time.Now(), Type: EventTypeImageTag})
|
||||
}
|
||||
|
||||
newNames := append(i.Names(), ref.String())
|
||||
if err := i.runtime.store.SetNames(i.ID(), newNames); err != nil {
|
||||
|
|
@ -454,6 +461,9 @@ func (i *Image) Untag(name string) error {
|
|||
name = ref.String()
|
||||
|
||||
logrus.Debugf("Untagging %q from image %s", ref.String(), i.ID())
|
||||
if i.runtime.eventChannel != nil {
|
||||
i.runtime.writeEvent(&Event{ID: i.ID(), Name: name, Time: time.Now(), Type: EventTypeImageUntag})
|
||||
}
|
||||
|
||||
removedName := false
|
||||
newNames := []string{}
|
||||
|
|
@ -593,6 +603,10 @@ func (i *Image) RepoDigests() ([]string, error) {
|
|||
// are directly passed down to the containers storage. Returns the fully
|
||||
// evaluated path to the mount point.
|
||||
func (i *Image) Mount(ctx context.Context, mountOptions []string, mountLabel string) (string, error) {
|
||||
if i.runtime.eventChannel != nil {
|
||||
i.runtime.writeEvent(&Event{ID: i.ID(), Name: "", Time: time.Now(), Type: EventTypeImageMount})
|
||||
}
|
||||
|
||||
mountPoint, err := i.runtime.store.MountImage(i.ID(), mountOptions, mountLabel)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
|
@ -634,6 +648,9 @@ func (i *Image) Mountpoint() (string, error) {
|
|||
// Unmount the image. Use force to ignore the reference counter and forcefully
|
||||
// unmount.
|
||||
func (i *Image) Unmount(force bool) error {
|
||||
if i.runtime.eventChannel != nil {
|
||||
i.runtime.writeEvent(&Event{ID: i.ID(), Name: "", Time: time.Now(), Type: EventTypeImageUnmount})
|
||||
}
|
||||
logrus.Debugf("Unmounted image %s", i.ID())
|
||||
_, err := i.runtime.store.UnmountImage(i.ID(), force)
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
dirTransport "github.com/containers/image/v5/directory"
|
||||
dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
|
||||
|
|
@ -23,6 +24,8 @@ type LoadOptions struct {
|
|||
func (r *Runtime) Load(ctx context.Context, path string, options *LoadOptions) ([]string, error) {
|
||||
logrus.Debugf("Loading image from %q", path)
|
||||
|
||||
r.writeEvent(&Event{ID: "", Name: path, Time: time.Now(), Type: EventTypeImageLoad})
|
||||
|
||||
var (
|
||||
loadedImages []string
|
||||
loadError error
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package libimage
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/containers/common/libimage/manifests"
|
||||
imageCopy "github.com/containers/image/v5/copy"
|
||||
|
|
@ -364,6 +365,10 @@ func (m *ManifestList) Push(ctx context.Context, destination string, options *Ma
|
|||
}
|
||||
}
|
||||
|
||||
if m.image.runtime.eventChannel != nil {
|
||||
m.image.runtime.writeEvent(&Event{ID: m.ID(), Name: destination, Time: time.Now(), Type: EventTypeImagePush})
|
||||
}
|
||||
|
||||
// NOTE: we're using the logic in copier to create a proper
|
||||
// types.SystemContext. This prevents us from having an error prone
|
||||
// code duplicate here.
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containers/common/pkg/config"
|
||||
dirTransport "github.com/containers/image/v5/directory"
|
||||
|
|
@ -80,6 +81,10 @@ func (r *Runtime) Pull(ctx context.Context, name string, pullPolicy config.PullP
|
|||
return nil, errors.Errorf("pulling all tags is not supported for %s transport", ref.Transport().Name())
|
||||
}
|
||||
|
||||
if r.eventChannel != nil {
|
||||
r.writeEvent(&Event{ID: "", Name: name, Time: time.Now(), Type: EventTypeImagePull})
|
||||
}
|
||||
|
||||
var (
|
||||
pulledImages []string
|
||||
pullError error
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package libimage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
|
||||
"github.com/containers/image/v5/docker/reference"
|
||||
|
|
@ -61,6 +62,10 @@ func (r *Runtime) Push(ctx context.Context, source, destination string, options
|
|||
destRef = dockerRef
|
||||
}
|
||||
|
||||
if r.eventChannel != nil {
|
||||
r.writeEvent(&Event{ID: image.ID(), Name: destination, Time: time.Now(), Type: EventTypeImagePush})
|
||||
}
|
||||
|
||||
// Buildah compat: Make sure to tag the destination image if it's a
|
||||
// Docker archive. This way, we preseve the image name.
|
||||
if destRef.Transport().Name() == dockerArchiveTransport.Transport.Name() {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,9 @@ import (
|
|||
|
||||
// RuntimeOptions allow for creating a customized Runtime.
|
||||
type RuntimeOptions struct {
|
||||
// The base system context of the runtime which will be used throughout
|
||||
// the entire lifespan of the Runtime. Certain options in some
|
||||
// functions may override specific fields.
|
||||
SystemContext *types.SystemContext
|
||||
}
|
||||
|
||||
|
|
@ -41,6 +44,8 @@ func setRegistriesConfPath(systemContext *types.SystemContext) {
|
|||
// Runtime is responsible for image management and storing them in a containers
|
||||
// storage.
|
||||
type Runtime struct {
|
||||
// Use to send events out to users.
|
||||
eventChannel chan *Event
|
||||
// Underlying storage store.
|
||||
store storage.Store
|
||||
// Global system context. No pointer to simplify copying and modifying
|
||||
|
|
@ -55,6 +60,18 @@ func (r *Runtime) systemContextCopy() *types.SystemContext {
|
|||
return &sys
|
||||
}
|
||||
|
||||
// EventChannel creates a buffered channel for events that the Runtime will use
|
||||
// to write events to. Callers are expected to read from the channel in a
|
||||
// timely manner.
|
||||
// Can be called once for a given Runtime.
|
||||
func (r *Runtime) EventChannel() chan *Event {
|
||||
if r.eventChannel != nil {
|
||||
return r.eventChannel
|
||||
}
|
||||
r.eventChannel = make(chan *Event, 100)
|
||||
return r.eventChannel
|
||||
}
|
||||
|
||||
// RuntimeFromStore returns a Runtime for the specified store.
|
||||
func RuntimeFromStore(store storage.Store, options *RuntimeOptions) (*Runtime, error) {
|
||||
if options == nil {
|
||||
|
|
@ -99,6 +116,7 @@ func RuntimeFromStoreOptions(runtimeOptions *RuntimeOptions, storeOptions *stora
|
|||
// is considered to be an error condition.
|
||||
func (r *Runtime) Shutdown(force bool) error {
|
||||
_, err := r.store.Shutdown(force)
|
||||
close(r.eventChannel)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package libimage
|
|||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dirTransport "github.com/containers/image/v5/directory"
|
||||
dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
|
||||
|
|
@ -74,6 +75,10 @@ func (r *Runtime) saveSingleImage(ctx context.Context, name, format, path string
|
|||
return err
|
||||
}
|
||||
|
||||
if r.eventChannel != nil {
|
||||
r.writeEvent(&Event{ID: image.ID(), Name: path, Time: time.Now(), Type: EventTypeImageSave})
|
||||
}
|
||||
|
||||
// Unless the image was referenced by ID, use the resolved name as a
|
||||
// tag.
|
||||
var tag string
|
||||
|
|
@ -160,6 +165,9 @@ func (r *Runtime) saveDockerArchive(ctx context.Context, names []string, path st
|
|||
}
|
||||
}
|
||||
localImages[image.ID()] = local
|
||||
if r.eventChannel != nil {
|
||||
r.writeEvent(&Event{ID: image.ID(), Name: path, Time: time.Now(), Type: EventTypeImageSave})
|
||||
}
|
||||
}
|
||||
|
||||
writer, err := dockerArchiveTransport.NewWriter(r.systemContextCopy(), path)
|
||||
|
|
|
|||
Loading…
Reference in New Issue