Merge branch 'feature/pubsub-batching' of github.com:dapr/components-contrib into ps_batch_c_kafka

Commit d31f4d99f5 by Deepanshu Agarwal, 2022-09-16 11:36:23 +05:30
11 changed files with 309 additions and 63 deletions

.gitignore

@@ -4,4 +4,5 @@
vendor
.dccache
go.work
go.work.sum
go.work.sum
.DS_Store


@@ -17,6 +17,8 @@ import (
	"errors"
	"github.com/Shopify/sarama"
	"github.com/dapr/components-contrib/pubsub"
)

func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) {
@@ -74,3 +76,39 @@ func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) e
	return nil
}
func (k *Kafka) BulkPublish(topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) {
	if k.producer == nil {
		err := errors.New("component is closed")
		return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err
	}
	k.logger.Debugf("Bulk Publishing on topic %v", topic)

	msgs := []*sarama.ProducerMessage{}
	for _, entry := range entries {
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(entry.Event),
		}
		for name, value := range metadata {
			// `key` is the kafka package's partition-key metadata constant:
			// that entry sets the message key, every other entry becomes a header.
			if name == key {
				msg.Key = sarama.StringEncoder(value)
			} else {
				if msg.Headers == nil {
					msg.Headers = make([]sarama.RecordHeader, 0, len(metadata))
				}
				msg.Headers = append(msg.Headers, sarama.RecordHeader{
					Key:   []byte(name),
					Value: []byte(value),
				})
			}
		}
		msgs = append(msgs, msg)
	}

	if err := k.producer.SendMessages(msgs); err != nil {
		return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err
	}

	return pubsub.NewBulkPublishResponse(entries, pubsub.PublishSucceeded, nil), nil
}
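
For context, a minimal caller-side sketch of the new method, assuming an initialized *Kafka instance `k` and the usual imports; the "partitionKey" metadata name is an assumption based on the package's `key` constant, and the mapping mirrors the loop above (partition key becomes the Sarama message key, all other metadata entries become record headers on every message):

entries := []pubsub.BulkMessageEntry{
	{EntryID: "1", Event: []byte(`{"orderId":1}`), ContentType: "application/json"},
	{EntryID: "2", Event: []byte(`{"orderId":2}`), ContentType: "application/json"},
}
// "partitionKey" is routed to msg.Key; "traceID" lands in msg.Headers.
metadata := map[string]string{
	"partitionKey": "order-group-1",
	"traceID":      "abc123",
}
res, err := k.BulkPublish("orders", entries, metadata)
if err != nil {
	// On failure, every entry in res.Statuses carries PublishFailed and err.
	log.Printf("bulk publish failed: %v", err)
}
_ = res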


@@ -120,6 +120,11 @@ func (p *PubSub) Publish(req *pubsub.PublishRequest) error {
	return p.kafka.Publish(req.Topic, req.Data, req.Metadata)
}

// BulkPublish publishes a batch of messages to the Kafka cluster.
func (p *PubSub) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
	return p.kafka.BulkPublish(req.Topic, req.Entries, req.Metadata)
}

func (p *PubSub) Close() (err error) {
	p.subscribeCancel()
	return p.kafka.Close()
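
A short usage sketch of this wrapper: the context parameter satisfies the BulkPublisher interface but, as the body shows, is not yet forwarded to the underlying Kafka client. The names below (`p`, the topic, the timeout) are illustrative only:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := p.BulkPublish(ctx, &pubsub.BulkPublishRequest{
	Topic:    "orders",
	Entries:  entries, // []pubsub.BulkMessageEntry, as in the producer example above
	Metadata: map[string]string{},
})
_, _ = res, err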


@@ -34,7 +34,7 @@ type PubSub interface {

// BulkPublish publishes a collection of entries/messages in a BulkPublishRequest to a
// message bus topic and returns a BulkPublishResponse with individual statuses for each message.
type BulkPublisher interface {
	BulkPublish(req *BulkPublishRequest) (BulkPublishResponse, error)
	BulkPublish(ctx context.Context, req *BulkPublishRequest) (BulkPublishResponse, error)
}

// BulkSubscriber is the interface defining BulkSubscribe for message buses.
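
To make the contract concrete, a minimal sketch of a type satisfying BulkPublisher, assuming the surrounding pubsub package types and a context import; `memBus` is hypothetical and exists only to illustrate the all-or-nothing response mapping that NewBulkPublishResponse (below) supports:

// memBus is a hypothetical in-memory component used only to illustrate the interface.
type memBus struct {
	topics map[string][][]byte
}

func (m *memBus) BulkPublish(ctx context.Context, req *BulkPublishRequest) (BulkPublishResponse, error) {
	for _, entry := range req.Entries {
		// Honor cancellation between entries.
		if err := ctx.Err(); err != nil {
			return NewBulkPublishResponse(req.Entries, PublishFailed, err), err
		}
		m.topics[req.Topic] = append(m.topics[req.Topic], entry.Event)
	}
	return NewBulkPublishResponse(req.Entries, PublishSucceeded, nil), nil
}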


@@ -53,7 +53,7 @@ type BulkMessage struct {

// BulkMessageEntry represents a single message inside a bulk request.
type BulkMessageEntry struct {
	EntryID     string            `json:entryID`
	EntryID     string            `json:"entryID"`
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
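
The EntryID tag change above is a real fix, not cosmetics: without quotes, `json:entryID` is not a well-formed struct tag, so encoding/json ignores it and serializes the field under its Go name "EntryID". A quick sketch of the corrected behavior:

entry := BulkMessageEntry{EntryID: "1", Event: []byte("hello"), ContentType: "text/plain"}
b, _ := json.Marshal(entry)
// With the quoted tag, the field now marshals as "entryID";
// []byte values are base64-encoded by encoding/json.
fmt.Println(string(b)) // {"entryID":"1","event":"aGVsbG8=","contentType":"text/plain","metadata":null}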


@@ -74,3 +74,20 @@ type BulkSubscribeResponse struct {
	Error    error                        `json:"error"`
	Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}

// NewBulkPublishResponse returns a BulkPublishResponse in which every entry has the same status and error.
// It is a helper for mapping a single error/success result of a BulkPublish call onto all of its events.
func NewBulkPublishResponse(messages []BulkMessageEntry, status BulkPublishStatus, err error) BulkPublishResponse {
	response := BulkPublishResponse{}
	response.Statuses = make([]BulkPublishResponseEntry, len(messages))
	for i, msg := range messages {
		st := BulkPublishResponseEntry{}
		st.EntryID = msg.EntryID
		st.Status = status
		if err != nil {
			st.Error = err
		}
		response.Statuses[i] = st
	}
	return response
}

pubsub/responses_test.go (new file)

@@ -0,0 +1,79 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pubsub

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestNewBulkPublishResponse(t *testing.T) {
	messages := []BulkMessageEntry{
		{
			EntryID: "1",
			Event:   []byte("event 1"),
			Metadata: map[string]string{
				"ttlInSeconds": "22",
			},
			ContentType: "text/plain",
		},
		{
			EntryID: "2",
			Event:   []byte("event 2"),
			Metadata: map[string]string{
				"ttlInSeconds": "11",
			},
			ContentType: "text/plain",
		},
	}
	t.Run("populate success", func(t *testing.T) {
		res := NewBulkPublishResponse(messages, PublishSucceeded, nil)
		assert.NotEmpty(t, res, "expected res to be populated")
		assert.Equal(t, 2, len(res.Statuses), "expected two statuses")
		expectedRes := BulkPublishResponse{
			Statuses: []BulkPublishResponseEntry{
				{
					EntryID: "1",
					Status:  PublishSucceeded,
				},
				{
					EntryID: "2",
					Status:  PublishSucceeded,
				},
			},
		}
		assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match")
	})
	t.Run("populate failure", func(t *testing.T) {
		res := NewBulkPublishResponse(messages, PublishFailed, assert.AnError)
		assert.NotEmpty(t, res, "expected res to be populated")
		assert.Equal(t, 2, len(res.Statuses), "expected two statuses")
		expectedRes := BulkPublishResponse{
			Statuses: []BulkPublishResponseEntry{
				{
					EntryID: "1",
					Status:  PublishFailed,
					Error:   assert.AnError,
				},
				{
					EntryID: "2",
					Status:  PublishFailed,
					Error:   assert.AnError,
				},
			},
		}
		assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match")
	})
}


@@ -41,10 +41,12 @@ type parameterStoreMetaData struct {
	AccessKey    string `json:"accessKey"`
	SecretKey    string `json:"secretKey"`
	SessionToken string `json:"sessionToken"`
	Prefix       string `json:"prefix"`
}

type ssmSecretStore struct {
	client ssmiface.SSMAPI
	prefix string
	logger logger.Logger
}

@@ -60,6 +62,7 @@ func (s *ssmSecretStore) Init(metadata secretstores.Metadata) error {
		return err
	}
	s.client = client
	s.prefix = meta.Prefix

	return nil
}

@@ -75,7 +78,7 @@ func (s *ssmSecretStore) GetSecret(req secretstores.GetSecretRequest) (secretsto
	}

	output, err := s.client.GetParameter(&ssm.GetParameterInput{
		Name:           aws.String(name),
		Name:           aws.String(s.prefix + name),
		WithDecryption: aws.Bool(true),
	})
	if err != nil {

@@ -86,7 +89,8 @@ func (s *ssmSecretStore) GetSecret(req secretstores.GetSecretRequest) (secretsto
		Data: map[string]string{},
	}

	if output.Parameter.Name != nil && output.Parameter.Value != nil {
		resp.Data[*output.Parameter.Name] = *output.Parameter.Value
		// Strip the configured prefix so callers see the logical secret name.
		secretName := (*output.Parameter.Name)[len(s.prefix):]
		resp.Data[secretName] = *output.Parameter.Value
	}

	return resp, nil

@@ -101,10 +105,22 @@ func (s *ssmSecretStore) BulkGetSecret(req secretstores.BulkGetSecretRequest) (s
	search := true
	var nextToken *string = nil
	var filters []*ssm.ParameterStringFilter
	if s.prefix != "" {
		filters = []*ssm.ParameterStringFilter{
			{
				Key:    aws.String(ssm.ParametersFilterKeyName),
				Option: aws.String("BeginsWith"),
				Values: aws.StringSlice([]string{s.prefix}),
			},
		}
	}

	for search {
		output, err := s.client.DescribeParameters(&ssm.DescribeParametersInput{
			MaxResults: nil,
			NextToken:  nextToken,
			MaxResults:       nil,
			NextToken:        nextToken,
			ParameterFilters: filters,
		})
		if err != nil {
			return secretstores.BulkGetSecretResponse{Data: nil}, fmt.Errorf("couldn't list secrets: %s", err)

@@ -120,7 +136,8 @@ func (s *ssmSecretStore) BulkGetSecret(req secretstores.BulkGetSecretRequest) (s
		}

		if entry.Name != nil && params.Parameter.Value != nil {
			resp.Data[*entry.Name] = map[string]string{*entry.Name: *params.Parameter.Value}
			secretName := (*entry.Name)[len(s.prefix):]
			resp.Data[secretName] = map[string]string{secretName: *params.Parameter.Value}
		}
	}
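
One observation on the prefix handling above: slicing with len(s.prefix) assumes every returned parameter name actually begins with the prefix, which holds here because BulkGetSecret filters with BeginsWith and GetSecret prepends the prefix to the request. A defensive alternative is the standard library's strings.TrimPrefix; the helper below is a hypothetical sketch of that variant, not part of the change:

// trimSecretPrefix maps a fully qualified SSM parameter name back to the
// logical secret name, tolerating names that lack the expected prefix.
func trimSecretPrefix(prefix, parameterName string) string {
	return strings.TrimPrefix(parameterName, prefix)
}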


@@ -117,6 +117,33 @@ func TestGetSecret(t *testing.T) {
			assert.Nil(t, e)
			assert.Equal(t, secretValue, output.Data[req.Name])
		})

		t.Run("with prefix", func(t *testing.T) {
			s := ssmSecretStore{
				client: &mockedSSM{
					GetParameterFn: func(input *ssm.GetParameterInput) (*ssm.GetParameterOutput, error) {
						assert.Equal(t, "/prefix/aws/dev/secret", *input.Name)
						secret := secretValue
						return &ssm.GetParameterOutput{
							Parameter: &ssm.Parameter{
								Name:  input.Name,
								Value: &secret,
							},
						}, nil
					},
				},
				prefix: "/prefix",
			}

			req := secretstores.GetSecretRequest{
				Name:     "/aws/dev/secret",
				Metadata: map[string]string{},
			}
			output, e := s.GetSecret(req)
			assert.Nil(t, e)
			assert.Equal(t, "secret", output.Data[req.Name])
		})
	})

	t.Run("unsuccessfully retrieve secret", func(t *testing.T) {

@@ -172,6 +199,42 @@ func TestGetBulkSecrets(t *testing.T) {
		assert.Contains(t, output.Data, "/aws/dev/secret2")
	})

	t.Run("successfully retrieve bulk secrets with prefix", func(t *testing.T) {
		s := ssmSecretStore{
			client: &mockedSSM{
				DescribeParametersFn: func(*ssm.DescribeParametersInput) (*ssm.DescribeParametersOutput, error) {
					return &ssm.DescribeParametersOutput{NextToken: nil, Parameters: []*ssm.ParameterMetadata{
						{
							Name: aws.String("/prefix/aws/dev/secret1"),
						},
						{
							Name: aws.String("/prefix/aws/dev/secret2"),
						},
					}}, nil
				},
				GetParameterFn: func(input *ssm.GetParameterInput) (*ssm.GetParameterOutput, error) {
					secret := fmt.Sprintf("%s-%s", *input.Name, secretValue)
					return &ssm.GetParameterOutput{
						Parameter: &ssm.Parameter{
							Name:  input.Name,
							Value: &secret,
						},
					}, nil
				},
			},
			prefix: "/prefix",
		}

		req := secretstores.BulkGetSecretRequest{
			Metadata: map[string]string{},
		}
		output, e := s.BulkGetSecret(req)
		assert.Nil(t, e)
		assert.Equal(t, "map[/aws/dev/secret1:/prefix/aws/dev/secret1-secret]", fmt.Sprint(output.Data["/aws/dev/secret1"]))
		assert.Equal(t, "map[/aws/dev/secret2:/prefix/aws/dev/secret2-secret]", fmt.Sprint(output.Data["/aws/dev/secret2"]))
	})

	t.Run("unsuccessfully retrieve bulk secrets on get parameter", func(t *testing.T) {
		s := ssmSecretStore{
			client: &mockedSSM{


@@ -36,10 +36,15 @@ type Pagination struct {
	Token string `json:"token,omitempty"`
}

type Query struct {
// QueryFields is used only for the intermediate query value.
type QueryFields struct {
	Filters map[string]interface{} `json:"filter"`
	Sort    []Sorting              `json:"sort"`
	Page    Pagination             `json:"page"`
}

type Query struct {
	QueryFields `json:",inline"`
	// derived from Filters
	Filter Filter

@@ -96,45 +101,19 @@ func (h *Builder) buildFilter(filter Filter) (string, error) {
}

func (q *Query) UnmarshalJSON(data []byte) error {
	var m map[string]interface{}
	err := json.Unmarshal(data, &m)
	err := json.Unmarshal(data, &q.QueryFields)
	if err != nil {
		return err
	}
	if elem, ok := m[FILTER]; ok {
		q.Filter, err = parseFilter(elem)
		if err != nil {
			return err
		}
	}
	// setting sorting
	if elem, ok := m[SORT]; ok {
		arr, ok := elem.([]interface{})
		if !ok {
			return fmt.Errorf("%q must be an array", SORT)
		}
		jdata, err := json.Marshal(arr)
		if err != nil {
			return err
		}
		if err = json.Unmarshal(jdata, &q.Sort); err != nil {
			return err
		}
	}
	// setting pagination
	if elem, ok := m[PAGE]; ok {
		page, ok := elem.(map[string]interface{})
		if !ok {
			return fmt.Errorf("%q must be a map", PAGE)
		}
		jdata, err := json.Marshal(page)
		if err != nil {
			return err
		}
		if err = json.Unmarshal(jdata, &q.Page); err != nil {
			return err
		}
	if len(q.QueryFields.Filters) == 0 {
		return nil
	}
	filter, err := parseFilter(q.QueryFields.Filters)
	if err != nil {
		return err
	}
	q.Filter = filter

	return nil
}
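
The refactor leans on a standard Go trick: Query keeps its custom UnmarshalJSON, but decodes the raw fields into the embedded QueryFields, which has no custom unmarshaler, so the call cannot recurse back into Query.UnmarshalJSON; the derived Filter is then built from the decoded Filters map. A self-contained sketch of the pattern, with hypothetical types:

package main

import (
	"encoding/json"
	"fmt"
)

// rawConfig holds the wire-format fields and uses default struct decoding.
type rawConfig struct {
	Name string `json:"name"`
}

// Config embeds rawConfig and adds a field derived after decoding.
type Config struct {
	rawConfig
	Greeting string
}

func (c *Config) UnmarshalJSON(data []byte) error {
	// Decoding into the embedded struct, not into *Config,
	// avoids re-entering this method.
	if err := json.Unmarshal(data, &c.rawConfig); err != nil {
		return err
	}
	c.Greeting = "hello, " + c.Name
	return nil
}

func main() {
	var c Config
	if err := json.Unmarshal([]byte(`{"name":"dapr"}`), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Greeting) // hello, dapr
}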


@@ -29,34 +29,57 @@ func TestQuery(t *testing.T) {
		{
			input: "../../tests/state/query/q1.json",
			query: Query{
				Filters: nil,
				Sort:    nil,
				Page:    Pagination{Limit: 2, Token: ""},
				Filter:  nil,
				QueryFields: QueryFields{
					Filters: nil,
					Sort:    nil,
					Page:    Pagination{Limit: 2, Token: ""},
				},
				Filter: nil,
			},
		},
		{
			input: "../../tests/state/query/q2.json",
			query: Query{
				Filters: nil,
				Sort:    nil,
				Page:    Pagination{Limit: 2, Token: ""},
				Filter:  &EQ{Key: "state", Val: "CA"},
				QueryFields: QueryFields{
					Filters: map[string]any{
						"EQ": map[string]any{
							"state": "CA",
						},
					},
					Sort: nil,
					Page: Pagination{Limit: 2, Token: ""},
				},
				Filter: &EQ{Key: "state", Val: "CA"},
			},
		},
		{
			input: "../../tests/state/query/q3.json",
			query: Query{
				Filters: nil,
				Sort: []Sorting{
					{Key: "state", Order: "DESC"},
					{Key: "person.name", Order: ""},
				QueryFields: QueryFields{
					Filters: map[string]any{
						"AND": []any{
							map[string]any{
								"EQ": map[string]any{
									"person.org": "A",
								},
							},
							map[string]any{
								"IN": map[string]any{
									"state": []any{"CA", "WA"},
								},
							},
						},
					},
					Sort: []Sorting{
						{Key: "state", Order: "DESC"},
						{Key: "person.name", Order: ""},
					},
					Page: Pagination{Limit: 0, Token: ""},
				},
				Page: Pagination{Limit: 0, Token: ""},
				Filter: &AND{
					Filters: []Filter{
						&EQ{Key: "person.org", Val: "A"},
						&IN{Key: "state", Vals: []interface{}{"CA", "WA"}},
						&IN{Key: "state", Vals: []any{"CA", "WA"}},
					},
				},
			},

@@ -64,12 +87,36 @@
		{
			input: "../../tests/state/query/q4.json",
			query: Query{
				Filters: nil,
				Sort: []Sorting{
					{Key: "state", Order: "DESC"},
					{Key: "person.name", Order: ""},
				QueryFields: QueryFields{
					Filters: map[string]any{
						"OR": []any{
							map[string]any{
								"EQ": map[string]any{
									"person.org": "A",
								},
							},
							map[string]any{
								"AND": []any{
									map[string]any{
										"EQ": map[string]any{
											"person.org": "B",
										},
									},
									map[string]any{
										"IN": map[string]any{
											"state": []any{"CA", "WA"},
										},
									},
								},
							},
						},
					},
					Sort: []Sorting{
						{Key: "state", Order: "DESC"},
						{Key: "person.name", Order: ""},
					},
					Page: Pagination{Limit: 2, Token: ""},
				},
				Page: Pagination{Limit: 2, Token: ""},
				Filter: &OR{
					Filters: []Filter{
						&EQ{Key: "person.org", Val: "A"},