support metadata in state bulk get state (#114)

* update GetBulkState method: add metedata, change return struct to BulkStateItem

* generate code from dapr proto files; support metadata in get response

* update for code review
This commit is contained in:
Sky/敖小剑 2020-12-07 21:33:47 +08:00 committed by GitHub
parent 0ca0bbebc0
commit 32dbe98dc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 10 deletions

View File

@ -64,7 +64,7 @@ type Client interface {
GetStateWithConsistency(ctx context.Context, store, key string, meta map[string]string, sc StateConsistency) (item *StateItem, err error)
// GetBulkState retrieves state for multiple keys from specific store.
GetBulkState(ctx context.Context, store string, keys []string, parallelism int32) ([]*StateItem, error)
GetBulkState(ctx context.Context, store string, keys []string, meta map[string]string, parallelism int32) ([]*BulkStateItem, error)
// DeleteState deletes content from store using default state options.
DeleteState(ctx context.Context, store, key string) error

View File

@ -106,6 +106,15 @@ type StateItem struct {
Etag string
}
// StateItem represents a single state item.
type BulkStateItem struct {
Key string
Value []byte
Etag string
Metadata map[string]string
Error string
}
// SetStateItem represents a single state to be persisted.
type SetStateItem struct {
Key string
@ -215,18 +224,19 @@ func (c *GRPCClient) SaveBulkState(ctx context.Context, store string, items ...*
}
// GetBulkState retreaves state for multiple keys from specific store.
func (c *GRPCClient) GetBulkState(ctx context.Context, store string, keys []string, parallelism int32) ([]*StateItem, error) {
func (c *GRPCClient) GetBulkState(ctx context.Context, store string, keys []string, meta map[string]string, parallelism int32) ([]*BulkStateItem, error) {
if store == "" {
return nil, errors.New("nil store")
}
if len(keys) == 0 {
return nil, errors.New("keys required")
}
items := make([]*StateItem, 0)
items := make([]*BulkStateItem, 0)
req := &pb.GetBulkStateRequest{
StoreName: store,
Keys: keys,
Metadata: meta,
Parallelism: parallelism,
}
@ -240,10 +250,12 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, store string, keys []stri
}
for _, r := range results.Items {
item := &StateItem{
Key: r.Key,
Etag: r.Etag,
Value: r.Data,
item := &BulkStateItem{
Key: r.Key,
Etag: r.Etag,
Value: r.Data,
Metadata: r.Metadata,
Error: r.Error,
}
items = append(items, item)
}

View File

@ -185,7 +185,7 @@ func TestStateTransactions(t *testing.T) {
})
t.Run("exec upserts", func(t *testing.T) {
items, err := testClient.GetBulkState(ctx, store, keys, 10)
items, err := testClient.GetBulkState(ctx, store, keys, nil, 10)
assert.Nil(t, err)
assert.NotNil(t, items)
assert.Len(t, items, len(keys))
@ -207,7 +207,7 @@ func TestStateTransactions(t *testing.T) {
})
t.Run("get and validate inserts", func(t *testing.T) {
items, err := testClient.GetBulkState(ctx, store, keys, 10)
items, err := testClient.GetBulkState(ctx, store, keys, nil, 10)
assert.Nil(t, err)
assert.NotNil(t, items)
assert.Len(t, items, len(keys))
@ -224,7 +224,7 @@ func TestStateTransactions(t *testing.T) {
})
t.Run("ensure deletes", func(t *testing.T) {
items, err := testClient.GetBulkState(ctx, store, keys, 3)
items, err := testClient.GetBulkState(ctx, store, keys, nil, 3)
assert.Nil(t, err)
assert.NotNil(t, items)
assert.Len(t, items, 0)