Add support for bulk publish (#384)

* Initial implementation

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Add a response type and more tests

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Change logic and add more tests

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Add example

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Add docs

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Fix lint

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* gofumpt

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

---------

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>
This commit is contained in:
Shubham Sharma 2023-03-31 12:27:57 +05:30 committed by GitHub
parent 0cc2c4ce7a
commit b48e8ade5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 545 additions and 11 deletions

View File

@ -84,6 +84,11 @@ type Client interface {
// Deprecated: This method is deprecated and will be removed in a future version of the SDK. Please use `PublishEvent` instead.
PublishEventfromCustomContent(ctx context.Context, pubsubName, topicName string, data interface{}) error
// PublishEvents publishes multiple events onto topic in specific pubsub component.
// If all events are successfully published, response Error will be nil.
// The FailedEvents field will contain all events that failed to publish.
PublishEvents(ctx context.Context, pubsubName, topicName string, events []interface{}, opts ...PublishEventsOption) PublishEventsResponse
// GetSecret retrieves preconfigured secret from specified store using key.
GetSecret(ctx context.Context, storeName, key string, meta map[string]string) (data map[string]string, err error)

View File

@ -14,6 +14,7 @@ limitations under the License.
package client
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -341,6 +342,26 @@ func (s *testDaprServer) PublishEvent(ctx context.Context, req *pb.PublishEventR
return &empty.Empty{}, nil
}
// BulkPublishEventAlpha1 mocks the BulkPublishEventAlpha1 API.
// It will fail to publish events that start with "fail".
// It will fail the entire request if an event starts with "failall".
func (s *testDaprServer) BulkPublishEventAlpha1(ctx context.Context, req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) {
failedEntries := make([]*pb.BulkPublishResponseFailedEntry, 0)
for _, entry := range req.Entries {
if bytes.HasPrefix(entry.Event, []byte("failall")) {
// fail the entire request
return nil, errors.New("failed to publish events")
} else if bytes.HasPrefix(entry.Event, []byte("fail")) {
// fail this entry
failedEntries = append(failedEntries, &pb.BulkPublishResponseFailedEntry{
EntryId: entry.EntryId,
Error: "failed to publish events",
})
}
}
return &pb.BulkPublishResponse{FailedEntries: failedEntries}, nil
}
func (s *testDaprServer) InvokeBinding(ctx context.Context, req *pb.InvokeBindingRequest) (*pb.InvokeBindingResponse, error) {
if req.Data == nil {
return &pb.InvokeBindingResponse{

View File

@ -20,6 +20,8 @@ import (
"fmt"
"log"
"github.com/google/uuid"
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
)
@ -110,3 +112,153 @@ func (c *GRPCClient) PublishEventfromCustomContent(ctx context.Context, pubsubNa
return c.PublishEvent(ctx, pubsubName, topicName, enc, PublishEventWithContentType("application/json"))
}
// PublishEventsEvent is a type of event that can be published using PublishEvents.
type PublishEventsEvent struct {
EntryID string
Data []byte
ContentType string
Metadata map[string]string
}
// PublishEventsResponse is the response type for PublishEvents.
type PublishEventsResponse struct {
Error error
FailedEvents []interface{}
}
// PublishEventsOption is the type for the functional option.
type PublishEventsOption func(*pb.BulkPublishRequest)
// PublishEvents publishes multiple events onto topic in specific pubsub component.
// If all events are successfully published, response Error will be nil.
// The FailedEvents field will contain all events that failed to publish.
func (c *GRPCClient) PublishEvents(ctx context.Context, pubsubName, topicName string, events []interface{}, opts ...PublishEventsOption) PublishEventsResponse {
if pubsubName == "" {
return PublishEventsResponse{
Error: errors.New("pubsubName name required"),
FailedEvents: events,
}
}
if topicName == "" {
return PublishEventsResponse{
Error: errors.New("topic name required"),
FailedEvents: events,
}
}
failedEvents := make([]interface{}, 0, len(events))
eventMap := make(map[string]interface{}, len(events))
entries := make([]*pb.BulkPublishRequestEntry, 0, len(events))
for _, event := range events {
entry, err := createBulkPublishRequestEntry(event)
if err != nil {
failedEvents = append(failedEvents, event)
continue
}
eventMap[entry.EntryId] = event
entries = append(entries, entry)
}
request := &pb.BulkPublishRequest{
PubsubName: pubsubName,
Topic: topicName,
Entries: entries,
}
for _, o := range opts {
o(request)
}
res, err := c.protoClient.BulkPublishEventAlpha1(c.withAuthToken(ctx), request)
// If there is an error, all events failed to publish.
if err != nil {
return PublishEventsResponse{
Error: fmt.Errorf("error publishing events unto %s topic: %w", topicName, err),
FailedEvents: events,
}
}
for _, failedEntry := range res.FailedEntries {
event, ok := eventMap[failedEntry.EntryId]
if !ok {
// This should never happen.
failedEvents = append(failedEvents, failedEntry.EntryId)
}
failedEvents = append(failedEvents, event)
}
if len(failedEvents) != 0 {
return PublishEventsResponse{
Error: fmt.Errorf("error publishing events unto %s topic: %w", topicName, err),
FailedEvents: failedEvents,
}
}
return PublishEventsResponse{
Error: nil,
FailedEvents: make([]interface{}, 0),
}
}
// createBulkPublishRequestEntry creates a BulkPublishRequestEntry from an interface{}.
func createBulkPublishRequestEntry(data interface{}) (*pb.BulkPublishRequestEntry, error) {
entry := &pb.BulkPublishRequestEntry{}
switch d := data.(type) {
case PublishEventsEvent:
entry.EntryId = d.EntryID
entry.Event = d.Data
entry.ContentType = d.ContentType
entry.Metadata = d.Metadata
case []byte:
entry.Event = d
entry.ContentType = "application/octet-stream"
case string:
entry.Event = []byte(d)
entry.ContentType = "text/plain"
default:
var err error
entry.ContentType = "application/json"
entry.Event, err = json.Marshal(d)
if err != nil {
return &pb.BulkPublishRequestEntry{}, fmt.Errorf("error serializing input struct: %w", err)
}
if isCloudEvent(entry.Event) {
entry.ContentType = "application/cloudevents+json"
}
}
if entry.EntryId == "" {
entry.EntryId = uuid.New().String()
}
return entry, nil
}
// PublishEventsWithContentType can be passed as option to PublishEvents to explicitly set the same Content-Type for all events.
func PublishEventsWithContentType(contentType string) PublishEventsOption {
return func(r *pb.BulkPublishRequest) {
for _, entry := range r.Entries {
entry.ContentType = contentType
}
}
}
// PublishEventsWithMetadata can be passed as option to PublishEvents to set request metadata.
func PublishEventsWithMetadata(metadata map[string]string) PublishEventsOption {
return func(r *pb.BulkPublishRequest) {
r.Metadata = metadata
}
}
// PublishEventsWithRawPayload can be passed as option to PublishEvents to set rawPayload request metadata.
func PublishEventsWithRawPayload() PublishEventsOption {
return func(r *pb.BulkPublishRequest) {
if r.Metadata == nil {
r.Metadata = map[string]string{rawPayload: trueValue}
} else {
r.Metadata[rawPayload] = trueValue
}
}
}

View File

@ -17,6 +17,7 @@ import (
"context"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
@ -90,3 +91,240 @@ func TestPublishEvent(t *testing.T) {
assert.Nil(t, err)
})
}
// go test -timeout 30s ./client -count 1 -run ^TestPublishEvents$
func TestPublishEvents(t *testing.T) {
ctx := context.Background()
t.Run("without pubsub name", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "", "test", []interface{}{"ping", "pong"})
assert.Error(t, res.Error)
assert.Len(t, res.FailedEvents, 2)
assert.Contains(t, res.FailedEvents, "ping")
assert.Contains(t, res.FailedEvents, "pong")
})
t.Run("without topic name", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "", []interface{}{"ping", "pong"})
assert.Error(t, res.Error)
assert.Len(t, res.FailedEvents, 2)
assert.Contains(t, res.FailedEvents, "ping")
assert.Contains(t, res.FailedEvents, "pong")
})
t.Run("with data", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong"})
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
t.Run("without data", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", nil)
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
t.Run("with struct data", func(t *testing.T) {
testcases := []struct {
name string
data interface{}
}{
{
name: "with text",
data: _testStructwithText{
Key1: "value1",
Key2: "value2",
},
},
{
name: "with text and numbers",
data: _testStructwithTextandNumbers{
Key1: "value1",
Key2: 2500,
},
},
{
name: "with slices",
data: _testStructwithSlices{
Key1: []string{"value1", "value2", "value3"},
Key2: []int{25, 40, 600},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{tc.data})
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
}
})
t.Run("error serializing one event", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{make(chan struct{}), "pong"})
assert.Error(t, res.Error)
assert.Len(t, res.FailedEvents, 1)
assert.IsType(t, make(chan struct{}), res.FailedEvents[0])
})
t.Run("with raw payload", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong"}, PublishEventsWithRawPayload())
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
t.Run("with metadata", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong"}, PublishEventsWithMetadata(map[string]string{"key": "value"}))
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
t.Run("with custom content type", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong"}, PublishEventsWithContentType("text/plain"))
assert.Nil(t, res.Error)
assert.Len(t, res.FailedEvents, 0)
})
t.Run("with events that will fail some events", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong", "fail-ping"})
assert.Error(t, res.Error)
assert.Len(t, res.FailedEvents, 1)
assert.Contains(t, res.FailedEvents, "fail-ping")
})
t.Run("with events that will fail the entire request", func(t *testing.T) {
res := testClient.PublishEvents(ctx, "messages", "test", []interface{}{"ping", "pong", "failall-ping"})
assert.Error(t, res.Error)
assert.Len(t, res.FailedEvents, 3)
assert.Contains(t, res.FailedEvents, "ping")
assert.Contains(t, res.FailedEvents, "pong")
assert.Contains(t, res.FailedEvents, "failall-ping")
})
}
func TestCreateBulkPublishRequestEntry(t *testing.T) {
type _testJSONStruct struct {
Key1 string `json:"key1"`
Key2 string `json:"key2"`
}
type _testCloudEventStruct struct {
ID string `json:"id"`
Source string `json:"source"`
SpecVersion string `json:"specversion"`
Type string `json:"type"`
Data string `json:"data"`
}
t.Run("should serialize and set content type", func(t *testing.T) {
testcases := []struct {
name string
data interface{}
expectedEvent []byte
expectedContentType string
expectedError bool
}{
{
name: "plain text",
data: "ping",
expectedEvent: []byte(`ping`),
expectedContentType: "text/plain",
expectedError: false,
},
{
name: "raw bytes",
data: []byte("ping"),
expectedEvent: []byte(`ping`),
expectedContentType: "application/octet-stream",
expectedError: false,
},
{
name: "valid json",
data: _testJSONStruct{
Key1: "value1",
Key2: "value2",
},
expectedEvent: []byte(`{"key1":"value1","key2":"value2"}`),
expectedContentType: "application/json",
expectedError: false,
},
{
name: "valid cloudevent",
data: _testCloudEventStruct{
ID: "123",
Source: "test",
SpecVersion: "1.0",
Type: "test",
Data: "foo",
},
expectedEvent: []byte(`{"id":"123","source":"test","specversion":"1.0","type":"test","data":"foo"}`),
expectedContentType: "application/cloudevents+json",
expectedError: false,
},
{
name: "invalid json",
data: make(chan struct{}),
expectedEvent: nil,
expectedContentType: "",
expectedError: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
entry, err := createBulkPublishRequestEntry(tc.data)
if tc.expectedError {
assert.Error(t, err)
} else {
assert.Nil(t, err)
assert.Equal(t, tc.expectedEvent, entry.Event)
assert.Equal(t, tc.expectedContentType, entry.ContentType)
}
})
}
})
t.Run("should set same entryID and metadata when provided", func(t *testing.T) {
entry, err := createBulkPublishRequestEntry(PublishEventsEvent{
ContentType: "text/plain",
Data: []byte("ping"),
EntryID: "123",
Metadata: map[string]string{"key": "value"},
})
assert.Nil(t, err)
assert.Equal(t, "123", entry.EntryId)
assert.Equal(t, map[string]string{"key": "value"}, entry.Metadata)
})
t.Run("should set random uuid as entryID when not provided", func(t *testing.T) {
testcases := []struct {
name string
data interface{}
}{
{
name: "plain text",
data: "ping",
},
{
name: "PublishEventsEvent",
data: PublishEventsEvent{
ContentType: "text/plain",
Data: []byte("ping"),
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
entry, err := createBulkPublishRequestEntry(tc.data)
assert.Nil(t, err)
assert.NotEmpty(t, entry.EntryId)
assert.Nil(t, entry.Metadata)
_, err = uuid.Parse(entry.EntryId)
assert.Nil(t, err)
})
}
})
}

