mirror of https://github.com/containerd/nri.git
stub: fork/bump stub to v1beta1 revision.
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
parent
38c344862a
commit
bbe965498c
|
|
@ -0,0 +1,965 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package stub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
stdnet "net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
api "github.com/containerd/nri/pkg/api/v1beta1"
|
||||
nrilog "github.com/containerd/nri/pkg/log"
|
||||
"github.com/containerd/nri/pkg/net"
|
||||
"github.com/containerd/nri/pkg/net/multiplex"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
// Plugin can implement a number of interfaces related to Pod and Container
|
||||
// lifecycle events. No any single such interface is mandatory, therefore the
|
||||
// Plugin interface itself is empty. Plugins are required to implement at
|
||||
// least one of these interfaces and this is verified during stub creation.
|
||||
// Trying to create a stub for a plugin violating this requirement will fail
|
||||
// with and error.
|
||||
type Plugin interface{}
|
||||
|
||||
// ConfigureInterface handles Configure API request.
|
||||
type ConfigureInterface interface {
|
||||
// Configure the plugin with the given NRI-supplied configuration.
|
||||
// If a non-zero EventMask is returned, the plugin will be subscribed
|
||||
// to the corresponding.
|
||||
Configure(ctx context.Context, config, runtime, version string) (api.EventMask, error)
|
||||
}
|
||||
|
||||
// SynchronizeInterface handles Synchronize API requests.
|
||||
type SynchronizeInterface interface {
|
||||
// Synchronize the state of the plugin with the runtime.
|
||||
// The plugin can request updates to containers in response.
|
||||
Synchronize(context.Context, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error)
|
||||
}
|
||||
|
||||
// ShutdownInterface handles a Shutdown API request.
|
||||
type ShutdownInterface interface {
|
||||
// Shutdown notifies the plugin about the runtime shutting down.
|
||||
Shutdown(context.Context)
|
||||
}
|
||||
|
||||
// RunPodInterface handles RunPodSandbox API events.
|
||||
type RunPodInterface interface {
|
||||
// RunPodSandbox relays a RunPodSandbox event to the plugin.
|
||||
RunPodSandbox(context.Context, *api.PodSandbox) error
|
||||
}
|
||||
|
||||
// UpdatePodInterface handles UpdatePodSandbox API requests.
|
||||
type UpdatePodInterface interface {
|
||||
// UpdatePodSandbox relays an UpdatePodSandbox request to the plugin.
|
||||
UpdatePodSandbox(context.Context, *api.PodSandbox, *api.LinuxResources, *api.LinuxResources) error
|
||||
}
|
||||
|
||||
// StopPodInterface handles StopPodSandbox API events.
|
||||
type StopPodInterface interface {
|
||||
// StopPodSandbox relays a StopPodSandbox event to the plugin.
|
||||
StopPodSandbox(context.Context, *api.PodSandbox) error
|
||||
}
|
||||
|
||||
// RemovePodInterface handles RemovePodSandbox API events.
|
||||
type RemovePodInterface interface {
|
||||
// RemovePodSandbox relays a RemovePodSandbox event to the plugin.
|
||||
RemovePodSandbox(context.Context, *api.PodSandbox) error
|
||||
}
|
||||
|
||||
// PostUpdatePodInterface handles PostUpdatePodSandbox API events.
|
||||
type PostUpdatePodInterface interface {
|
||||
// PostUpdatePodSandbox relays a PostUpdatePodSandbox event to the plugin.
|
||||
PostUpdatePodSandbox(context.Context, *api.PodSandbox) error
|
||||
}
|
||||
|
||||
// CreateContainerInterface handles CreateContainer API requests.
|
||||
type CreateContainerInterface interface {
|
||||
// CreateContainer relays a CreateContainer request to the plugin.
|
||||
// The plugin can request adjustments to the container being created
|
||||
// and updates to other unstopped containers in response.
|
||||
CreateContainer(context.Context, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error)
|
||||
}
|
||||
|
||||
// StartContainerInterface handles StartContainer API requests.
|
||||
type StartContainerInterface interface {
|
||||
// StartContainer relays a StartContainer event to the plugin.
|
||||
StartContainer(context.Context, *api.PodSandbox, *api.Container) error
|
||||
}
|
||||
|
||||
// UpdateContainerInterface handles UpdateContainer API requests.
|
||||
type UpdateContainerInterface interface {
|
||||
// UpdateContainer relays an UpdateContainer request to the plugin.
|
||||
// The plugin can request updates both to the container being updated
|
||||
// (which then supersedes the original update) and to other unstopped
|
||||
// containers in response.
|
||||
UpdateContainer(context.Context, *api.PodSandbox, *api.Container, *api.LinuxResources) ([]*api.ContainerUpdate, error)
|
||||
}
|
||||
|
||||
// StopContainerInterface handles StopContainer API requests.
|
||||
type StopContainerInterface interface {
|
||||
// StopContainer relays a StopContainer request to the plugin.
|
||||
// The plugin can request updates to unstopped containers in response.
|
||||
StopContainer(context.Context, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error)
|
||||
}
|
||||
|
||||
// RemoveContainerInterface handles RemoveContainer API events.
|
||||
type RemoveContainerInterface interface {
|
||||
// RemoveContainer relays a RemoveContainer event to the plugin.
|
||||
RemoveContainer(context.Context, *api.PodSandbox, *api.Container) error
|
||||
}
|
||||
|
||||
// PostCreateContainerInterface handles PostCreateContainer API events.
|
||||
type PostCreateContainerInterface interface {
|
||||
// PostCreateContainer relays a PostCreateContainer event to the plugin.
|
||||
PostCreateContainer(context.Context, *api.PodSandbox, *api.Container) error
|
||||
}
|
||||
|
||||
// PostStartContainerInterface handles PostStartContainer API events.
|
||||
type PostStartContainerInterface interface {
|
||||
// PostStartContainer relays a PostStartContainer event to the plugin.
|
||||
PostStartContainer(context.Context, *api.PodSandbox, *api.Container) error
|
||||
}
|
||||
|
||||
// PostUpdateContainerInterface handles PostUpdateContainer API events.
|
||||
type PostUpdateContainerInterface interface {
|
||||
// PostUpdateContainer relays a PostUpdateContainer event to the plugin.
|
||||
PostUpdateContainer(context.Context, *api.PodSandbox, *api.Container) error
|
||||
}
|
||||
|
||||
// ValidateContainerAdjustmentInterface handles container adjustment validation.
|
||||
type ValidateContainerAdjustmentInterface interface {
|
||||
// ValidateContainerAdjustment validates the container adjustment.
|
||||
ValidateContainerAdjustment(context.Context, *api.ValidateContainerAdjustmentRequest) error
|
||||
}
|
||||
|
||||
// Stub is the interface the stub provides for the plugin implementation.
|
||||
type Stub interface {
|
||||
// Run starts the plugin then waits for the plugin service to exit, either due to a
|
||||
// critical error or an explicit call to Stop(). Once Run() returns, the plugin can be
|
||||
// restarted by calling Run() or Start() again.
|
||||
Run(context.Context) error
|
||||
// Start the plugin.
|
||||
Start(context.Context) error
|
||||
// Stop the plugin.
|
||||
Stop()
|
||||
// Wait for the plugin to stop.
|
||||
Wait()
|
||||
|
||||
// UpdateContainer requests unsolicited updates to containers.
|
||||
UpdateContainers([]*api.ContainerUpdate) ([]*api.ContainerUpdate, error)
|
||||
|
||||
// RegistrationTimeout returns the registration timeout for the stub.
|
||||
// This is the default timeout if the plugin has not been started or
|
||||
// the timeout received in the Configure request otherwise.
|
||||
RegistrationTimeout() time.Duration
|
||||
|
||||
// RequestTimeout returns the request timeout for the stub.
|
||||
// This is the default timeout if the plugin has not been started or
|
||||
// the timeout received in the Configure request otherwise.
|
||||
RequestTimeout() time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultRegistrationTimeout is the default plugin registration timeout.
|
||||
DefaultRegistrationTimeout = api.DefaultPluginRegistrationTimeout
|
||||
// DefaultRequestTimeout is the default plugin request processing timeout.
|
||||
DefaultRequestTimeout = api.DefaultPluginRequestTimeout
|
||||
)
|
||||
|
||||
var (
|
||||
// Logger for messages generated internally by the stub itself.
|
||||
log = nrilog.Get()
|
||||
|
||||
// Used instead of a nil Context in logging.
|
||||
noCtx = context.TODO()
|
||||
|
||||
// ErrNoService indicates that the stub has no runtime service/connection,
|
||||
// for instance by UpdateContainers on a stub which has not been started.
|
||||
ErrNoService = errors.New("stub: no service/connection")
|
||||
)
|
||||
|
||||
// EventMask holds a mask of events for plugin subscription.
|
||||
type EventMask = api.EventMask
|
||||
|
||||
// Option to apply to a plugin during its creation.
|
||||
type Option func(*stub) error
|
||||
|
||||
// WithOnClose sets a notification function to call if the ttRPC connection goes down.
|
||||
func WithOnClose(onClose func()) Option {
|
||||
return func(s *stub) error {
|
||||
s.onClose = onClose
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPluginName sets the name to use in plugin registration.
|
||||
func WithPluginName(name string) Option {
|
||||
return func(s *stub) error {
|
||||
if s.name != "" {
|
||||
return fmt.Errorf("plugin name already set (%q)", s.name)
|
||||
}
|
||||
s.name = name
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPluginIdx sets the index to use in plugin registration.
|
||||
func WithPluginIdx(idx string) Option {
|
||||
return func(s *stub) error {
|
||||
if s.idx != "" {
|
||||
return fmt.Errorf("plugin ID already set (%q)", s.idx)
|
||||
}
|
||||
s.idx = idx
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSocketPath sets the NRI socket path to connect to.
|
||||
func WithSocketPath(path string) Option {
|
||||
return func(s *stub) error {
|
||||
s.socketPath = path
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithConnection sets an existing NRI connection to use.
|
||||
func WithConnection(conn stdnet.Conn) Option {
|
||||
return func(s *stub) error {
|
||||
s.conn = conn
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithDialer sets the dialer to use.
|
||||
func WithDialer(d func(string) (stdnet.Conn, error)) Option {
|
||||
return func(s *stub) error {
|
||||
s.dialer = d
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithTTRPCOptions sets extra client and server options to use for ttrpc .
|
||||
func WithTTRPCOptions(clientOpts []ttrpc.ClientOpts, serverOpts []ttrpc.ServerOpt) Option {
|
||||
return func(s *stub) error {
|
||||
s.clientOpts = append(s.clientOpts, clientOpts...)
|
||||
s.serverOpts = append(s.serverOpts, serverOpts...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// stub implements Stub.
|
||||
type stub struct {
|
||||
sync.Mutex
|
||||
plugin interface{}
|
||||
handlers handlers
|
||||
events api.EventMask
|
||||
name string
|
||||
idx string
|
||||
socketPath string
|
||||
dialer func(string) (stdnet.Conn, error)
|
||||
conn stdnet.Conn
|
||||
onClose func()
|
||||
serverOpts []ttrpc.ServerOpt
|
||||
clientOpts []ttrpc.ClientOpts
|
||||
rpcm multiplex.Mux
|
||||
rpcl stdnet.Listener
|
||||
rpcs *ttrpc.Server
|
||||
rpcc *ttrpc.Client
|
||||
runtime api.RuntimeService
|
||||
started bool
|
||||
doneC chan struct{}
|
||||
srvErrC chan error
|
||||
cfgErrC chan error
|
||||
syncReq *api.SynchronizeRequest
|
||||
|
||||
registrationTimeout time.Duration
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
// Handlers for NRI plugin event and request.
|
||||
type handlers struct {
|
||||
Configure func(context.Context, string, string, string) (api.EventMask, error)
|
||||
Synchronize func(context.Context, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error)
|
||||
Shutdown func(context.Context)
|
||||
RunPodSandbox func(context.Context, *api.PodSandbox) error
|
||||
UpdatePodSandbox func(context.Context, *api.PodSandbox, *api.LinuxResources, *api.LinuxResources) error
|
||||
PostUpdatePodSandbox func(context.Context, *api.PodSandbox) error
|
||||
StopPodSandbox func(context.Context, *api.PodSandbox) error
|
||||
RemovePodSandbox func(context.Context, *api.PodSandbox) error
|
||||
CreateContainer func(context.Context, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error)
|
||||
PostCreateContainer func(context.Context, *api.PodSandbox, *api.Container) error
|
||||
StartContainer func(context.Context, *api.PodSandbox, *api.Container) error
|
||||
PostStartContainer func(context.Context, *api.PodSandbox, *api.Container) error
|
||||
UpdateContainer func(context.Context, *api.PodSandbox, *api.Container, *api.LinuxResources) ([]*api.ContainerUpdate, error)
|
||||
PostUpdateContainer func(context.Context, *api.PodSandbox, *api.Container) error
|
||||
StopContainer func(context.Context, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error)
|
||||
RemoveContainer func(context.Context, *api.PodSandbox, *api.Container) error
|
||||
ValidateContainerAdjustment func(context.Context, *api.ValidateContainerAdjustmentRequest) error
|
||||
}
|
||||
|
||||
// New creates a stub with the given plugin and options.
|
||||
func New(p interface{}, opts ...Option) (Stub, error) {
|
||||
stub := &stub{
|
||||
plugin: p,
|
||||
name: os.Getenv(api.PluginNameEnvVar),
|
||||
idx: os.Getenv(api.PluginIdxEnvVar),
|
||||
socketPath: api.DefaultSocketPath,
|
||||
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
|
||||
|
||||
registrationTimeout: DefaultRegistrationTimeout,
|
||||
requestTimeout: DefaultRequestTimeout,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
if err := o(stub); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := stub.setupHandlers(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := stub.ensureIdentity(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Infof(noCtx, "Created plugin %s (%s, handles %s)", stub.Name(),
|
||||
filepath.Base(os.Args[0]), stub.events.PrettyString())
|
||||
|
||||
return stub, nil
|
||||
}
|
||||
|
||||
// Start event processing, register to NRI and wait for getting configured.
|
||||
func (stub *stub) Start(ctx context.Context) (retErr error) {
|
||||
stub.Lock()
|
||||
defer stub.Unlock()
|
||||
|
||||
if stub.isStarted() {
|
||||
return fmt.Errorf("stub already started")
|
||||
}
|
||||
stub.doneC = make(chan struct{})
|
||||
|
||||
err := stub.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rpcm := multiplex.Multiplex(stub.conn)
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
rpcm.Close()
|
||||
stub.rpcm = nil
|
||||
}
|
||||
}()
|
||||
|
||||
rpcl, err := rpcm.Listen(multiplex.PluginServiceConn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
rpcl.Close()
|
||||
stub.rpcl = nil
|
||||
}
|
||||
}()
|
||||
|
||||
rpcs, err := ttrpc.NewServer(stub.serverOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create ttrpc server: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
rpcs.Close()
|
||||
stub.rpcs = nil
|
||||
}
|
||||
}()
|
||||
|
||||
api.RegisterPluginService(rpcs, stub)
|
||||
|
||||
conn, err := rpcm.Open(multiplex.RuntimeServiceConn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to multiplex ttrpc client connection: %w", err)
|
||||
}
|
||||
|
||||
clientOpts := []ttrpc.ClientOpts{
|
||||
ttrpc.WithOnClose(func() {
|
||||
stub.connClosed()
|
||||
}),
|
||||
}
|
||||
rpcc := ttrpc.NewClient(conn, append(clientOpts, stub.clientOpts...)...)
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
rpcc.Close()
|
||||
stub.rpcc = nil
|
||||
}
|
||||
}()
|
||||
|
||||
stub.srvErrC = make(chan error, 1)
|
||||
stub.cfgErrC = make(chan error, 1)
|
||||
|
||||
go func(l stdnet.Listener, doneC chan struct{}, srvErrC chan error) {
|
||||
srvErrC <- rpcs.Serve(ctx, l)
|
||||
close(doneC)
|
||||
}(rpcl, stub.doneC, stub.srvErrC)
|
||||
|
||||
stub.rpcm = rpcm
|
||||
stub.rpcl = rpcl
|
||||
stub.rpcs = rpcs
|
||||
stub.rpcc = rpcc
|
||||
|
||||
stub.runtime = api.NewRuntimeClient(rpcc)
|
||||
|
||||
if err = stub.register(ctx); err != nil {
|
||||
stub.close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err = <-stub.cfgErrC; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Started plugin %s...", stub.Name())
|
||||
|
||||
stub.started = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the plugin.
|
||||
func (stub *stub) Stop() {
|
||||
log.Infof(noCtx, "Stopping plugin %s...", stub.Name())
|
||||
|
||||
stub.Lock()
|
||||
defer stub.Unlock()
|
||||
stub.close()
|
||||
}
|
||||
|
||||
// IsStarted returns true if the plugin has been started either by Start() or by Run().
|
||||
func (stub *stub) IsStarted() bool {
|
||||
stub.Lock()
|
||||
defer stub.Unlock()
|
||||
return stub.isStarted()
|
||||
}
|
||||
|
||||
func (stub *stub) isStarted() bool {
|
||||
return stub.started
|
||||
}
|
||||
|
||||
// reset stub to the status that can initiate a new
|
||||
// NRI connection, the caller must hold lock.
|
||||
func (stub *stub) close() {
|
||||
if !stub.isStarted() {
|
||||
return
|
||||
}
|
||||
|
||||
if stub.rpcl != nil {
|
||||
stub.rpcl.Close()
|
||||
}
|
||||
if stub.rpcs != nil {
|
||||
stub.rpcs.Close()
|
||||
}
|
||||
if stub.rpcc != nil {
|
||||
stub.rpcc.Close()
|
||||
}
|
||||
if stub.rpcm != nil {
|
||||
stub.rpcm.Close()
|
||||
}
|
||||
if stub.srvErrC != nil {
|
||||
<-stub.doneC
|
||||
}
|
||||
|
||||
stub.started = false
|
||||
stub.conn = nil
|
||||
stub.syncReq = nil
|
||||
}
|
||||
|
||||
// Run the plugin. Start event processing then wait for an error or getting stopped.
|
||||
func (stub *stub) Run(ctx context.Context) error {
|
||||
var err error
|
||||
|
||||
if err = stub.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = <-stub.srvErrC
|
||||
if err == ttrpc.ErrServerClosed {
|
||||
log.Infof(noCtx, "ttrpc server closed %s : %v", stub.Name(), err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for the plugin to stop, should be called after Start() or Run().
|
||||
func (stub *stub) Wait() {
|
||||
if stub.IsStarted() {
|
||||
<-stub.doneC
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the full indexed name of the plugin.
|
||||
func (stub *stub) Name() string {
|
||||
return stub.idx + "-" + stub.name
|
||||
}
|
||||
|
||||
func (stub *stub) RegistrationTimeout() time.Duration {
|
||||
return stub.registrationTimeout
|
||||
}
|
||||
|
||||
func (stub *stub) RequestTimeout() time.Duration {
|
||||
return stub.requestTimeout
|
||||
}
|
||||
|
||||
// Connect the plugin to NRI.
|
||||
func (stub *stub) connect() error {
|
||||
if stub.conn != nil {
|
||||
log.Infof(noCtx, "Using given plugin connection...")
|
||||
return nil
|
||||
}
|
||||
|
||||
if env := os.Getenv(api.PluginSocketEnvVar); env != "" {
|
||||
log.Infof(noCtx, "Using connection %q from environment...", env)
|
||||
|
||||
fd, err := strconv.Atoi(env)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid socket in environment (%s=%q): %w",
|
||||
api.PluginSocketEnvVar, env, err)
|
||||
}
|
||||
|
||||
stub.conn, err = net.NewFdConn(fd)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid socket (%d) in environment: %w", fd, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, err := stub.dialer(stub.socketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to NRI service: %w", err)
|
||||
}
|
||||
|
||||
stub.conn = conn
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register the plugin with NRI.
|
||||
func (stub *stub) register(ctx context.Context) error {
|
||||
log.Infof(ctx, "Registering plugin %s...", stub.Name())
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, stub.registrationTimeout)
|
||||
defer cancel()
|
||||
|
||||
req := &api.RegisterPluginRequest{
|
||||
PluginName: stub.name,
|
||||
PluginIdx: stub.idx,
|
||||
}
|
||||
if _, err := stub.runtime.RegisterPlugin(ctx, req); err != nil {
|
||||
return fmt.Errorf("failed to register with NRI/Runtime: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle a lost connection.
|
||||
func (stub *stub) connClosed() {
|
||||
stub.Lock()
|
||||
stub.close()
|
||||
stub.Unlock()
|
||||
if stub.onClose != nil {
|
||||
stub.onClose()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// plugin event and request handlers
|
||||
//
|
||||
|
||||
// UpdateContainers requests unsolicited updates to containers.
|
||||
func (stub *stub) UpdateContainers(update []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) {
|
||||
if stub.runtime == nil {
|
||||
return nil, ErrNoService
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := &api.UpdateContainersRequest{
|
||||
Update: update,
|
||||
}
|
||||
rpl, err := stub.runtime.UpdateContainers(ctx, req)
|
||||
if rpl != nil {
|
||||
return rpl.Failed, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configure the plugin.
|
||||
func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl *api.ConfigureResponse, retErr error) {
|
||||
var (
|
||||
events api.EventMask
|
||||
err error
|
||||
)
|
||||
|
||||
log.Infof(ctx, "Configuring plugin %s for runtime %s/%s...", stub.Name(),
|
||||
req.RuntimeName, req.RuntimeVersion)
|
||||
|
||||
stub.registrationTimeout = time.Duration(req.RegistrationTimeout * int64(time.Millisecond))
|
||||
stub.requestTimeout = time.Duration(req.RequestTimeout * int64(time.Millisecond))
|
||||
|
||||
defer func() {
|
||||
stub.cfgErrC <- retErr
|
||||
}()
|
||||
|
||||
if handler := stub.handlers.Configure; handler == nil {
|
||||
events = stub.events
|
||||
} else {
|
||||
events, err = handler(ctx, req.Config, req.RuntimeName, req.RuntimeVersion)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, "Plugin configuration failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if events == 0 {
|
||||
events = stub.events
|
||||
}
|
||||
|
||||
// Only allow plugins to subscribe to events they can handle.
|
||||
if extra := events & ^stub.events; extra != 0 {
|
||||
log.Errorf(ctx, "Plugin subscribed for unhandled events %s (0x%x)",
|
||||
extra.PrettyString(), extra)
|
||||
return nil, fmt.Errorf("internal error: unhandled events %s (0x%x)",
|
||||
extra.PrettyString(), extra)
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Subscribing plugin %s (%s) for events %s", stub.Name(),
|
||||
filepath.Base(os.Args[0]), events.PrettyString())
|
||||
}
|
||||
|
||||
return &api.ConfigureResponse{
|
||||
Events: int32(events),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Synchronize the state of the plugin with the runtime.
|
||||
func (stub *stub) Synchronize(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
|
||||
handler := stub.handlers.Synchronize
|
||||
if handler == nil {
|
||||
return &api.SynchronizeResponse{More: req.More}, nil
|
||||
}
|
||||
|
||||
if req.More {
|
||||
return stub.collectSync(req)
|
||||
}
|
||||
|
||||
return stub.deliverSync(ctx, req)
|
||||
}
|
||||
|
||||
func (stub *stub) collectSync(req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
|
||||
stub.Lock()
|
||||
defer stub.Unlock()
|
||||
|
||||
log.Debugf(noCtx, "collecting sync req with %d pods, %d containers...",
|
||||
len(req.Pods), len(req.Containers))
|
||||
|
||||
if stub.syncReq == nil {
|
||||
stub.syncReq = req
|
||||
} else {
|
||||
stub.syncReq.Pods = append(stub.syncReq.Pods, req.Pods...)
|
||||
stub.syncReq.Containers = append(stub.syncReq.Containers, req.Containers...)
|
||||
}
|
||||
|
||||
return &api.SynchronizeResponse{More: req.More}, nil
|
||||
}
|
||||
|
||||
func (stub *stub) deliverSync(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
|
||||
stub.Lock()
|
||||
syncReq := stub.syncReq
|
||||
stub.syncReq = nil
|
||||
stub.Unlock()
|
||||
|
||||
if syncReq == nil {
|
||||
syncReq = req
|
||||
} else {
|
||||
syncReq.Pods = append(syncReq.Pods, req.Pods...)
|
||||
syncReq.Containers = append(syncReq.Containers, req.Containers...)
|
||||
}
|
||||
|
||||
update, err := stub.handlers.Synchronize(ctx, syncReq.Pods, syncReq.Containers)
|
||||
return &api.SynchronizeResponse{
|
||||
Update: update,
|
||||
More: false,
|
||||
}, err
|
||||
}
|
||||
|
||||
// Shutdown the plugin.
|
||||
func (stub *stub) Shutdown(ctx context.Context, _ *api.ShutdownRequest) (*api.ShutdownResponse, error) {
|
||||
handler := stub.handlers.Shutdown
|
||||
if handler != nil {
|
||||
handler(ctx)
|
||||
}
|
||||
return &api.ShutdownResponse{}, nil
|
||||
}
|
||||
|
||||
// RunPodSandbox request handler.
|
||||
func (stub *stub) RunPodSandbox(ctx context.Context, req *api.RunPodSandboxRequest) (*api.RunPodSandboxResponse, error) {
|
||||
if handler := stub.handlers.RunPodSandbox; handler != nil {
|
||||
return &api.RunPodSandboxResponse{}, handler(ctx, req.Pod)
|
||||
}
|
||||
|
||||
return &api.RunPodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// UpdatePodSandbox request handler.
|
||||
func (stub *stub) UpdatePodSandbox(ctx context.Context, req *api.UpdatePodSandboxRequest) (*api.UpdatePodSandboxResponse, error) {
|
||||
handler := stub.handlers.UpdatePodSandbox
|
||||
if handler == nil {
|
||||
return &api.UpdatePodSandboxResponse{}, nil
|
||||
}
|
||||
err := handler(ctx, req.Pod, req.OverheadLinuxResources, req.LinuxResources)
|
||||
return &api.UpdatePodSandboxResponse{}, err
|
||||
}
|
||||
|
||||
// PostUpdatePodSandbox request handler.
|
||||
func (stub *stub) PostUpdatePodSandbox(ctx context.Context, req *api.PostUpdatePodSandboxRequest) (*api.PostUpdatePodSandboxResponse, error) {
|
||||
if handler := stub.handlers.PostUpdatePodSandbox; handler != nil {
|
||||
return &api.PostUpdatePodSandboxResponse{}, handler(ctx, req.Pod)
|
||||
}
|
||||
|
||||
return &api.PostUpdatePodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// StopPodSandbox request handler.
|
||||
func (stub *stub) StopPodSandbox(ctx context.Context, req *api.StopPodSandboxRequest) (*api.StopPodSandboxResponse, error) {
|
||||
if handler := stub.handlers.StopPodSandbox; handler != nil {
|
||||
return &api.StopPodSandboxResponse{}, handler(ctx, req.Pod)
|
||||
}
|
||||
|
||||
return &api.StopPodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// RemovePodSandbox request handler.
|
||||
func (stub *stub) RemovePodSandbox(ctx context.Context, req *api.RemovePodSandboxRequest) (*api.RemovePodSandboxResponse, error) {
|
||||
if handler := stub.handlers.RemovePodSandbox; handler != nil {
|
||||
return &api.RemovePodSandboxResponse{}, handler(ctx, req.Pod)
|
||||
}
|
||||
|
||||
return &api.RemovePodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// CreateContainer request handler.
|
||||
func (stub *stub) CreateContainer(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
|
||||
handler := stub.handlers.CreateContainer
|
||||
if handler == nil {
|
||||
return &api.CreateContainerResponse{}, nil
|
||||
}
|
||||
adjust, update, err := handler(ctx, req.Pod, req.Container)
|
||||
return &api.CreateContainerResponse{
|
||||
Adjust: adjust,
|
||||
Update: update,
|
||||
}, err
|
||||
}
|
||||
|
||||
// PostCreateContainer request handler.
|
||||
func (stub *stub) PostCreateContainer(ctx context.Context, req *api.PostCreateContainerRequest) (*api.PostCreateContainerResponse, error) {
|
||||
if handler := stub.handlers.PostCreateContainer; handler != nil {
|
||||
return &api.PostCreateContainerResponse{}, handler(ctx, req.Pod, req.Container)
|
||||
}
|
||||
|
||||
return &api.PostCreateContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// StartContainer request handler.
|
||||
func (stub *stub) StartContainer(ctx context.Context, req *api.StartContainerRequest) (*api.StartContainerResponse, error) {
|
||||
if handler := stub.handlers.StartContainer; handler != nil {
|
||||
return &api.StartContainerResponse{}, handler(ctx, req.Pod, req.Container)
|
||||
}
|
||||
|
||||
return &api.StartContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// PostStartContainer request handler.
|
||||
func (stub *stub) PostStartContainer(ctx context.Context, req *api.PostStartContainerRequest) (*api.PostStartContainerResponse, error) {
|
||||
if handler := stub.handlers.PostStartContainer; handler != nil {
|
||||
return &api.PostStartContainerResponse{}, handler(ctx, req.Pod, req.Container)
|
||||
}
|
||||
|
||||
return &api.PostStartContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// UpdateContainer request handler.
|
||||
func (stub *stub) UpdateContainer(ctx context.Context, req *api.UpdateContainerRequest) (*api.UpdateContainerResponse, error) {
|
||||
handler := stub.handlers.UpdateContainer
|
||||
if handler == nil {
|
||||
return &api.UpdateContainerResponse{}, nil
|
||||
}
|
||||
update, err := handler(ctx, req.Pod, req.Container, req.LinuxResources)
|
||||
return &api.UpdateContainerResponse{
|
||||
Update: update,
|
||||
}, err
|
||||
}
|
||||
|
||||
// PostUpdateContainer request handler.
|
||||
func (stub *stub) PostUpdateContainer(ctx context.Context, req *api.PostUpdateContainerRequest) (*api.PostUpdateContainerResponse, error) {
|
||||
if handler := stub.handlers.PostUpdateContainer; handler != nil {
|
||||
return &api.PostUpdateContainerResponse{}, handler(ctx, req.Pod, req.Container)
|
||||
}
|
||||
|
||||
return &api.PostUpdateContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// StopContainer request handler.
|
||||
func (stub *stub) StopContainer(ctx context.Context, req *api.StopContainerRequest) (*api.StopContainerResponse, error) {
|
||||
handler := stub.handlers.StopContainer
|
||||
if handler == nil {
|
||||
return &api.StopContainerResponse{}, nil
|
||||
}
|
||||
update, err := handler(ctx, req.Pod, req.Container)
|
||||
return &api.StopContainerResponse{
|
||||
Update: update,
|
||||
}, err
|
||||
}
|
||||
|
||||
// RemoveContainer request handler.
|
||||
func (stub *stub) RemoveContainer(ctx context.Context, req *api.RemoveContainerRequest) (*api.RemoveContainerResponse, error) {
|
||||
if handler := stub.handlers.RemoveContainer; handler != nil {
|
||||
return &api.RemoveContainerResponse{}, handler(ctx, req.Pod, req.Container)
|
||||
}
|
||||
|
||||
return &api.RemoveContainerResponse{}, nil
|
||||
}
|
||||
|
||||
func (stub *stub) ValidateContainerAdjustment(ctx context.Context, req *api.ValidateContainerAdjustmentRequest) (*api.ValidateContainerAdjustmentResponse, error) {
|
||||
handler := stub.handlers.ValidateContainerAdjustment
|
||||
if handler == nil {
|
||||
return &api.ValidateContainerAdjustmentResponse{}, nil
|
||||
}
|
||||
|
||||
if err := handler(ctx, req); err != nil {
|
||||
return &api.ValidateContainerAdjustmentResponse{
|
||||
Reject: true,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &api.ValidateContainerAdjustmentResponse{}, nil
|
||||
}
|
||||
|
||||
// ensureIdentity sets plugin index and name from the binary if those are unset.
|
||||
func (stub *stub) ensureIdentity() error {
|
||||
if stub.idx != "" && stub.name != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if stub.idx != "" {
|
||||
stub.name = filepath.Base(os.Args[0])
|
||||
return nil
|
||||
}
|
||||
|
||||
idx, name, err := api.ParsePluginName(filepath.Base(os.Args[0]))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stub.name = name
|
||||
stub.idx = idx
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set up event handlers and the subscription mask for the plugin.
|
||||
func (stub *stub) setupHandlers() error {
|
||||
if plugin, ok := stub.plugin.(ConfigureInterface); ok {
|
||||
stub.handlers.Configure = plugin.Configure
|
||||
}
|
||||
if plugin, ok := stub.plugin.(SynchronizeInterface); ok {
|
||||
stub.handlers.Synchronize = plugin.Synchronize
|
||||
}
|
||||
if plugin, ok := stub.plugin.(ShutdownInterface); ok {
|
||||
stub.handlers.Shutdown = plugin.Shutdown
|
||||
}
|
||||
|
||||
if plugin, ok := stub.plugin.(RunPodInterface); ok {
|
||||
stub.handlers.RunPodSandbox = plugin.RunPodSandbox
|
||||
stub.events.Set(api.Event_RUN_POD_SANDBOX)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(UpdatePodInterface); ok {
|
||||
stub.handlers.UpdatePodSandbox = plugin.UpdatePodSandbox
|
||||
stub.events.Set(api.Event_UPDATE_POD_SANDBOX)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(StopPodInterface); ok {
|
||||
stub.handlers.StopPodSandbox = plugin.StopPodSandbox
|
||||
stub.events.Set(api.Event_STOP_POD_SANDBOX)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(RemovePodInterface); ok {
|
||||
stub.handlers.RemovePodSandbox = plugin.RemovePodSandbox
|
||||
stub.events.Set(api.Event_REMOVE_POD_SANDBOX)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(PostUpdatePodInterface); ok {
|
||||
stub.handlers.PostUpdatePodSandbox = plugin.PostUpdatePodSandbox
|
||||
stub.events.Set(api.Event_POST_UPDATE_POD_SANDBOX)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(CreateContainerInterface); ok {
|
||||
stub.handlers.CreateContainer = plugin.CreateContainer
|
||||
stub.events.Set(api.Event_CREATE_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(StartContainerInterface); ok {
|
||||
stub.handlers.StartContainer = plugin.StartContainer
|
||||
stub.events.Set(api.Event_START_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(UpdateContainerInterface); ok {
|
||||
stub.handlers.UpdateContainer = plugin.UpdateContainer
|
||||
stub.events.Set(api.Event_UPDATE_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(StopContainerInterface); ok {
|
||||
stub.handlers.StopContainer = plugin.StopContainer
|
||||
stub.events.Set(api.Event_STOP_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(RemoveContainerInterface); ok {
|
||||
stub.handlers.RemoveContainer = plugin.RemoveContainer
|
||||
stub.events.Set(api.Event_REMOVE_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(PostCreateContainerInterface); ok {
|
||||
stub.handlers.PostCreateContainer = plugin.PostCreateContainer
|
||||
stub.events.Set(api.Event_POST_CREATE_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(PostStartContainerInterface); ok {
|
||||
stub.handlers.PostStartContainer = plugin.PostStartContainer
|
||||
stub.events.Set(api.Event_POST_START_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(PostUpdateContainerInterface); ok {
|
||||
stub.handlers.PostUpdateContainer = plugin.PostUpdateContainer
|
||||
stub.events.Set(api.Event_POST_UPDATE_CONTAINER)
|
||||
}
|
||||
if plugin, ok := stub.plugin.(ValidateContainerAdjustmentInterface); ok {
|
||||
stub.handlers.ValidateContainerAdjustment = plugin.ValidateContainerAdjustment
|
||||
stub.events.Set(api.Event_VALIDATE_CONTAINER_ADJUSTMENT)
|
||||
}
|
||||
|
||||
if stub.events == 0 {
|
||||
return fmt.Errorf("internal error: plugin %T does not implement any NRI request handlers",
|
||||
stub.plugin)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue