Update DB to hold CNI network information

Replace our old IP and Subnet fields in state with CNI types that
contain a lot more information. Retrieve these structs from the
CNI plugins themselves.

Signed-off-by: Matthew Heon <matthew.heon@gmail.com>

Closes: #440
Approved by: baude
This commit is contained in:
Matthew Heon 2018-03-02 11:10:37 -05:00 committed by Atomic Bot
parent 29d650a379
commit edb1609c61
7 changed files with 402 additions and 136 deletions

View File

@ -140,9 +140,9 @@ func (s *BoltState) Refresh() error {
state.Mountpoint = ""
state.Mounted = false
state.State = ContainerStateConfigured
state.IPAddress = ""
state.SubnetMask = ""
state.ExecSessions = make(map[string]*ExecSession)
state.IPs = nil
state.Routes = nil
newStateBytes, err := json.Marshal(state)
if err != nil {

View File

@ -7,6 +7,8 @@ import (
"time"
"github.com/containerd/cgroups"
"github.com/containernetworking/cni/pkg/types"
cnitypes "github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/containers/storage"
"github.com/cri-o/ocicni/pkg/ocicni"
@ -137,13 +139,17 @@ type containerState struct {
// Will only be set if config.CreateNetNS is true, or the container was
// told to join another container's network namespace
NetNS ns.NetNS `json:"-"`
// IP address of container (if network namespace was created)
IPAddress string `json:"ipAddress"`
// Subnet mask of container (if network namespace was created)
SubnetMask string `json:"subnetMask"`
// ExecSessions contains active exec sessions for container
// Exec session ID is mapped to PID of exec process
ExecSessions map[string]*ExecSession `json:"execSessions,omitempty"`
// IPs contains IP addresses assigned to the container
// Only populated if we created a network namespace for the container,
// and the network namespace is currently active
IPs []*cnitypes.IPConfig `json:"ipAddresses,omitempty"`
// Routes contains network routes present in the container
// Only populated if we created a network namespace for the container,
// and the network namespace is currently active
Routes []*types.Route `json:"routes,omitempty"`
}
// ExecSession contains information on an active exec session
@ -643,6 +649,55 @@ func (c *Container) ExecSession(id string) (*ExecSession, error) {
return returnSession, nil
}
// IPs() retrieves a container's IP addresses
// This will only be populated if the container is configured to created a new
// network namespace, and that namespace is presently active
func (c *Container) IPs() ([]net.IPNet, error) {
if !c.locked {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return nil, err
}
}
ips := make([]net.IPNet, 0, len(c.state.IPs))
for _, ip := range c.state.IPs {
ips = append(ips, ip.Address)
}
return ips, nil
}
// Routes retrieves a container's routes
// This will only be populated if the container is configured to created a new
// network namespace, and that namespace is presently active
func (c *Container) Routes() ([]types.Route, error) {
if !c.locked {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return nil, err
}
}
routes := make([]types.Route, 0, len(c.state.Routes))
for _, route := range c.state.Routes {
newRoute := types.Route{
Dst: route.Dst,
GW: route.GW,
}
routes = append(routes, newRoute)
}
return routes, nil
}
// Misc Accessors
// Most will require locking

View File

@ -9,6 +9,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"github.com/cri-o/ocicni/pkg/ocicni"
fflib "github.com/pquerna/ffjson/fflib/v1"
"net"
@ -2866,11 +2868,6 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error {
fflib.FormatBits2(buf, uint64(j.PID), 10, j.PID < 0)
buf.WriteByte(',')
}
buf.WriteString(`"ipAddress":`)
fflib.WriteJsonString(buf, string(j.IPAddress))
buf.WriteString(`,"subnetMask":`)
fflib.WriteJsonString(buf, string(j.SubnetMask))
buf.WriteByte(',')
if len(j.ExecSessions) != 0 {
buf.WriteString(`"execSessions":`)
/* Falling back. type=map[string]*libpod.ExecSession kind=map */
@ -2880,6 +2877,68 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error {
}
buf.WriteByte(',')
}
if len(j.IPs) != 0 {
buf.WriteString(`"ipAddresses":`)
if j.IPs != nil {
buf.WriteString(`[`)
for i, v := range j.IPs {
if i != 0 {
buf.WriteString(`,`)
}
{
if v == nil {
buf.WriteString("null")
} else {
obj, err = v.MarshalJSON()
if err != nil {
return err
}
buf.Write(obj)
}
}
}
buf.WriteString(`]`)
} else {
buf.WriteString(`null`)
}
buf.WriteByte(',')
}
if len(j.Routes) != 0 {
buf.WriteString(`"routes":`)
if j.Routes != nil {
buf.WriteString(`[`)
for i, v := range j.Routes {
if i != 0 {
buf.WriteString(`,`)
}
{
if v == nil {
buf.WriteString("null")
} else {
obj, err = v.MarshalJSON()
if err != nil {
return err
}
buf.Write(obj)
}
}
}
buf.WriteString(`]`)
} else {
buf.WriteString(`null`)
}
buf.WriteByte(',')
}
buf.Rewind(1)
buf.WriteByte('}')
return nil
@ -2909,11 +2968,11 @@ const (
ffjtcontainerStatePID
ffjtcontainerStateIPAddress
ffjtcontainerStateSubnetMask
ffjtcontainerStateExecSessions
ffjtcontainerStateIPs
ffjtcontainerStateRoutes
)
var ffjKeycontainerStateState = []byte("state")
@ -2936,12 +2995,12 @@ var ffjKeycontainerStateOOMKilled = []byte("oomKilled")
var ffjKeycontainerStatePID = []byte("pid")
var ffjKeycontainerStateIPAddress = []byte("ipAddress")
var ffjKeycontainerStateSubnetMask = []byte("subnetMask")
var ffjKeycontainerStateExecSessions = []byte("execSessions")
var ffjKeycontainerStateIPs = []byte("ipAddresses")
var ffjKeycontainerStateRoutes = []byte("routes")
// UnmarshalJSON umarshall json - template of ffjson
func (j *containerState) UnmarshalJSON(input []byte) error {
fs := fflib.NewFFLexer(input)
@ -3034,8 +3093,8 @@ mainparse:
case 'i':
if bytes.Equal(ffjKeycontainerStateIPAddress, kn) {
currentKey = ffjtcontainerStateIPAddress
if bytes.Equal(ffjKeycontainerStateIPs, kn) {
currentKey = ffjtcontainerStateIPs
state = fflib.FFParse_want_colon
goto mainparse
}
@ -3075,6 +3134,11 @@ mainparse:
currentKey = ffjtcontainerStateRunDir
state = fflib.FFParse_want_colon
goto mainparse
} else if bytes.Equal(ffjKeycontainerStateRoutes, kn) {
currentKey = ffjtcontainerStateRoutes
state = fflib.FFParse_want_colon
goto mainparse
}
case 's':
@ -3088,33 +3152,28 @@ mainparse:
currentKey = ffjtcontainerStateStartedTime
state = fflib.FFParse_want_colon
goto mainparse
} else if bytes.Equal(ffjKeycontainerStateSubnetMask, kn) {
currentKey = ffjtcontainerStateSubnetMask
state = fflib.FFParse_want_colon
goto mainparse
}
}
if fflib.EqualFoldRight(ffjKeycontainerStateRoutes, kn) {
currentKey = ffjtcontainerStateRoutes
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.EqualFoldRight(ffjKeycontainerStateIPs, kn) {
currentKey = ffjtcontainerStateIPs
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.EqualFoldRight(ffjKeycontainerStateExecSessions, kn) {
currentKey = ffjtcontainerStateExecSessions
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.EqualFoldRight(ffjKeycontainerStateSubnetMask, kn) {
currentKey = ffjtcontainerStateSubnetMask
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.EqualFoldRight(ffjKeycontainerStateIPAddress, kn) {
currentKey = ffjtcontainerStateIPAddress
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.SimpleLetterEqualFold(ffjKeycontainerStatePID, kn) {
currentKey = ffjtcontainerStatePID
state = fflib.FFParse_want_colon
@ -3222,15 +3281,15 @@ mainparse:
case ffjtcontainerStatePID:
goto handle_PID
case ffjtcontainerStateIPAddress:
goto handle_IPAddress
case ffjtcontainerStateSubnetMask:
goto handle_SubnetMask
case ffjtcontainerStateExecSessions:
goto handle_ExecSessions
case ffjtcontainerStateIPs:
goto handle_IPs
case ffjtcontainerStateRoutes:
goto handle_Routes
case ffjtcontainerStatenosuchkey:
err = fs.SkipField(tok)
if err != nil {
@ -3533,58 +3592,6 @@ handle_PID:
state = fflib.FFParse_after_value
goto mainparse
handle_IPAddress:
/* handler: j.IPAddress type=string kind=string quoted=false*/
{
{
if tok != fflib.FFTok_string && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok))
}
}
if tok == fflib.FFTok_null {
} else {
outBuf := fs.Output.Bytes()
j.IPAddress = string(string(outBuf))
}
}
state = fflib.FFParse_after_value
goto mainparse
handle_SubnetMask:
/* handler: j.SubnetMask type=string kind=string quoted=false*/
{
{
if tok != fflib.FFTok_string && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok))
}
}
if tok == fflib.FFTok_null {
} else {
outBuf := fs.Output.Bytes()
j.SubnetMask = string(string(outBuf))
}
}
state = fflib.FFParse_after_value
goto mainparse
handle_ExecSessions:
/* handler: j.ExecSessions type=map[string]*libpod.ExecSession kind=map quoted=false*/
@ -3690,6 +3697,152 @@ handle_ExecSessions:
state = fflib.FFParse_after_value
goto mainparse
handle_IPs:
/* handler: j.IPs type=[]*current.IPConfig kind=slice quoted=false*/
{
{
if tok != fflib.FFTok_left_brace && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for ", tok))
}
}
if tok == fflib.FFTok_null {
j.IPs = nil
} else {
j.IPs = []*current.IPConfig{}
wantVal := true
for {
var tmpJIPs *current.IPConfig
tok = fs.Scan()
if tok == fflib.FFTok_error {
goto tokerror
}
if tok == fflib.FFTok_right_brace {
break
}
if tok == fflib.FFTok_comma {
if wantVal == true {
// TODO(pquerna): this isn't an ideal error message, this handles
// things like [,,,] as an array value.
return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
}
continue
} else {
wantVal = true
}
/* handler: tmpJIPs type=*current.IPConfig kind=ptr quoted=false*/
{
if tok == fflib.FFTok_null {
} else {
tbuf, err := fs.CaptureField(tok)
if err != nil {
return fs.WrapErr(err)
}
err = tmpJIPs.UnmarshalJSON(tbuf)
if err != nil {
return fs.WrapErr(err)
}
}
state = fflib.FFParse_after_value
}
j.IPs = append(j.IPs, tmpJIPs)
wantVal = false
}
}
}
state = fflib.FFParse_after_value
goto mainparse
handle_Routes:
/* handler: j.Routes type=[]*types.Route kind=slice quoted=false*/
{
{
if tok != fflib.FFTok_left_brace && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for ", tok))
}
}
if tok == fflib.FFTok_null {
j.Routes = nil
} else {
j.Routes = []*types.Route{}
wantVal := true
for {
var tmpJRoutes *types.Route
tok = fs.Scan()
if tok == fflib.FFTok_error {
goto tokerror
}
if tok == fflib.FFTok_right_brace {
break
}
if tok == fflib.FFTok_comma {
if wantVal == true {
// TODO(pquerna): this isn't an ideal error message, this handles
// things like [,,,] as an array value.
return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
}
continue
} else {
wantVal = true
}
/* handler: tmpJRoutes type=*types.Route kind=ptr quoted=false*/
{
if tok == fflib.FFTok_null {
} else {
tbuf, err := fs.CaptureField(tok)
if err != nil {
return fs.WrapErr(err)
}
err = tmpJRoutes.UnmarshalJSON(tbuf)
if err != nil {
return fs.WrapErr(err)
}
}
state = fflib.FFParse_after_value
}
j.Routes = append(j.Routes, tmpJRoutes)
wantVal = false
}
}
}
state = fflib.FFParse_after_value
goto mainparse
wantedvalue:
return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
wrongtokenerror:

View File

@ -382,8 +382,8 @@ func (c *Container) cleanupNetwork() error {
}
c.state.NetNS = nil
c.state.SubnetMask = ""
c.state.IPAddress = ""
c.state.IPs = nil
c.state.Routes = nil
return c.save()
}

View File

@ -3,6 +3,7 @@ package libpod
import (
"net"
cnitypes "github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/pkg/errors"
@ -37,12 +38,17 @@ func (r *Runtime) createNetNS(ctr *Container) (err error) {
logrus.Debugf("Made network namespace at %s for container %s", ctrNS.Path(), ctr.ID())
podNetwork := getPodNetwork(ctr.ID(), ctr.Name(), ctrNS.Path(), ctr.config.PortMappings)
_, err = r.netPlugin.SetUpPod(podNetwork)
result, err := r.netPlugin.SetUpPod(podNetwork)
if err != nil {
return errors.Wrapf(err, "error configuring network namespace for container %s", ctr.ID())
}
resultStruct := result.(*cnitypes.Result)
logrus.Debugf("Response from CNI plugins: %v", resultStruct.String())
ctr.state.NetNS = ctrNS
ctr.state.IPs = resultStruct.IPs
ctr.state.Routes = resultStruct.Routes
return nil
}

View File

@ -5,6 +5,8 @@ import (
"encoding/json"
"os"
"github.com/containernetworking/cni/pkg/types"
cnitypes "github.com/containernetworking/cni/pkg/types/current"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -14,7 +16,7 @@ import (
// DBSchema is the current DB schema version
// Increments every time a change is made to the database's tables
const DBSchema = 12
const DBSchema = 13
// SQLState is a state implementation backed by a persistent SQLite3 database
type SQLState struct {
@ -101,9 +103,9 @@ func (s *SQLState) Refresh() (err error) {
Mountpoint=?,
Pid=?,
NetNSPath=?,
IPAddress=?,
SubnetMask=?,
ExecSessions=?;`
ExecSessions=?,
IPs=?,
Routes=?;`
if !s.valid {
return ErrDBClosed
@ -132,9 +134,9 @@ func (s *SQLState) Refresh() (err error) {
"",
0,
"",
"",
"",
"{}")
"{}",
"[]",
"[]")
if err != nil {
return errors.Wrapf(err, "error refreshing database state")
}
@ -270,9 +272,9 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
OomKilled,
Pid,
NetNSPath,
IPAddress,
SubnetMask,
ExecSessions
ExecSessions,
IPs,
Routes
FROM containerState WHERE ID=?;`
var (
@ -286,9 +288,9 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
oomKilled int
pid int
netNSPath string
ipAddress string
subnetMask string
execSessions string
ipsJSON string
routesJSON string
)
if !s.valid {
@ -311,9 +313,9 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
&oomKilled,
&pid,
&netNSPath,
&ipAddress,
&subnetMask,
&execSessions)
&execSessions,
&ipsJSON,
&routesJSON)
if err != nil {
// The container may not exist in the database
if err == sql.ErrNoRows {
@ -335,14 +337,28 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
newState.ExitCode = exitCode
newState.OOMKilled = boolFromSQL(oomKilled)
newState.PID = pid
newState.IPAddress = ipAddress
newState.SubnetMask = subnetMask
newState.ExecSessions = make(map[string]*ExecSession)
if err := json.Unmarshal([]byte(execSessions), &newState.ExecSessions); err != nil {
return errors.Wrapf(err, "error parsing container %s exec sessions", ctr.ID())
}
ips := []*cnitypes.IPConfig{}
if err := json.Unmarshal([]byte(ipsJSON), &ips); err != nil {
return errors.Wrapf(err, "error parsing container %s IPs JSON", ctr.ID())
}
if len(ips) > 0 {
newState.IPs = ips
}
routes := []*types.Route{}
if err := json.Unmarshal([]byte(routesJSON), &routes); err != nil {
return errors.Wrapf(err, "error parsing container %s routes JSON", ctr.ID())
}
if len(routes) > 0 {
newState.Routes = routes
}
if newState.Mountpoint != "" {
newState.Mounted = true
}
@ -404,9 +420,9 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) {
OomKilled=?,
Pid=?,
NetNSPath=?,
IPAddress=?,
SubnetMask=?,
ExecSessions=?
ExecSessions=?,
IPs=?,
Routes=?
WHERE Id=?;`
if !ctr.valid {
@ -423,6 +439,16 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) {
netNSPath = ctr.state.NetNS.Path()
}
ipsJSON, err := json.Marshal(ctr.state.IPs)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s IPs to JSON", ctr.ID())
}
routesJSON, err := json.Marshal(ctr.state.Routes)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s routes to JSON", ctr.ID())
}
if !s.valid {
return ErrDBClosed
}
@ -453,9 +479,9 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) {
boolToSQL(ctr.state.OOMKilled),
ctr.state.PID,
netNSPath,
ctr.state.IPAddress,
ctr.state.SubnetMask,
execSessionsJSON,
ipsJSON,
routesJSON,
ctr.ID())
if err != nil {
return errors.Wrapf(err, "error updating container %s state in database", ctr.ID())

View File

@ -8,6 +8,8 @@ import (
"path/filepath"
"time"
"github.com/containernetworking/cni/pkg/types"
cnitypes "github.com/containernetworking/cni/pkg/types/current"
"github.com/containers/storage"
spec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
@ -32,9 +34,9 @@ const (
containerState.OomKilled,
containerState.Pid,
containerState.NetNSPath,
containerState.IPAddress,
containerState.SubnetMask,
containerState.ExecSessions
containerState.ExecSessions,
containerState.IPs,
containerState.Routes
FROM containers
INNER JOIN
containerState ON containers.Id = containerState.Id `
@ -273,9 +275,9 @@ func prepareDB(db *sql.DB) (err error) {
OomKilled INTEGER NOT NULL,
Pid INTEGER NOT NULL,
NetNSPath TEXT NOT NULL,
IPAddress TEXT NOT NULL,
SubnetMask TEXT NOT NULL,
ExecSessions TEXT NOT NULL,
IPs TEXT NOT NULL,
Routes TEXT NOT NULL.
CHECK (State>0),
CHECK (OomKilled IN (0, 1)),
@ -483,9 +485,9 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
oomKilled int
pid int
netNSPath string
ipAddress string
subnetMask string
execSessions string
ipsJSON string
routesJSON string
)
err := row.Scan(
@ -538,9 +540,9 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
&oomKilled,
&pid,
&netNSPath,
&ipAddress,
&subnetMask,
&execSessions)
&execSessions,
&ipsJSON,
&routesJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, ErrNoSuchCtr
@ -592,8 +594,6 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
ctr.state.ExitCode = exitCode
ctr.state.OOMKilled = boolFromSQL(oomKilled)
ctr.state.PID = pid
ctr.state.IPAddress = ipAddress
ctr.state.SubnetMask = subnetMask
// TODO should we store this in the database separately instead?
if ctr.state.Mountpoint != "" {
@ -625,6 +625,22 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
return nil, errors.Wrapf(err, "error parsing container %s exec sessions JSON", id)
}
ips := []*cnitypes.IPConfig{}
if err := json.Unmarshal([]byte(ipsJSON), &ips); err != nil {
return nil, errors.Wrapf(err, "error parsing container %s IP addresses JSON", id)
}
if len(ips) > 0 {
ctr.state.IPs = ips
}
routes := []*types.Route{}
if err := json.Unmarshal([]byte(routesJSON), &routes); err != nil {
return nil, errors.Wrapf(err, "error parsing container %s routes JSON", id)
}
if len(routes) > 0 {
ctr.state.Routes = routes
}
labels := make(map[string]string)
if err := json.Unmarshal([]byte(labelsJSON), &labels); err != nil {
return nil, errors.Wrapf(err, "error parsing container %s labels JSON", id)
@ -809,6 +825,16 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) {
return errors.Wrapf(err, "error marshalling container %s exec sessions to JSON", ctr.ID())
}
ipsJSON, err := json.Marshal(ctr.state.IPs)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s IPs to JSON", ctr.ID())
}
routesJSON, err := json.Marshal(ctr.state.Routes)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s routes to JSON", ctr.ID())
}
netNSPath := ""
if ctr.state.NetNS != nil {
netNSPath = ctr.state.NetNS.Path()
@ -931,9 +957,9 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) {
boolToSQL(ctr.state.OOMKilled),
ctr.state.PID,
netNSPath,
ctr.state.IPAddress,
ctr.state.SubnetMask,
execSessionsJSON)
execSessionsJSON,
ipsJSON,
routesJSON)
if err != nil {
return errors.Wrapf(err, "error adding container %s state to database", ctr.ID())
}