32
client/utils.go Normal file
View File

@ -0,0 +1,32 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import "encoding/json"
// isCloudEvent returns true if the event is a CloudEvent.
// An event is a CloudEvent if it `id`, `source`, `specversion` and `type` fields.
// See https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md for more details.
func isCloudEvent(event []byte) bool {
var ce struct {
ID string `json:"id"`
Source string `json:"source"`
SpecVersion string `json:"specversion"`
Type string `json:"type"`
}
if err := json.Unmarshal(event, &ce); err != nil {
return false
}
return ce.ID != "" && ce.Source != "" && ce.SpecVersion != "" && ce.Type != ""
}

64
client/utils_test.go Normal file
View File

@ -0,0 +1,64 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import "testing"
func TestIsCloudEvent(t *testing.T) {
testcases := []struct {
name string
event []byte
expected bool
}{
{
name: "empty event",
event: []byte{},
expected: false,
},
{
name: "event in invalid format",
event: []byte(`foo`),
expected: false,
},
{
name: "event in JSON format without cloudevent fields",
event: []byte(`{"foo":"bar"}`),
expected: false,
},
{
name: "event with id, source, specversion and type",
event: []byte(`{"id":"123","source":"source","specversion":"1.0","type":"type"}`),
expected: true,
},
{
name: "event with missing id",
event: []byte(`{"source":"source","specversion":"1.0","type":"type"}`),
expected: false,
},
{
name: "event with extra fields",
event: []byte(`{"id":"123","source":"source","specversion":"1.0","type":"type","foo":"bar"}`),
expected: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
actual := isCloudEvent(tc.event)
if actual != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, actual)
}
})
}
}

