From 08d2538796ead45b281b7c5b7effc901730fc2fc Mon Sep 17 00:00:00 2001 From: Leon Mai Date: Fri, 14 Aug 2020 11:05:51 -0700 Subject: [PATCH] Corresponding changes for multi pubsub (#434) * Corresponding changes for multi pubsub * Fix quote --- pubsub/envelope.go | 6 +++++- pubsub/envelope_test.go | 17 +++++++++-------- pubsub/requests.go | 5 +++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pubsub/envelope.go b/pubsub/envelope.go index a817429bc..34a004c5b 100644 --- a/pubsub/envelope.go +++ b/pubsub/envelope.go @@ -32,10 +32,11 @@ type CloudEventsEnvelope struct { Data interface{} `json:"data"` Subject string `json:"subject"` Topic string `json:"topic"` + PubsubName string `json:"pubsubname"` } // NewCloudEventsEnvelope returns CloudEventsEnvelope from data or a new one when data content was not -func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, data []byte) *CloudEventsEnvelope { +func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string, data []byte) *CloudEventsEnvelope { // defaults if id == "" { id = uuid.New().String() @@ -63,6 +64,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, Type: eventType, Subject: subject, Topic: topic, + PubsubName: pubsubName, Data: string(data), } } @@ -79,6 +81,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, Type: getStrVal(m, "type"), Subject: getStrVal(m, "subject"), Topic: topic, + PubsubName: pubsubName, Data: m["data"], } // check if CE is valid @@ -97,6 +100,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, Type: eventType, Subject: subject, Topic: topic, + PubsubName: pubsubName, Data: j, } } diff --git a/pubsub/envelope_test.go b/pubsub/envelope_test.go index 9962cbcbb..f8c29e139 100644 --- a/pubsub/envelope_test.go +++ b/pubsub/envelope_test.go @@ -14,7 +14,7 @@ import ( ) func TestCreateCloudEventsEnvelope(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", "", "", nil) assert.NotNil(t, envelope) } @@ -32,11 +32,12 @@ func TestEnvelopeUsingExistingCloudEvents(t *testing.T) { "datacontenttype" : "text/xml", "data" : "" }` - envelope := NewCloudEventsEnvelope("a", "", "", "", "routed.topic", []byte(str)) + envelope := NewCloudEventsEnvelope("a", "", "", "", "routed.topic", "mypubsub", []byte(str)) assert.Equal(t, "A234-1234-1234", envelope.ID) assert.Equal(t, "text/xml", envelope.DataContentType) assert.Equal(t, "1.0", envelope.SpecVersion) assert.Equal(t, "routed.topic", envelope.Topic) + assert.Equal(t, "mypubsub", envelope.PubsubName) }) } @@ -50,7 +51,7 @@ func TestCreateFromJSON(t *testing.T) { 1, } data, _ := json.Marshal(obj1) - envelope := NewCloudEventsEnvelope("a", "source", "", "", "", data) + envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", data) t.Logf("data: %v", envelope.Data) assert.Equal(t, "application/json", envelope.DataContentType) @@ -67,31 +68,31 @@ func TestCreateFromJSON(t *testing.T) { func TestCreateCloudEventsEnvelopeDefaults(t *testing.T) { t.Run("default event type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", "", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", nil) assert.Equal(t, DefaultCloudEventType, envelope.Type) }) t.Run("non-default event type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "e1", "", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "e1", "", "", "mypubsub", nil) assert.Equal(t, "e1", envelope.Type) }) t.Run("spec version", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", "", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", nil) assert.Equal(t, CloudEventsSpecVersion, envelope.SpecVersion) }) t.Run("quoted data", func(t *testing.T) { list := []string{"v1", "v2", "v3"} data := strings.Join(list, ",") - envelope := NewCloudEventsEnvelope("a", "source", "", "", "", []byte(data)) + envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", []byte(data)) t.Logf("data: %v", envelope.Data) assert.Equal(t, "text/plain", envelope.DataContentType) assert.Equal(t, data, envelope.Data.(string)) }) t.Run("string data content type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", "", "", []byte("data")) + envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", []byte("data")) assert.Equal(t, "text/plain", envelope.DataContentType) }) } diff --git a/pubsub/requests.go b/pubsub/requests.go index 90bf70184..deeafbee3 100644 --- a/pubsub/requests.go +++ b/pubsub/requests.go @@ -7,8 +7,9 @@ package pubsub // PublishRequest is the request to publish a message type PublishRequest struct { - Data []byte `json:"data"` - Topic string `json:"topic"` + Data []byte `json:"data"` + PubsubName string `json:"pubsubname"` + Topic string `json:"topic"` } // SubscribeRequest is the request to subscribe to a topic