Merge pull request #1158 from mheon/prevent_multiple_boltdb_conns

Add a mutex to BoltDB state to prevent lock issues
This commit is contained in:
Matthew Heon 2018-07-26 10:41:30 -04:00 committed by GitHub
commit d9ae17400d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 23 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"strings"
"sync"
"github.com/boltdb/bolt"
"github.com/pkg/errors"
@ -15,6 +16,7 @@ import (
type BoltState struct {
valid bool
dbPath string
dbLock sync.Mutex
namespace string
namespaceBytes []byte
lockDir string
@ -73,6 +75,12 @@ func NewBoltState(path, lockDir string, runtime *Runtime) (State, error) {
if err != nil {
return nil, errors.Wrapf(err, "error opening database %s", path)
}
// Everywhere else, we use s.closeDBCon(db) to ensure the state's DB
// mutex is also unlocked.
// However, here, the mutex has not been locked, since we just created
// the DB connection, and it hasn't left this function yet - no risk of
// concurrent access.
// As such, just a db.Close() is fine here.
defer db.Close()
// Perform initial database setup
@ -133,7 +141,7 @@ func (s *BoltState) Refresh() error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
idBucket, err := getIDBucket(tx)
@ -266,7 +274,7 @@ func (s *BoltState) Container(id string) (*Container, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
@ -302,7 +310,7 @@ func (s *BoltState) LookupContainer(idOrName string) (*Container, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
idBucket, err := getIDBucket(tx)
@ -390,7 +398,7 @@ func (s *BoltState) HasContainer(id string) (bool, error) {
if err != nil {
return false, err
}
defer db.Close()
defer s.closeDBCon(db)
exists := false
@ -455,7 +463,7 @@ func (s *BoltState) RemoveContainer(ctr *Container) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
return s.removeContainer(ctr, nil, tx)
@ -486,7 +494,7 @@ func (s *BoltState) UpdateContainer(ctr *Container) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
@ -555,7 +563,7 @@ func (s *BoltState) SaveContainer(ctr *Container) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
@ -612,7 +620,7 @@ func (s *BoltState) ContainerInUse(ctr *Container) ([]string, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
@ -663,7 +671,7 @@ func (s *BoltState) AllContainers() ([]*Container, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
allCtrsBucket, err := getAllCtrsBucket(tx)
@ -732,7 +740,7 @@ func (s *BoltState) Pod(id string) (*Pod, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -767,7 +775,7 @@ func (s *BoltState) LookupPod(idOrName string) (*Pod, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
idBucket, err := getIDBucket(tx)
@ -859,7 +867,7 @@ func (s *BoltState) HasPod(id string) (bool, error) {
if err != nil {
return false, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -915,7 +923,7 @@ func (s *BoltState) PodHasContainer(pod *Pod, id string) (bool, error) {
if err != nil {
return false, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -977,7 +985,7 @@ func (s *BoltState) PodContainersByID(pod *Pod) ([]string, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -1039,7 +1047,7 @@ func (s *BoltState) PodContainers(pod *Pod) ([]*Container, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -1123,7 +1131,7 @@ func (s *BoltState) AddPod(pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -1232,7 +1240,7 @@ func (s *BoltState) RemovePod(pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -1327,7 +1335,7 @@ func (s *BoltState) RemovePodContainers(pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
podBkt, err := getPodBucket(tx)
@ -1488,7 +1496,7 @@ func (s *BoltState) RemoveContainerFromPod(pod *Pod, ctr *Container) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
return s.removeContainer(ctr, pod, tx)
@ -1516,7 +1524,7 @@ func (s *BoltState) UpdatePod(pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
podID := []byte(pod.ID())
@ -1576,7 +1584,7 @@ func (s *BoltState) SavePod(pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
podID := []byte(pod.ID())
@ -1618,7 +1626,7 @@ func (s *BoltState) AllPods() ([]*Pod, error) {
if err != nil {
return nil, err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.View(func(tx *bolt.Tx) error {
allPodsBucket, err := getAllPodsBucket(tx)

View File

@ -145,7 +145,15 @@ func validateDBAgainstConfig(bucket *bolt.Bucket, fieldName, runtimeValue string
return nil
}
// Open a connection to the database.
// Must be paired with a `defer closeDBCon()` on the returned database, to
// ensure the state is properly unlocked
func (s *BoltState) getDBCon() (*bolt.DB, error) {
// We need an in-memory lock to avoid issues around POSIX file advisory
// locks as described in the link below:
// https://www.sqlite.org/src/artifact/c230a7a24?ln=994-1081
s.dbLock.Lock()
db, err := bolt.Open(s.dbPath, 0600, nil)
if err != nil {
return nil, errors.Wrapf(err, "error opening database %s", s.dbPath)
@ -154,6 +162,17 @@ func (s *BoltState) getDBCon() (*bolt.DB, error) {
return db, nil
}
// Close a connection to the database.
// MUST be used in place of `db.Close()` to ensure proper unlocking of the
// state.
func (s *BoltState) closeDBCon(db *bolt.DB) error {
err := db.Close()
s.dbLock.Unlock()
return err
}
func getIDBucket(tx *bolt.Tx) (*bolt.Bucket, error) {
bkt := tx.Bucket(idRegistryBkt)
if bkt == nil {
@ -296,7 +315,7 @@ func (s *BoltState) addContainer(ctr *Container, pod *Pod) error {
if err != nil {
return err
}
defer db.Close()
defer s.closeDBCon(db)
err = db.Update(func(tx *bolt.Tx) error {
idsBucket, err := getIDBucket(tx)