312 lines
6.8 KiB
Go
312 lines
6.8 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package zookeeper
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"path"
|
|
reflect "reflect"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-zookeeper/zk"
|
|
jsoniter "github.com/json-iterator/go"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/state"
|
|
"github.com/dapr/kit/logger"
|
|
kitmd "github.com/dapr/kit/metadata"
|
|
"github.com/dapr/kit/ptr"
|
|
)
|
|
|
|
const (
|
|
anyVersion = -1
|
|
defaultMaxBufferSize = 1024 * 1024
|
|
defaultMaxConnBufferSize = 1024 * 1024
|
|
)
|
|
|
|
var (
|
|
errMissingServers = errors.New("servers are required")
|
|
errInvalidSessionTimeout = errors.New("sessionTimeout is invalid")
|
|
)
|
|
|
|
type properties struct {
|
|
Servers string `json:"servers"`
|
|
SessionTimeout string `json:"sessionTimeout"`
|
|
MaxBufferSize int `json:"maxBufferSize"`
|
|
MaxConnBufferSize int `json:"maxConnBufferSize"`
|
|
KeyPrefixPath string `json:"keyPrefixPath"`
|
|
}
|
|
|
|
type config struct {
|
|
servers []string
|
|
sessionTimeout time.Duration
|
|
maxBufferSize int
|
|
maxConnBufferSize int
|
|
keyPrefixPath string
|
|
}
|
|
|
|
func newConfig(meta map[string]string) (c *config, err error) {
|
|
var props properties
|
|
errDecode := kitmd.DecodeMetadata(meta, &props)
|
|
if errDecode != nil {
|
|
return nil, errDecode
|
|
}
|
|
|
|
return props.parse()
|
|
}
|
|
|
|
func (props *properties) parse() (*config, error) {
|
|
if len(props.Servers) == 0 {
|
|
return nil, errMissingServers
|
|
}
|
|
|
|
sessionTimeout, err := time.ParseDuration(props.SessionTimeout)
|
|
if err != nil {
|
|
return nil, errInvalidSessionTimeout
|
|
}
|
|
|
|
maxBufferSize := defaultMaxBufferSize
|
|
if props.MaxBufferSize > 0 {
|
|
maxBufferSize = props.MaxBufferSize
|
|
}
|
|
|
|
maxConnBufferSize := defaultMaxConnBufferSize
|
|
if props.MaxConnBufferSize > 0 {
|
|
maxConnBufferSize = props.MaxConnBufferSize
|
|
}
|
|
|
|
return &config{
|
|
servers: strings.Split(props.Servers, ","),
|
|
sessionTimeout: sessionTimeout,
|
|
maxBufferSize: maxBufferSize,
|
|
maxConnBufferSize: maxConnBufferSize,
|
|
keyPrefixPath: props.KeyPrefixPath,
|
|
}, nil
|
|
}
|
|
|
|
type Conn interface {
|
|
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
|
|
|
|
Get(path string) ([]byte, *zk.Stat, error)
|
|
|
|
Set(path string, data []byte, version int32) (*zk.Stat, error)
|
|
|
|
Delete(path string, version int32) error
|
|
|
|
Multi(ops ...interface{}) ([]zk.MultiResponse, error)
|
|
}
|
|
|
|
//--- StateStore ---
|
|
|
|
// StateStore is a state store.
|
|
type StateStore struct {
|
|
state.BulkStore
|
|
|
|
*config
|
|
conn Conn
|
|
|
|
features []state.Feature
|
|
logger logger.Logger
|
|
}
|
|
|
|
// NewZookeeperStateStore returns a new Zookeeper state store.
|
|
func NewZookeeperStateStore(logger logger.Logger) state.Store {
|
|
s := &StateStore{
|
|
features: []state.Feature{state.FeatureETag},
|
|
logger: logger,
|
|
}
|
|
s.BulkStore = state.NewDefaultBulkStore(s)
|
|
return s
|
|
}
|
|
|
|
func (s *StateStore) Init(_ context.Context, metadata state.Metadata) (err error) {
|
|
var c *config
|
|
|
|
if c, err = newConfig(metadata.Properties); err != nil {
|
|
return
|
|
}
|
|
|
|
conn, _, err := zk.Connect(c.servers, c.sessionTimeout,
|
|
zk.WithMaxBufferSize(c.maxBufferSize), zk.WithMaxConnBufferSize(c.maxConnBufferSize))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
s.config = c
|
|
s.conn = conn
|
|
|
|
return
|
|
}
|
|
|
|
// Features returns the features available in this state store.
|
|
func (s *StateStore) Features() []state.Feature {
|
|
return s.features
|
|
}
|
|
|
|
// Get retrieves state from Zookeeper with a key.
|
|
func (s *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
|
|
value, stat, err := s.conn.Get(s.prefixedKey(req.Key))
|
|
if err != nil {
|
|
if errors.Is(err, zk.ErrNoNode) {
|
|
return &state.GetResponse{}, nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
return &state.GetResponse{
|
|
Data: value,
|
|
ETag: ptr.Of(strconv.Itoa(int(stat.Version))),
|
|
}, nil
|
|
}
|
|
|
|
// Delete performs a delete operation.
|
|
func (s *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
|
|
r, err := s.newDeleteRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.conn.Delete(r.Path, r.Version)
|
|
if errors.Is(err, zk.ErrNoNode) {
|
|
return nil
|
|
}
|
|
|
|
if err != nil {
|
|
if req.HasETag() {
|
|
return state.NewETagError(state.ETagMismatch, err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Set saves state into Zookeeper.
|
|
func (s *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
|
|
r, err := s.newSetDataRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = s.conn.Set(r.Path, r.Data, r.Version)
|
|
if errors.Is(err, zk.ErrNoNode) {
|
|
_, err = s.conn.Create(r.Path, r.Data, 0, nil)
|
|
}
|
|
|
|
if err != nil {
|
|
if req.HasETag() {
|
|
return state.NewETagError(state.ETagMismatch, err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *StateStore) newDeleteRequest(req *state.DeleteRequest) (*zk.DeleteRequest, error) {
|
|
err := state.CheckRequestOptions(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var version int32
|
|
|
|
if req.Options.Concurrency == state.LastWrite {
|
|
version = anyVersion
|
|
} else {
|
|
var etag string
|
|
|
|
if req.HasETag() {
|
|
etag = *req.ETag
|
|
}
|
|
version = s.parseETag(etag)
|
|
}
|
|
|
|
return &zk.DeleteRequest{
|
|
Path: s.prefixedKey(req.Key),
|
|
Version: version,
|
|
}, nil
|
|
}
|
|
|
|
func (s *StateStore) newSetDataRequest(req *state.SetRequest) (*zk.SetDataRequest, error) {
|
|
err := state.CheckRequestOptions(req.Options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
data, err := s.marshalData(req.Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var version int32
|
|
|
|
if req.Options.Concurrency == state.LastWrite {
|
|
version = anyVersion
|
|
} else {
|
|
var etag string
|
|
|
|
if req.HasETag() {
|
|
etag = *req.ETag
|
|
}
|
|
version = s.parseETag(etag)
|
|
}
|
|
|
|
return &zk.SetDataRequest{
|
|
Path: s.prefixedKey(req.Key),
|
|
Data: data,
|
|
Version: version,
|
|
}, nil
|
|
}
|
|
|
|
func (s *StateStore) prefixedKey(key string) string {
|
|
if s.config == nil {
|
|
return key
|
|
}
|
|
|
|
return path.Join(s.keyPrefixPath, key)
|
|
}
|
|
|
|
func (s *StateStore) parseETag(etag string) int32 {
|
|
if etag != "" {
|
|
// Since the version is taken to be int32
|
|
version, err := strconv.ParseInt(etag, 10, 32)
|
|
if err == nil {
|
|
return int32(version)
|
|
}
|
|
}
|
|
|
|
return anyVersion
|
|
}
|
|
|
|
func (s *StateStore) marshalData(v interface{}) ([]byte, error) {
|
|
if buf, ok := v.([]byte); ok {
|
|
return buf, nil
|
|
}
|
|
|
|
return jsoniter.ConfigFastest.Marshal(v)
|
|
}
|
|
|
|
func (s *StateStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
metadataStruct := properties{}
|
|
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType)
|
|
return
|
|
}
|