diff --git a/pkg/store/consul.go b/pkg/store/consul.go index dd902fd073..39a01e11cd 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -95,22 +95,12 @@ func (s *Consul) setEphemeralTTL(ttl time.Duration) error { return nil } -// CreateEphemeralSession creates the a global session +// createEphemeralSession creates the global session // once that is used to delete keys at node failure -func (s *Consul) createEphemeralSession(key string) error { +func (s *Consul) createEphemeralSession() error { s.Lock() defer s.Unlock() - // Recover existing global session - pair, _, err := s.client.KV().Get(key, nil) - if err != nil { - return err - } - if pair != nil && pair.Session != "" { - s.ephemeralSession = pair.Session - return nil - } - // Create new session if s.ephemeralSession == "" { entry := &api.SessionEntry{ @@ -127,6 +117,18 @@ func (s *Consul) createEphemeralSession(key string) error { return nil } +// checkActiveSession checks if the key already has a session attached +func (s *Consul) checkActiveSession(key string) (string, error) { + pair, _, err := s.client.KV().Get(key, nil) + if err != nil { + return "", err + } + if pair != nil && pair.Session != "" { + return pair.Session, nil + } + return "", nil +} + // Normalize the key for usage in Consul func (s *Consul) normalize(key string) string { key = normalize(key) @@ -161,20 +163,32 @@ func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { } if opts != nil && opts.Ephemeral { - // Creates the global ephemeral session - // if it does not exist + // Check if there is any previous session with an active TTL + previous, err := s.checkActiveSession(key) + if err != nil { + return err + } + + // Create the global ephemeral session if it does not exist yet if s.ephemeralSession == "" { - err := s.createEphemeralSession(key) - if err != nil { + if err = s.createEphemeralSession(); err != nil { return err } } + // If a previous session is still active for that key, use it + // else we use the global ephemeral session + if previous != "" { + p.Session = previous + } else { + p.Session = s.ephemeralSession + } + // Create lock option with the // EphemeralSession lockOpts := &api.LockOptions{ Key: key, - Session: s.ephemeralSession, + Session: p.Session, } // Lock and ignore if lock is held @@ -185,11 +199,8 @@ func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { lock.Lock(nil) } - // Place the session on key - p.Session = s.ephemeralSession - // Renew the session - _, _, err := s.client.Session().Renew(p.Session, nil) + _, _, err = s.client.Session().Renew(p.Session, nil) if err != nil { s.ephemeralSession = "" return err