View File

@ -152,6 +152,16 @@ if err := client.PublishEvent(ctx, "component-name", "topic-name", data); err !=
}
```
To publish multiple messages at once, the `PublishEvents` method can be used:
```go
events := []string{"event1", "event2", "event3"}
res := client.PublishEvents(ctx, "component-name", "topic-name", events)
if res.Error != nil {
panic(res.Error)
}
```
- For a full list of state operations visit [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
### Output Bindings

View File

@ -75,5 +75,7 @@ dapr stop --app-id sub
## Result
```shell
== APP == 2020/08/23 13:21:58 event - PubsubName: messages, Topic: neworder, ID: 11acaa82-23c4-4244-8969-7360dae52e5d, Data: ping
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: cc13829c-af77-4303-a4d7-55cdc0b0fa7d, Data: multi-pong
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 0147f10a-d6c3-4b16-ad5a-6776956757dd, Data: multi-ping
```

View File

@ -8,8 +8,9 @@ replace github.com/dapr/go-sdk => ../../
require github.com/dapr/go-sdk v0.0.0-00010101000000-000000000000
require (
github.com/go-chi/chi/v5 v5.0.8 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect

View File

@ -413,6 +413,8 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0=
github.com/go-chi/chi/v5 v5.0.8/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@ -503,7 +505,6 @@ github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqE
github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMdcIDwU/6+DDoY=
github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=

View File

@ -29,7 +29,8 @@ var (
func main() {
ctx := context.Background()
data := []byte("ping")
publishEventData := []byte("ping")
publishEventsData := []interface{}{"multi-ping", "multi-pong"}
client, err := dapr.NewClient()
if err != nil {
@ -37,9 +38,16 @@ func main() {
}
defer client.Close()
if err := client.PublishEvent(ctx, pubsubName, topicName, data); err != nil {
// Publish a single event
if err := client.PublishEvent(ctx, pubsubName, topicName, publishEventData); err != nil {
panic(err)
}
// Publish multiple events
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {
panic(err)
}
fmt.Println("data published")
fmt.Println("Done (CTRL+C to Exit)")

View File

@ -23,12 +23,12 @@ import (
)
// Subscription to tell the dapr what topic to subscribe.
// - PubsubName: is the name of the component configured in the metadata of pubsub.yaml.
// - Topic: is the name of the topic to subscribe.
// - Route: tell dapr where to request the API to publish the message to the subscriber when get a message from topic.
// - Match: (Optional) The CEL expression to match on the CloudEvent to select this route.
// - Priority: (Optional) The priority order of the route when Match is specificed.
// If not specified, the matches are evaluated in the order in which they are added.
// - PubsubName: is the name of the component configured in the metadata of pubsub.yaml.
// - Topic: is the name of the topic to subscribe.
// - Route: tell dapr where to request the API to publish the message to the subscriber when get a message from topic.
// - Match: (Optional) The CEL expression to match on the CloudEvent to select this route.
// - Priority: (Optional) The priority order of the route when Match is specificed.
// If not specified, the matches are evaluated in the order in which they are added.
var defaultSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",