Pub/Sub routing support (#227)

Signed-off-by: Phil Kedy <phil.kedy@gmail.com>
This commit is contained in:
Phil Kedy 2022-01-06 13:19:24 -05:00 committed by GitHub
parent 078d0cdc40
commit 40d8a4eabe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 558 additions and 99 deletions

View File

@ -67,6 +67,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
}
```
Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent.
```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}
err := s.AddTopicEventHandler(sub, importantHandler)
if err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```
### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

View File

@ -43,8 +43,8 @@ To handle events from specific topic you need to add at least one topic event ha
```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/events",
Topic: "topic1",
Route: "/events",
}
err := s.AddTopicEventHandler(sub, eventHandler)
if err != nil {
@ -62,6 +62,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
}
```
Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent.
```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}
err := s.AddTopicEventHandler(sub, importantHandler)
if err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```
### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

View File

@ -17,3 +17,6 @@ require (
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
// Needed to validate SDK changes in CI/CD
replace github.com/dapr/go-sdk => ../../

View File

@ -232,8 +232,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/dapr/components-contrib v1.5.1-rc.1/go.mod h1:k40RvOMnDmJMSSbWZ10ajjWJ9pEuq4Z5eKxCa/yrAe8=
github.com/dapr/dapr v1.5.1 h1:AMSf8Z0bs2MsNDJYSJv03kinV/TBEm4M2DejfVTAfPw=
github.com/dapr/dapr v1.5.1/go.mod h1:2YhuJCkJ/j3WKSii7M/Ma7QlX40T6I1nsgZu2/UKEAM=
github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d h1:mAc8+pXI+soaVt/qJJf33wnsa/+FzkOOsb6UT8pHGCc=
github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d/go.mod h1:TUTITZTcalzH6uICpQYTMvwC9Hm/2XrGFjZWopYrGlo=
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233/go.mod h1:y8r0VqUNKyd6xBXp7gQjwA59wlCLGfKzL5J8iJsN09w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -26,16 +26,31 @@ import (
// - PubsubName: is the name of the component configured in the metadata of pubsub.yaml.
// - Topic: is the name of the topic to subscribe.
// - Route: tell dapr where to request the API to publish the message to the subscriber when get a message from topic.
var sub = &common.Subscription{
// - Match: (Optional) The CEL expression to match on the CloudEvent to select this route.
// - Priority: (Optional) The priority order of the route when Match is specificed.
// If not specified, the matches are evaluated in the order in which they are added.
var defaultSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",
Route: "/orders",
}
var importantSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}
func main() {
s := daprd.NewService(":8080")
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
@ -48,3 +63,8 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}

View File

@ -29,12 +29,12 @@ const (
// Service represents Dapr callback service.
type Service interface {
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error
AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn func(ctx context.Context, e *TopicEvent) (retry bool, err error)) error
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
// Start starts service.
@ -42,3 +42,9 @@ type Service interface {
// Stop stops the previously started service.
Stop() error
}
type (
ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error)
BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error)
)

View File

@ -91,10 +91,14 @@ type Subscription struct {
PubsubName string `json:"pubsubname"`
// Topic is the name of the topic
Topic string `json:"topic"`
// Route is the route of the handler where HTTP topic events should be published (not used in gRPC)
Route string `json:"route"`
// Metadata is the subscription metadata
Metadata map[string]string `json:"metadata,omitempty"`
// Route is the route of the handler where HTTP topic events should be published (passed as Path in gRPC)
Route string `json:"route"`
// Match is the CEL expression to match on the CloudEvent envelope.
Match string `json:"match"`
// Priority is the priority in which to evaluate the match (lower to higher).
Priority int `json:"priority"`
}
const (

View File

@ -25,7 +25,7 @@ import (
)
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error {
func (s *Server) AddBindingInvocationHandler(name string, fn common.BindingInvocationHandler) error {
if name == "" {
return fmt.Errorf("binding name required")
}

View File

@ -26,7 +26,7 @@ import (
)
// AddServiceInvocationHandler appends provided service invocation handler with its method to the service.
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
func (s *Server) AddServiceInvocationHandler(method string, fn cc.ServiceInvocationHandler) error {
if method == "" {
return fmt.Errorf("servie name required")
}

View File

@ -14,18 +14,18 @@ limitations under the License.
package grpc
import (
"context"
"net"
"os"
"github.com/pkg/errors"
"google.golang.org/grpc"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/config"
"github.com/dapr/go-sdk/service/common"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/dapr/go-sdk/service/internal"
)
// NewService creates new Service.
@ -49,35 +49,28 @@ func NewServiceWithListener(lis net.Listener) common.Service {
func newService(lis net.Listener) *Server {
return &Server{
listener: lis,
invokeHandlers: make(map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)),
topicSubscriptions: make(map[string]*topicEventHandler),
bindingHandlers: make(map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)),
authToken: os.Getenv(common.AppAPITokenEnvVar),
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}
}
// Server is the gRPC service implementation for Dapr.
type Server struct {
pb.UnimplementedAppCallbackServer
listener net.Listener
invokeHandlers map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)
topicSubscriptions map[string]*topicEventHandler
bindingHandlers map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)
authToken string
listener net.Listener
invokeHandlers map[string]common.ServiceInvocationHandler
topicRegistrar internal.TopicRegistrar
bindingHandlers map[string]common.BindingInvocationHandler
authToken string
}
func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {
panic("Actor is not supported by gRPC API")
}
type topicEventHandler struct {
component string
topic string
fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)
meta map[string]string
}
// Start registers the server and starts it.
func (s *Server) Start() error {
gs := grpc.NewServer()

View File

@ -25,40 +25,31 @@ import (
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)
// AddTopicEventHandler appends provided event handler with topic name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)) error {
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if sub == nil {
return errors.New("subscription required")
}
if sub.Topic == "" {
return errors.New("topic name required")
}
if sub.PubsubName == "" {
return errors.New("pub/sub name required")
}
if fn == nil {
return fmt.Errorf("topic handler required")
}
key := fmt.Sprintf("%s-%s", sub.PubsubName, sub.Topic)
s.topicSubscriptions[key] = &topicEventHandler{
component: sub.PubsubName,
topic: sub.Topic,
fn: fn,
meta: sub.Metadata,
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
return err
}
return nil
}
// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*pb.TopicSubscription, 0)
for _, v := range s.topicSubscriptions {
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &pb.TopicSubscription{
PubsubName: v.component,
Topic: v.topic,
Metadata: v.meta,
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
}
subs = append(subs, sub)
}
@ -68,6 +59,23 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*
}, nil
}
func convertRoutes(routes *internal.TopicRoutes) *pb.TopicRoutes {
if routes == nil {
return nil
}
rules := make([]*pb.TopicRule, len(routes.Rules))
for i, rule := range routes.Rules {
rules[i] = &pb.TopicRule{
Match: rule.Match,
Path: rule.Path,
}
}
return &pb.TopicRoutes{
Rules: rules,
Default: routes.Default,
}
}
// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
@ -76,8 +84,8 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
// since Dapr will not get updated until long after this event expires, just drop it
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required")
}
key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic)
if h, ok := s.topicSubscriptions[key]; ok {
key := in.PubsubName + "-" + in.Topic
if sub, ok := s.topicRegistrar[key]; ok {
data := interface{}(in.Data)
if len(in.Data) > 0 {
mediaType, _, err := mime.ParseMediaType(in.DataContentType)
@ -113,7 +121,19 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
Topic: in.Topic,
PubsubName: in.PubsubName,
}
retry, err := h.fn(ctx, e)
h := sub.DefaultHandler
if in.Path != "" {
if pathHandler, ok := sub.RouteHandlers[in.Path]; ok {
h = pathHandler
}
}
if h == nil {
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_RETRY}, fmt.Errorf(
"route %s for pub/sub and topic combination not configured: %s/%s",
in.Path, in.PubsubName, in.Topic,
)
}
retry, err := h(ctx, e)
if err == nil {
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_SUCCESS}, nil
}

View File

@ -44,17 +44,51 @@ func TestTopicErrors(t *testing.T) {
}
func TestTopicSubscriptionList(t *testing.T) {
sub := &common.Subscription{
server := getTestServer()
// Add default route.
sub1 := &common.Subscription{
PubsubName: "messages",
Topic: "test",
Route: "/test",
}
server := getTestServer()
err := server.AddTopicEventHandler(sub, eventHandler)
err := server.AddTopicEventHandler(sub1, eventHandler)
assert.Nil(t, err)
resp, err := server.ListTopicSubscriptions(context.Background(), &empty.Empty{})
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers")
if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") {
sub := resp.Subscriptions[0]
assert.Equal(t, "messages", sub.PubsubName)
assert.Equal(t, "test", sub.Topic)
assert.Nil(t, sub.Routes)
}
// Add routing rule.
sub2 := &common.Subscription{
PubsubName: "messages",
Topic: "test",
Route: "/other",
Match: `event.type == "other"`,
}
err = server.AddTopicEventHandler(sub2, eventHandler)
assert.Nil(t, err)
resp, err = server.ListTopicSubscriptions(context.Background(), &empty.Empty{})
assert.NoError(t, err)
assert.NotNil(t, resp)
if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") {
sub := resp.Subscriptions[0]
assert.Equal(t, "messages", sub.PubsubName)
assert.Equal(t, "test", sub.Topic)
if assert.NotNil(t, sub.Routes) {
assert.Equal(t, "/test", sub.Routes.Default)
if assert.Len(t, sub.Routes.Rules, 1) {
rule := sub.Routes.Rules[0]
assert.Equal(t, "/other", rule.Path)
assert.Equal(t, `event.type == "other"`, rule.Match)
}
}
}
}
// go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$

View File

@ -14,7 +14,6 @@ limitations under the License.
package http
import (
"context"
"fmt"
"io/ioutil"
"net/http"
@ -24,7 +23,7 @@ import (
)
// AddBindingInvocationHandler appends provided binding invocation handler with its route to the service.
func (s *Server) AddBindingInvocationHandler(route string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error {
func (s *Server) AddBindingInvocationHandler(route string, fn common.BindingInvocationHandler) error {
if route == "" {
return fmt.Errorf("binding route required")
}

View File

@ -14,7 +14,6 @@ limitations under the License.
package http
import (
"context"
"fmt"
"io/ioutil"
"net/http"
@ -24,7 +23,7 @@ import (
)
// AddServiceInvocationHandler appends provided service invocation handler with its route to the service.
func (s *Server) AddServiceInvocationHandler(route string, fn func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)) error {
func (s *Server) AddServiceInvocationHandler(route string, fn common.ServiceInvocationHandler) error {
if route == "" {
return fmt.Errorf("service route required")
}

View File

@ -24,8 +24,8 @@ import (
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/config"
"github.com/dapr/go-sdk/actor/runtime"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)
// NewService creates new Service.
@ -48,19 +48,19 @@ func newServer(address string, router *mux.Router) *Server {
Addr: address,
Handler: router,
},
mux: router,
topicSubscriptions: make([]*common.Subscription, 0),
authToken: os.Getenv(common.AppAPITokenEnvVar),
mux: router,
topicRegistrar: make(internal.TopicRegistrar),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}
}
// Server is the HTTP server wrapping mux many Dapr helpers.
type Server struct {
address string
mux *mux.Router
httpServer *http.Server
topicSubscriptions []*common.Subscription
authToken string
address string
mux *mux.Router
httpServer *http.Server
topicRegistrar internal.TopicRegistrar
authToken string
}
func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {

View File

@ -14,22 +14,18 @@ limitations under the License.
package http
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"errors"
"io/ioutil"
"net/http"
"strings"
"github.com/gorilla/mux"
actorErr "github.com/dapr/go-sdk/actor/error"
"github.com/dapr/go-sdk/actor/runtime"
"github.com/pkg/errors"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)
const (
@ -74,8 +70,12 @@ type topicEventJSON struct {
func (s *Server) registerBaseHandler() {
// register subscribe handler
f := func(w http.ResponseWriter, r *http.Request) {
subs := make([]*internal.TopicSubscription, 0, len(s.topicRegistrar))
for _, s := range s.topicRegistrar {
subs = append(subs, s.Subscription)
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(s.topicSubscriptions); err != nil {
if err := json.NewEncoder(w).Encode(subs); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -95,11 +95,10 @@ func (s *Server) registerBaseHandler() {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
if _, err = w.Write(data); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
s.mux.HandleFunc("/dapr/config", fRegister).Methods(http.MethodGet)
@ -178,29 +177,19 @@ func (s *Server) registerBaseHandler() {
}
// AddTopicEventHandler appends provided event handler with it's name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)) error {
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if sub == nil {
return errors.New("subscription required")
}
if sub.Topic == "" {
return errors.New("topic name required")
}
if sub.PubsubName == "" {
return errors.New("pub/sub name required")
}
// Route is only required for HTTP but should be specified for the
// app protocol to be interchangeable.
if sub.Route == "" {
return errors.New("handler route name")
}
if fn == nil {
return fmt.Errorf("topic handler required")
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
return err
}
if !strings.HasPrefix(sub.Route, "/") {
sub.Route = fmt.Sprintf("/%s", sub.Route)
}
s.topicSubscriptions = append(s.topicSubscriptions, sub)
s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// check for post with no data

View File

@ -18,7 +18,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sort"
"strings"
"testing"
@ -26,8 +29,10 @@ import (
"github.com/dapr/go-sdk/actor/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)
func testTopicFunc(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
@ -90,8 +95,52 @@ func TestEventHandler(t *testing.T) {
err = s.AddTopicEventHandler(sub2, testErrorTopicFunc)
assert.NoErrorf(t, err, "error adding error event handler")
sub3 := &common.Subscription{
PubsubName: "messages",
Topic: "test",
Route: "/other",
Match: `event.type == "other"`,
Priority: 1,
}
err = s.AddTopicEventHandler(sub3, testTopicFunc)
assert.NoErrorf(t, err, "error adding error event handler")
s.registerBaseHandler()
req, err := http.NewRequest(http.MethodGet, "/dapr/subscribe", nil)
require.NoErrorf(t, err, "error creating request: %s", data)
req.Header.Set("Accept", "application/json")
rr := httptest.NewRecorder()
s.mux.ServeHTTP(rr, req)
resp := rr.Result()
defer resp.Body.Close()
payload, err := io.ReadAll(resp.Body)
require.NoErrorf(t, err, "error reading response")
var subs []internal.TopicSubscription
require.NoErrorf(t, json.Unmarshal(payload, &subs), "could not decode subscribe response")
sort.Slice(subs, func(i, j int) bool {
less := strings.Compare(subs[i].PubsubName, subs[j].PubsubName)
if less != 0 {
return less < 0
}
return strings.Compare(subs[i].Topic, subs[j].Topic) <= 0
})
if assert.Lenf(t, subs, 2, "unexpected subscription count") {
assert.Equal(t, "messages", subs[0].PubsubName)
assert.Equal(t, "errors", subs[0].Topic)
assert.Equal(t, "messages", subs[1].PubsubName)
assert.Equal(t, "test", subs[1].Topic)
assert.Equal(t, "", subs[1].Route)
assert.Equal(t, "/", subs[1].Routes.Default)
if assert.Lenf(t, subs[1].Routes.Rules, 1, "unexpected rules count") {
assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match)
assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path)
}
}
makeEventRequest(t, s, "/", data, http.StatusOK)
makeEventRequest(t, s, "/", "", http.StatusSeeOther)
makeEventRequest(t, s, "/", "not JSON", http.StatusSeeOther)

View File

@ -0,0 +1,56 @@
package internal
import (
"errors"
"fmt"
"github.com/dapr/go-sdk/service/common"
)
// TopicRegistrar is a map of <pubsubname>-<topic> to `TopicRegistration`
// and acts as a lookup as the application is building up subscriptions with
// potentially multiple routes per topic.
type TopicRegistrar map[string]*TopicRegistration
// TopicRegistration encapsulates the subscription and handlers.
type TopicRegistration struct {
Subscription *TopicSubscription
DefaultHandler common.TopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
}
func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
if sub.Topic == "" {
return errors.New("topic name required")
}
if sub.PubsubName == "" {
return errors.New("pub/sub name required")
}
if fn == nil {
return fmt.Errorf("topic handler required")
}
key := sub.PubsubName + "-" + sub.Topic
ts, ok := m[key]
if !ok {
ts = &TopicRegistration{
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
RouteHandlers: make(map[string]common.TopicEventHandler),
DefaultHandler: nil,
}
m[key] = ts
}
if sub.Match != "" {
if err := ts.Subscription.AddRoutingRule(sub.Route, sub.Match, sub.Priority); err != nil {
return err
}
} else {
if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil {
return err
}
ts.DefaultHandler = fn
}
ts.RouteHandlers[sub.Route] = fn
return nil
}

View File

@ -0,0 +1,73 @@
package internal_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)
func TestTopicRegistrarValidation(t *testing.T) {
fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false, nil
}
tests := map[string]struct {
sub common.Subscription
fn common.TopicEventHandler
err string
}{
"pubsub required": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "",
Topic: "test",
}, fn, "pub/sub name required",
},
"topic required": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "test",
Topic: "",
}, fn, "topic name required",
},
"handler required": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "test",
Topic: "test",
}, nil, "topic handler required",
},
"route required for routing rule": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "test",
Topic: "test",
Route: "",
Match: `event.type == "test"`,
}, fn, "path is required for routing rules",
},
"success default route": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "test",
Topic: "test",
}, fn, "",
},
"success routing rule": {
common.Subscription{ //nolint:exhaustivestruct
PubsubName: "test",
Topic: "test",
Route: "/test",
Match: `event.type == "test"`,
}, fn, "",
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
m := internal.TopicRegistrar{}
if tt.err != "" {
assert.EqualError(t, m.AddSubscription(&tt.sub, tt.fn), tt.err)
} else {
assert.NoError(t, m.AddSubscription(&tt.sub, tt.fn))
}
})
}
}

View File

@ -0,0 +1,112 @@
package internal
import (
"errors"
"fmt"
"sort"
)
// TopicSubscription internally represents single topic subscription.
type TopicSubscription struct {
// PubsubName is name of the pub/sub this message came from.
PubsubName string `json:"pubsubname"`
// Topic is the name of the topic.
Topic string `json:"topic"`
// Route is the route of the handler where HTTP topic events should be published (passed as Path in gRPC).
Route string `json:"route,omitempty"`
// Routes specify multiple routes where topic events should be sent.
Routes *TopicRoutes `json:"routes,omitempty"`
// Metadata is the subscription metadata.
Metadata map[string]string `json:"metadata,omitempty"`
}
// TopicRoutes encapsulates the default route and multiple routing rules.
type TopicRoutes struct {
Rules []TopicRule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
// priority is used to track duplicate priorities where priority > 0.
// when priority is not specified (0), then the order in which they
// were added is used.
priorities map[int]struct{}
}
// TopicRule represents a single routing rule.
type TopicRule struct {
// Match is the CEL expression to match on the CloudEvent envelope.
Match string `json:"match"`
// Path is the HTTP path to post the event to (passed as Path in gRPC).
Path string `json:"path"`
// priority is the optional priority order (low to high) for this rule.
priority int `json:"-"`
}
// NewTopicSubscription creates a new `TopicSubscription`.
func NewTopicSubscription(pubsubName, topic string) *TopicSubscription {
return &TopicSubscription{ //nolint:exhaustivestruct
PubsubName: pubsubName,
Topic: topic,
}
}
// SetMetadata sets the metadata for the subscription if not already set.
// An error is returned if it is already set.
func (s *TopicSubscription) SetMetadata(metadata map[string]string) error {
if s.Metadata != nil {
return fmt.Errorf("subscription for topic %s on pubsub %s already has metadata set", s.Topic, s.PubsubName)
}
s.Metadata = metadata
return nil
}
// SetDefaultRoute sets the default route if not already set.
// An error is returned if it is already set.
func (s *TopicSubscription) SetDefaultRoute(path string) error {
if s.Routes == nil {
if s.Route != "" {
return fmt.Errorf("subscription for topic %s on pubsub %s already has route %s", s.Topic, s.PubsubName, s.Route)
}
s.Route = path
} else {
if s.Routes.Default != "" {
return fmt.Errorf("subscription for topic %s on pubsub %s already has route %s", s.Topic, s.PubsubName, s.Routes.Default)
}
s.Routes.Default = path
}
return nil
}
// AddRoutingRule adds a routing rule.
// An error is returned if a there id a duplicate priority > 1.
func (s *TopicSubscription) AddRoutingRule(path, match string, priority int) error {
if path == "" {
return errors.New("path is required for routing rules")
}
if s.Routes == nil {
s.Routes = &TopicRoutes{ //nolint:exhaustivestruct
Default: s.Route,
priorities: map[int]struct{}{},
}
s.Route = ""
}
if priority > 0 {
if _, exists := s.Routes.priorities[priority]; exists {
return fmt.Errorf("subscription for topic %s on pubsub %s already has a routing rule with priority %d", s.Topic, s.PubsubName, priority)
}
}
s.Routes.Rules = append(s.Routes.Rules, TopicRule{
Match: match,
Path: path,
priority: priority,
})
sort.SliceStable(s.Routes.Rules, func(i, j int) bool {
return s.Routes.Rules[i].priority < s.Routes.Rules[j].priority
})
if priority > 0 {
s.Routes.priorities[priority] = struct{}{}
}
return nil
}

View File

@ -0,0 +1,72 @@
package internal_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/dapr/go-sdk/service/internal"
)
func TestTopicSubscripiton(t *testing.T) {
t.Run("duplicate metadata", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.SetMetadata(map[string]string{
"test": "test",
}))
assert.EqualError(t, sub.SetMetadata(map[string]string{
"test": "test",
}), "subscription for topic mytopic on pubsub test already has metadata set")
})
t.Run("duplicate route", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.SetDefaultRoute("/test"))
assert.Equal(t, "/test", sub.Route)
assert.EqualError(t, sub.SetDefaultRoute("/test"),
"subscription for topic mytopic on pubsub test already has route /test")
})
t.Run("duplicate route after routing rule", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0))
assert.NoError(t, sub.SetDefaultRoute("/test"))
assert.EqualError(t, sub.SetDefaultRoute("/test"),
"subscription for topic mytopic on pubsub test already has route /test")
})
t.Run("default route after routing rule", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.SetDefaultRoute("/test"))
assert.Equal(t, "/test", sub.Route)
assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0))
assert.Equal(t, "", sub.Route)
assert.Equal(t, "/test", sub.Routes.Default)
assert.EqualError(t, sub.SetDefaultRoute("/test"),
"subscription for topic mytopic on pubsub test already has route /test")
})
t.Run("duplicate routing rule priority", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "other"`, 1))
assert.EqualError(t, sub.AddRoutingRule("/test", `event.type == "test"`, 1),
"subscription for topic mytopic on pubsub test already has a routing rule with priority 1")
})
t.Run("priority ordering", func(t *testing.T) {
sub := internal.NewTopicSubscription("test", "mytopic")
assert.NoError(t, sub.AddRoutingRule("/100", `event.type == "100"`, 100))
assert.NoError(t, sub.AddRoutingRule("/1", `event.type == "1"`, 1))
assert.NoError(t, sub.AddRoutingRule("/50", `event.type == "50"`, 50))
assert.NoError(t, sub.SetDefaultRoute("/default"))
assert.Equal(t, "/default", sub.Routes.Default)
if assert.Len(t, sub.Routes.Rules, 3) {
assert.Equal(t, "/1", sub.Routes.Rules[0].Path)
assert.Equal(t, `event.type == "1"`, sub.Routes.Rules[0].Match)
assert.Equal(t, "/50", sub.Routes.Rules[1].Path)
assert.Equal(t, `event.type == "50"`, sub.Routes.Rules[1].Match)
assert.Equal(t, "/100", sub.Routes.Rules[2].Path)
assert.Equal(t, `event.type == "100"`, sub.Routes.Rules[2].Match)
}
})
}