Corresponding changes for multi pubsub (#434)
* Corresponding changes for multi pubsub * Fix quote
This commit is contained in:
parent
3efcb40430
commit
08d2538796
|
|
@ -32,10 +32,11 @@ type CloudEventsEnvelope struct {
|
||||||
Data interface{} `json:"data"`
|
Data interface{} `json:"data"`
|
||||||
Subject string `json:"subject"`
|
Subject string `json:"subject"`
|
||||||
Topic string `json:"topic"`
|
Topic string `json:"topic"`
|
||||||
|
PubsubName string `json:"pubsubname"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCloudEventsEnvelope returns CloudEventsEnvelope from data or a new one when data content was not
|
// NewCloudEventsEnvelope returns CloudEventsEnvelope from data or a new one when data content was not
|
||||||
func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, data []byte) *CloudEventsEnvelope {
|
func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, pubsubName string, data []byte) *CloudEventsEnvelope {
|
||||||
// defaults
|
// defaults
|
||||||
if id == "" {
|
if id == "" {
|
||||||
id = uuid.New().String()
|
id = uuid.New().String()
|
||||||
|
|
@ -63,6 +64,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Subject: subject,
|
Subject: subject,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
|
PubsubName: pubsubName,
|
||||||
Data: string(data),
|
Data: string(data),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -79,6 +81,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string,
|
||||||
Type: getStrVal(m, "type"),
|
Type: getStrVal(m, "type"),
|
||||||
Subject: getStrVal(m, "subject"),
|
Subject: getStrVal(m, "subject"),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
|
PubsubName: pubsubName,
|
||||||
Data: m["data"],
|
Data: m["data"],
|
||||||
}
|
}
|
||||||
// check if CE is valid
|
// check if CE is valid
|
||||||
|
|
@ -97,6 +100,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Subject: subject,
|
Subject: subject,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
|
PubsubName: pubsubName,
|
||||||
Data: j,
|
Data: j,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCreateCloudEventsEnvelope(t *testing.T) {
|
func TestCreateCloudEventsEnvelope(t *testing.T) {
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", "", nil)
|
envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", "", "", nil)
|
||||||
assert.NotNil(t, envelope)
|
assert.NotNil(t, envelope)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -32,11 +32,12 @@ func TestEnvelopeUsingExistingCloudEvents(t *testing.T) {
|
||||||
"datacontenttype" : "text/xml",
|
"datacontenttype" : "text/xml",
|
||||||
"data" : "<much wow=\"xml\"/>"
|
"data" : "<much wow=\"xml\"/>"
|
||||||
}`
|
}`
|
||||||
envelope := NewCloudEventsEnvelope("a", "", "", "", "routed.topic", []byte(str))
|
envelope := NewCloudEventsEnvelope("a", "", "", "", "routed.topic", "mypubsub", []byte(str))
|
||||||
assert.Equal(t, "A234-1234-1234", envelope.ID)
|
assert.Equal(t, "A234-1234-1234", envelope.ID)
|
||||||
assert.Equal(t, "text/xml", envelope.DataContentType)
|
assert.Equal(t, "text/xml", envelope.DataContentType)
|
||||||
assert.Equal(t, "1.0", envelope.SpecVersion)
|
assert.Equal(t, "1.0", envelope.SpecVersion)
|
||||||
assert.Equal(t, "routed.topic", envelope.Topic)
|
assert.Equal(t, "routed.topic", envelope.Topic)
|
||||||
|
assert.Equal(t, "mypubsub", envelope.PubsubName)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -50,7 +51,7 @@ func TestCreateFromJSON(t *testing.T) {
|
||||||
1,
|
1,
|
||||||
}
|
}
|
||||||
data, _ := json.Marshal(obj1)
|
data, _ := json.Marshal(obj1)
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", data)
|
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", data)
|
||||||
t.Logf("data: %v", envelope.Data)
|
t.Logf("data: %v", envelope.Data)
|
||||||
assert.Equal(t, "application/json", envelope.DataContentType)
|
assert.Equal(t, "application/json", envelope.DataContentType)
|
||||||
|
|
||||||
|
|
@ -67,31 +68,31 @@ func TestCreateFromJSON(t *testing.T) {
|
||||||
|
|
||||||
func TestCreateCloudEventsEnvelopeDefaults(t *testing.T) {
|
func TestCreateCloudEventsEnvelopeDefaults(t *testing.T) {
|
||||||
t.Run("default event type", func(t *testing.T) {
|
t.Run("default event type", func(t *testing.T) {
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", nil)
|
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", nil)
|
||||||
assert.Equal(t, DefaultCloudEventType, envelope.Type)
|
assert.Equal(t, DefaultCloudEventType, envelope.Type)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("non-default event type", func(t *testing.T) {
|
t.Run("non-default event type", func(t *testing.T) {
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "e1", "", "", nil)
|
envelope := NewCloudEventsEnvelope("a", "source", "e1", "", "", "mypubsub", nil)
|
||||||
assert.Equal(t, "e1", envelope.Type)
|
assert.Equal(t, "e1", envelope.Type)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("spec version", func(t *testing.T) {
|
t.Run("spec version", func(t *testing.T) {
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", nil)
|
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", nil)
|
||||||
assert.Equal(t, CloudEventsSpecVersion, envelope.SpecVersion)
|
assert.Equal(t, CloudEventsSpecVersion, envelope.SpecVersion)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("quoted data", func(t *testing.T) {
|
t.Run("quoted data", func(t *testing.T) {
|
||||||
list := []string{"v1", "v2", "v3"}
|
list := []string{"v1", "v2", "v3"}
|
||||||
data := strings.Join(list, ",")
|
data := strings.Join(list, ",")
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", []byte(data))
|
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", []byte(data))
|
||||||
t.Logf("data: %v", envelope.Data)
|
t.Logf("data: %v", envelope.Data)
|
||||||
assert.Equal(t, "text/plain", envelope.DataContentType)
|
assert.Equal(t, "text/plain", envelope.DataContentType)
|
||||||
assert.Equal(t, data, envelope.Data.(string))
|
assert.Equal(t, data, envelope.Data.(string))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("string data content type", func(t *testing.T) {
|
t.Run("string data content type", func(t *testing.T) {
|
||||||
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", []byte("data"))
|
envelope := NewCloudEventsEnvelope("a", "source", "", "", "", "mypubsub", []byte("data"))
|
||||||
assert.Equal(t, "text/plain", envelope.DataContentType)
|
assert.Equal(t, "text/plain", envelope.DataContentType)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,9 @@ package pubsub
|
||||||
|
|
||||||
// PublishRequest is the request to publish a message
|
// PublishRequest is the request to publish a message
|
||||||
type PublishRequest struct {
|
type PublishRequest struct {
|
||||||
Data []byte `json:"data"`
|
Data []byte `json:"data"`
|
||||||
Topic string `json:"topic"`
|
PubsubName string `json:"pubsubname"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeRequest is the request to subscribe to a topic
|
// SubscribeRequest is the request to subscribe to a topic
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue