diff --git a/service/common/type.go b/service/common/type.go index 13a8a03..88fff2c 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -1,5 +1,9 @@ package common +import ( + "encoding/json" +) + // TopicEvent is the content of the inbound topic message. type TopicEvent struct { // ID identifies the event. @@ -15,8 +19,12 @@ type TopicEvent struct { // The content of the event. // Note, this is why the gRPC and HTTP implementations need separate structs for cloud events. Data interface{} `json:"data"` + // The content of the event represented as raw bytes. + // This can be deserialized into a Go struct using `Struct`. + RawData []byte `json:"-"` // The base64 encoding content of the event. - // Note, this is processing rawPayload and binary content types . + // Note, this is processing rawPayload and binary content types. + // This field is deprecated and will be removed in the future. DataBase64 string `json:"data_base64,omitempty"` // Cloud event subject Subject string `json:"subject"` @@ -26,6 +34,12 @@ type TopicEvent struct { PubsubName string `json:"pubsubname"` } +func (e *TopicEvent) Struct(target interface{}) error { + // TODO: Enhance to inspect DataContentType for the best + // deserialization method. + return json.Unmarshal(e.RawData, target) +} + // InvocationEvent represents the input and output of binding invocation. type InvocationEvent struct { // Data is the payload that the input bindings sent. diff --git a/service/grpc/topic.go b/service/grpc/topic.go index caf16e4..671293e 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -2,7 +2,10 @@ package grpc import ( "context" + "encoding/json" "fmt" + "mime" + "strings" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" @@ -62,13 +65,38 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p } key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic) if h, ok := s.topicSubscriptions[key]; ok { + data := interface{}(in.Data) + if len(in.Data) > 0 { + mediaType, _, err := mime.ParseMediaType(in.DataContentType) + if err == nil { + var v interface{} + switch mediaType { + case "application/json": + if err := json.Unmarshal(in.Data, &v); err == nil { + data = v + } + case "text/plain": + // Assume UTF-8 encoded string. + data = string(in.Data) + default: + if strings.HasPrefix(mediaType, "application/") && + strings.HasSuffix(mediaType, "+json") { + if err := json.Unmarshal(in.Data, &v); err == nil { + data = v + } + } + } + } + } + e := &common.TopicEvent{ ID: in.Id, Source: in.Source, Type: in.Type, SpecVersion: in.SpecVersion, DataContentType: in.DataContentType, - Data: in.Data, + Data: data, + RawData: in.Data, Topic: in.Topic, PubsubName: in.PubsubName, } diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index c162666..6aa6588 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -161,3 +161,79 @@ func eventHandlerWithRetryError(ctx context.Context, event *common.TopicEvent) ( func eventHandlerWithError(ctx context.Context, event *common.TopicEvent) (retry bool, err error) { return false, errors.New("nil event") } + +func TestEventDataHandling(t *testing.T) { + ctx := context.Background() + + tests := map[string]struct { + contentType string + data string + value interface{} + }{ + "JSON bytes": { + contentType: "application/json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "JSON entension media type bytes": { + contentType: "application/extension+json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "Test": { + contentType: "text/plain", + data: `message = hello`, + value: `message = hello`, + }, + "Other": { + contentType: "application/octet-stream", + data: `message = hello`, + value: []byte(`message = hello`), + }, + } + + s := getTestServer() + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } + + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} + + return false, nil + } + err := s.AddTopicEventHandler(sub, handler) + assert.NoErrorf(t, err, "error adding event handler") + + startTestServer(s) + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + in := runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: tt.contentType, + Data: []byte(tt.data), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + } + + s.OnTopicEvent(ctx, &in) + <-recv + assert.Equal(t, tt.value, topicEvent.Data) + }) + } +} diff --git a/service/http/topic.go b/service/http/topic.go index acfa0a1..f4a980b 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -2,6 +2,7 @@ package http import ( "context" + "encoding/base64" "encoding/json" "fmt" "io/ioutil" @@ -29,6 +30,34 @@ const ( PubSubHandlerDropStatusCode int = http.StatusSeeOther ) +// topicEventJSON is identical to `common.TopicEvent` +// except for it treats `data` as a json.RawMessage so it can +// be used as bytes or interface{}. +type topicEventJSON struct { + // ID identifies the event. + ID string `json:"id"` + // The version of the CloudEvents specification. + SpecVersion string `json:"specversion"` + // The type of event related to the originating occurrence. + Type string `json:"type"` + // Source identifies the context in which an event happened. + Source string `json:"source"` + // The content type of data value. + DataContentType string `json:"datacontenttype"` + // The content of the event. + // Note, this is why the gRPC and HTTP implementations need separate structs for cloud events. + Data json.RawMessage `json:"data"` + // The base64 encoding content of the event. + // Note, this is processing rawPayload and binary content types. + DataBase64 string `json:"data_base64,omitempty"` + // Cloud event subject + Subject string `json:"subject"` + // The pubsub topic which publisher sent to. + Topic string `json:"topic"` + // PubsubName is name of the pub/sub this message came from + PubsubName string `json:"pubsubname"` +} + func (s *Server) registerBaseHandler() { // register subscribe handler f := func(w http.ResponseWriter, r *http.Request) { @@ -168,21 +197,78 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx cont } // deserialize the event - var in common.TopicEvent + var in topicEventJSON if err := json.NewDecoder(r.Body).Decode(&in); err != nil { http.Error(w, err.Error(), PubSubHandlerDropStatusCode) return } + if in.PubsubName == "" { + in.Topic = sub.PubsubName + } if in.Topic == "" { in.Topic = sub.Topic } + var data interface{} + var rawData []byte + if len(in.Data) > 0 { + rawData = []byte(in.Data) + data = rawData + var v interface{} + // We can assume that rawData is valid JSON + // without checking in.DataContentType == "application/json". + if err := json.Unmarshal(rawData, &v); err == nil { + data = v + // Handling of JSON base64 encoded or escaped in a string. + if str, ok := v.(string); ok { + // This is the path that will most likely succeed. + var vString interface{} + if err := json.Unmarshal([]byte(str), &vString); err == nil { + data = vString + } else if decoded, err := base64.StdEncoding.DecodeString(str); err == nil { + // Decoded Base64 encoded JSON does not seem to be in the spec + // but it is in existing unit tests so this handles that case. + var vBase64 interface{} + if err := json.Unmarshal(decoded, &vBase64); err == nil { + data = vBase64 + } + } + } + } + } else if in.DataBase64 != "" { + var err error + rawData, err = base64.StdEncoding.DecodeString(in.DataBase64) + if err == nil { + data = rawData + if in.DataContentType == "application/json" { + var v interface{} + if err := json.Unmarshal(rawData, &v); err == nil { + data = v + } + } + } + } + + te := common.TopicEvent{ + ID: in.ID, + SpecVersion: in.SpecVersion, + Type: in.Type, + Source: in.Source, + DataContentType: in.DataContentType, + Data: data, + RawData: rawData, + DataBase64: in.DataBase64, + Subject: in.Subject, + PubsubName: in.PubsubName, + Topic: in.Topic, + } + w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) // execute user handler - retry, err := fn(r.Context(), &in) + retry, err := fn(r.Context(), &te) if err == nil { writeStatus(w, common.SubscriptionResponseStatusSuccess) return diff --git a/service/http/topic_test.go b/service/http/topic_test.go index 931c5d5..3190453 100644 --- a/service/http/topic_test.go +++ b/service/http/topic_test.go @@ -85,6 +85,129 @@ func TestEventHandler(t *testing.T) { makeEventRequest(t, s, "/errors", data, http.StatusOK) } +func TestEventDataHandling(t *testing.T) { + tests := map[string]struct { + data string + result interface{} + }{ + "JSON nested": { + data: `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message":"hello" + } + }`, + result: map[string]interface{}{ + "message": "hello", + }, + }, + "JSON base64 encoded in data": { + data: `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }`, + result: map[string]interface{}{ + "message": "hello", + }, + }, + "JSON base64 encoded in data_base64": { + data: `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }`, + result: map[string]interface{}{ + "message": "hello", + }, + }, + "Binary base64 encoded in data_base64": { + data: `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/octet-stream", + "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }`, + result: []byte(`{"message":"hello"}`), + }, + "JSON string escaped": { + data: `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "{\"message\":\"hello\"}" + }`, + result: map[string]interface{}{ + "message": "hello", + }, + }, + } + + s := newServer("", nil) + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } + + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} + + return false, nil + } + err := s.AddTopicEventHandler(sub, handler) + assert.NoErrorf(t, err, "error adding event handler") + + s.registerBaseHandler() + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + makeEventRequest(t, s, "/test", tt.data, http.StatusOK) + <-recv + assert.Equal(t, tt.result, topicEvent.Data) + }) + } +} + func TestHealthCheck(t *testing.T) { s := newServer("", nil) s.registerBaseHandler()