From ad5397ea25489943acf5156c8af764e6f79ea788 Mon Sep 17 00:00:00 2001 From: Yash Nisar Date: Mon, 26 Sep 2022 16:44:57 -0500 Subject: [PATCH] Add time to CloudEvent automatically if not present (#2123) Closes https://github.com/dapr/dapr/issues/5137 Signed-off-by: Yash Nisar Signed-off-by: Yash Nisar --- pubsub/envelope.go | 10 ++++++++++ pubsub/envelope_test.go | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/pubsub/envelope.go b/pubsub/envelope.go index 3b4802cd0..f844c57fd 100644 --- a/pubsub/envelope.go +++ b/pubsub/envelope.go @@ -51,6 +51,7 @@ const ( SourceField = "source" IDField = "id" SubjectField = "subject" + TimeField = "time" ) // unmarshalPrecise is a wrapper around encoding/json's Decoder @@ -110,6 +111,7 @@ func NewCloudEventsEnvelope(id, source, eventType, subject string, topic string, TraceIDField: traceParent, TraceParentField: traceParent, TraceStateField: traceState, + TimeField: time.Now().Format(time.RFC3339), } ce[ceDataField] = ceData @@ -129,6 +131,14 @@ func FromCloudEvent(cloudEvent []byte, topic, pubsub, traceParent string, traceS return m, err } + customTimeVal, keyExists := m[TimeField] + + if keyExists { + m[TimeField] = customTimeVal + } else { + m[TimeField] = time.Now().Format(time.RFC3339) + } + m[TraceIDField] = traceParent m[TraceParentField] = traceParent m[TraceStateField] = traceState diff --git a/pubsub/envelope_test.go b/pubsub/envelope_test.go index 29e4bcc18..3701b3f6d 100644 --- a/pubsub/envelope_test.go +++ b/pubsub/envelope_test.go @@ -41,6 +41,7 @@ func TestEnvelopeXML(t *testing.T) { assert.Equal(t, "1.0", envelope[SpecVersionField]) assert.Equal(t, "routed.topic", envelope[TopicField]) assert.Equal(t, "mypubsub", envelope[PubsubField]) + assert.NotNil(t, envelope[TimeField]) }) t.Run("xml without content-type", func(t *testing.T) { @@ -52,6 +53,7 @@ func TestEnvelopeXML(t *testing.T) { assert.Equal(t, "1.0", envelope[SpecVersionField]) assert.Equal(t, "routed.topic", envelope[TopicField]) assert.Equal(t, "mypubsub", envelope[PubsubField]) + assert.NotNil(t, envelope[TimeField]) }) } @@ -246,6 +248,7 @@ func TestNewFromExisting(t *testing.T) { m := map[string]interface{}{ "specversion": "1.0", "customfield": "a", + "time": "2021-08-02T09:00:00Z", } b, _ := json.Marshal(&m) @@ -257,6 +260,7 @@ func TestNewFromExisting(t *testing.T) { assert.Equal(t, "pubsub", n[PubsubField]) assert.Equal(t, "1", n[TraceParentField]) assert.Equal(t, "key=value", n[TraceStateField]) + assert.Equal(t, "2021-08-02T09:00:00Z", n[TimeField]) assert.Nil(t, n[DataField]) assert.Nil(t, n[DataBase64Field]) }) @@ -271,6 +275,7 @@ func TestNewFromExisting(t *testing.T) { "specversion": "1.0", "customfield": "a", "data": "hello world", + "time": "2021-08-02T09:00:00Z", } b, _ := json.Marshal(&m) @@ -282,6 +287,7 @@ func TestNewFromExisting(t *testing.T) { assert.Equal(t, "pubsub", n[PubsubField]) assert.Equal(t, "1", n[TraceParentField]) assert.Equal(t, "key=value", n[TraceStateField]) + assert.Equal(t, "2021-08-02T09:00:00Z", n[TimeField]) assert.Nil(t, n[DataBase64Field]) assert.Equal(t, "hello world", n[DataField]) }) @@ -302,6 +308,7 @@ func TestNewFromExisting(t *testing.T) { assert.Equal(t, "pubsub", n[PubsubField]) assert.Equal(t, "1", n[TraceParentField]) assert.Equal(t, "key=value", n[TraceStateField]) + assert.NotNil(t, n[TimeField]) assert.Nil(t, n[DataField]) assert.Equal(t, base64.StdEncoding.EncodeToString([]byte{0x1}), n[DataBase64Field]) }) @@ -312,6 +319,7 @@ func TestCreateFromBinaryPayload(t *testing.T) { envelope := NewCloudEventsEnvelope("", "", "", "", "", "", "application/octet-stream", []byte{0x1}, "trace", "") assert.Equal(t, base64Encoding, envelope[DataBase64Field]) + assert.NotNil(t, envelope[TimeField]) assert.Nil(t, envelope[DataField]) }