From b3e18bb11d51e26fd340ba743774222287c6e720 Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 12 Aug 2022 17:13:47 -0700 Subject: [PATCH] pubsub.in-memory: add support for wildcard topics (#1966) * pubsub.in-memory: add support for wildcard topics Fixes #1964 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * make modtidy-all Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- go.mod | 3 +- go.sum | 2 - internal/eventbus/README.md | 29 ++++ internal/eventbus/event_bus.go | 223 ++++++++++++++++++++++++++++ internal/eventbus/event_bus_test.go | 218 +++++++++++++++++++++++++++ pubsub/in-memory/in-memory.go | 9 +- pubsub/in-memory/in-memory_test.go | 24 +++ 7 files changed, 499 insertions(+), 9 deletions(-) create mode 100644 internal/eventbus/README.md create mode 100644 internal/eventbus/event_bus.go create mode 100644 internal/eventbus/event_bus_test.go diff --git a/go.mod b/go.mod index e781a30ca..1deb5fa95 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/andybalholm/brotli v1.0.2 // indirect github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2 github.com/apache/thrift v0.16.0 // indirect - github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/aws/aws-sdk-go v1.43.16 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b @@ -521,7 +520,7 @@ require ( github.com/xdg-go/stringprep v1.0.2 // indirect github.com/yashtewari/glob-intersection v0.1.0 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - go.uber.org/atomic v1.9.0 // indirect + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect diff --git a/go.sum b/go.sum index 51191dfb2..e4d0404f6 100644 --- a/go.sum +++ b/go.sum @@ -369,8 +369,6 @@ github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= -github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= -github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 h1:zV3ejI06GQ59hwDQAvmK1qxOQGB3WuVTRoY0okPTAv0= github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg= diff --git a/internal/eventbus/README.md b/internal/eventbus/README.md new file mode 100644 index 000000000..60a0d85b5 --- /dev/null +++ b/internal/eventbus/README.md @@ -0,0 +1,29 @@ +# eventbus + +This is a based on [github.com/asaskevich/EventBus](https://github.com/asaskevich/EventBus) (commit: 49d423059eefd67a7243331db83e16d347139b4a), picking only the in-memory event bus and with modifications to add support for wildcard topics. + +License for the original code: + +``` +The MIT License (MIT) + +Copyright (c) 2014 Alex Saskevich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +``` \ No newline at end of file diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go new file mode 100644 index 000000000..ce7edcf47 --- /dev/null +++ b/internal/eventbus/event_bus.go @@ -0,0 +1,223 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +The MIT License (MIT) +Copyright (c) 2014 Alex Saskevich +*/ + +package eventbus + +import ( + "fmt" + "reflect" + "strings" + "sync" +) + +// BusSubscriber defines subscription-related bus behavior +type BusSubscriber interface { + Subscribe(topic string, fn interface{}) error + SubscribeAsync(topic string, fn interface{}, transactional bool) error + Unsubscribe(topic string, handler interface{}) error + WaitAsync() +} + +// BusPublisher defines publishing-related bus behavior +type BusPublisher interface { + Publish(topic string, args ...interface{}) +} + +// Bus englobes global (subscribe, publish, control) bus behavior +type Bus interface { + BusSubscriber + BusPublisher +} + +// EventBus - box for handlers and callbacks. +type EventBus struct { + enableWildcards bool + handlers map[string][]*eventHandler + lock sync.Mutex // a lock for the map + wg sync.WaitGroup +} + +type eventHandler struct { + callBack reflect.Value + async bool + transactional bool + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +// New returns new EventBus with empty handlers. +func New(enableWildcards bool) Bus { + b := &EventBus{ + enableWildcards, + make(map[string][]*eventHandler), + sync.Mutex{}, + sync.WaitGroup{}, + } + return Bus(b) +} + +// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions +func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error { + bus.lock.Lock() + defer bus.lock.Unlock() + if !(reflect.TypeOf(fn).Kind() == reflect.Func) { + return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind()) + } + bus.handlers[topic] = append(bus.handlers[topic], handler) + return nil +} + +// Subscribe subscribes to a topic. +// Returns error if `fn` is not a function. +func (bus *EventBus) Subscribe(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), false, false, sync.Mutex{}, + }) +} + +// SubscribeAsync subscribes to a topic with an asynchronous callback +// Transactional determines whether subsequent callbacks for a topic are +// run serially (true) or concurrently (false) +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), true, transactional, sync.Mutex{}, + }) +} + +// getCallbacks returns the callback(s) registered for the topic +func (bus *EventBus) getCallbacks(topic string) []*eventHandler { + if !bus.enableWildcards { + handlers, ok := bus.handlers[topic] + if ok && len(handlers) > 0 { + // Make a hard copy to prevent the slice to be changed after this method + copyHandlers := make([]*eventHandler, len(handlers)) + copy(copyHandlers, handlers) + return copyHandlers + } + return nil + } + + handlers := []*eventHandler{} + for k, h := range bus.handlers { + if k == topic || + (strings.HasSuffix(k, "*") && strings.HasPrefix(topic, k[0:len(k)-1]) && topic != k[0:len(k)-1]) { + handlers = append(handlers, h...) + continue + } + } + + if len(handlers) == 0 { + return nil + } + return handlers +} + +// Unsubscribe removes callback defined for a topic. +// Returns error if there are no callbacks subscribed to the topic. +func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { + bus.lock.Lock() + defer bus.lock.Unlock() + + if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 { + bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler))) + return nil + } + return fmt.Errorf("topic %s doesn't exist", topic) +} + +// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback. +func (bus *EventBus) Publish(topic string, args ...interface{}) { + bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish + defer bus.lock.Unlock() + + handlers := bus.getCallbacks(topic) + if len(handlers) > 0 { + for _, handler := range handlers { + if !handler.async { + bus.doPublish(handler, topic, args...) + } else { + bus.wg.Add(1) + if handler.transactional { + bus.lock.Unlock() + handler.Lock() + bus.lock.Lock() + } + go bus.doPublishAsync(handler, topic, args...) + } + } + } +} + +func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) { + passedArguments := bus.setUpPublish(handler, args...) + handler.callBack.Call(passedArguments) +} + +func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) { + defer bus.wg.Done() + if handler.transactional { + defer handler.Unlock() + } + bus.doPublish(handler, topic, args...) +} + +func (bus *EventBus) removeHandler(topic string, idx int) { + if _, ok := bus.handlers[topic]; !ok { + return + } + l := len(bus.handlers[topic]) + + if !(0 <= idx && idx < l) { + return + } + + copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:]) + bus.handlers[topic][l-1] = nil // or the zero value of T + bus.handlers[topic] = bus.handlers[topic][:l-1] +} + +func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int { + if _, ok := bus.handlers[topic]; ok { + for idx, handler := range bus.handlers[topic] { + if handler.callBack.Type() == callback.Type() && + handler.callBack.Pointer() == callback.Pointer() { + return idx + } + } + } + return -1 +} + +func (bus *EventBus) setUpPublish(callback *eventHandler, args ...interface{}) []reflect.Value { + funcType := callback.callBack.Type() + passedArguments := make([]reflect.Value, len(args)) + for i, v := range args { + if v == nil { + passedArguments[i] = reflect.New(funcType.In(i)).Elem() + } else { + passedArguments[i] = reflect.ValueOf(v) + } + } + + return passedArguments +} + +// WaitAsync waits for all async callbacks to complete +func (bus *EventBus) WaitAsync() { + bus.wg.Wait() +} diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go new file mode 100644 index 000000000..5526d7f44 --- /dev/null +++ b/internal/eventbus/event_bus_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +The MIT License (MIT) +Copyright (c) 2014 Alex Saskevich +*/ + +package eventbus + +import ( + "testing" + "time" + + "go.uber.org/atomic" +) + +func TestNew(t *testing.T) { + bus := New(false) + if bus == nil { + t.Log("New EventBus not created!") + t.Fail() + } +} + +func TestGetCallbacks(t *testing.T) { + t.Run("no wildcards", func(t *testing.T) { + bus := New(false).(*EventBus) + bus.Subscribe("topic", func() {}) + bus.Subscribe("topic*", func() {}) + if cbs := bus.getCallbacks("my_topic"); len(cbs) > 0 { + t.Fail() + } + if cbs := bus.getCallbacks("topic"); len(cbs) != 1 { + t.Fail() + } + if cbs := bus.getCallbacks("topic*"); len(cbs) != 1 { + t.Fail() + } + if cbs := bus.getCallbacks("topic1"); len(cbs) > 0 { + t.Fail() + } + }) + + t.Run("with wildcards", func(t *testing.T) { + bus := New(true).(*EventBus) + bus.Subscribe("topic", func() {}) + bus.Subscribe("topic*", func() {}) + bus.Subscribe("topi*", func() {}) + if cbs := bus.getCallbacks("my_topic"); len(cbs) > 0 { + t.Fail() + } + if cbs := bus.getCallbacks("topic"); len(cbs) != 2 { + t.Fail() + } + if cbs := bus.getCallbacks("topic1"); len(cbs) != 2 { + t.Fail() + } + if cbs := bus.getCallbacks("topicfoobar"); len(cbs) != 2 { + t.Fail() + } + }) +} + +func TestSubscribe(t *testing.T) { + bus := New(false) + if bus.Subscribe("topic", func() {}) != nil { + t.Fail() + } + if bus.Subscribe("topic", "String") == nil { + t.Fail() + } +} + +func TestUnsubscribe(t *testing.T) { + bus := New(false) + handler := func() {} + bus.Subscribe("topic*", handler) + if bus.Unsubscribe("topic*", handler) != nil { + t.Fail() + } + if bus.Unsubscribe("topic*", handler) == nil { + t.Fail() + } +} + +type handler struct { + val int +} + +func (h *handler) Handle() { + h.val++ +} + +func TestUnsubscribeMethod(t *testing.T) { + bus := New(false) + h := &handler{val: 0} + + bus.Subscribe("topic", h.Handle) + bus.Publish("topic") + if bus.Unsubscribe("topic", h.Handle) != nil { + t.Fail() + } + if bus.Unsubscribe("topic", h.Handle) == nil { + t.Fail() + } + bus.Publish("topic") + bus.WaitAsync() + + if h.val != 1 { + t.Fail() + } +} + +func TestPublish(t *testing.T) { + bus := New(false) + bus.Subscribe("topic", func(a int, err error) { + if a != 10 { + t.Fail() + } + + if err != nil { + t.Fail() + } + }) + bus.Publish("topic", 10, nil) +} + +func TestSubscribeAsyncTransactional(t *testing.T) { + results := make([]int, 0) + + bus := New(false) + bus.SubscribeAsync("topic", func(a int, out *[]int, dur string) { + sleep, _ := time.ParseDuration(dur) + time.Sleep(sleep) + *out = append(*out, a) + }, true) + + bus.Publish("topic", 1, &results, "1s") + bus.Publish("topic", 2, &results, "0s") + + bus.WaitAsync() + + if len(results) != 2 { + t.Fail() + } + + if results[0] != 1 || results[1] != 2 { + t.Fail() + } +} + +func TestSubscribeAsync(t *testing.T) { + results := make(chan int) + + bus := New(false) + bus.SubscribeAsync("topic", func(a int, out chan<- int) { + out <- a + }, false) + + numResults := atomic.NewInt32(0) + + go func() { + for range results { + numResults.Inc() + } + }() + + bus.Publish("topic", 1, results) + bus.Publish("topic", 2, results) + + bus.WaitAsync() + + time.Sleep(100 * time.Millisecond) + + if numResults.Load() != 2 { + t.Fail() + } +} + +func TestWildcards(t *testing.T) { + results := make(chan int) + + bus := New(true) + bus.SubscribeAsync("topic/*", func(a int, out chan<- int) { + out <- a + }, false) + + numResults := atomic.NewInt32(0) + go func() { + for range results { + numResults.Inc() + } + }() + + bus.Publish("topic", 1, results) + bus.Publish("topic", 2, results) + bus.Publish("topic/1", 3, results) + bus.Publish("topic/2", 4, results) + + bus.WaitAsync() + + time.Sleep(100 * time.Millisecond) + + if numResults.Load() != 2 { + t.Fail() + } +} diff --git a/pubsub/in-memory/in-memory.go b/pubsub/in-memory/in-memory.go index f2655fec5..16ffd6e6f 100644 --- a/pubsub/in-memory/in-memory.go +++ b/pubsub/in-memory/in-memory.go @@ -17,14 +17,13 @@ import ( "context" "time" - "github.com/asaskevich/EventBus" - + "github.com/dapr/components-contrib/internal/eventbus" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) type bus struct { - bus EventBus.Bus + bus eventbus.Bus log logger.Logger } @@ -39,11 +38,11 @@ func (a *bus) Close() error { } func (a *bus) Features() []pubsub.Feature { - return nil + return []pubsub.Feature{pubsub.FeatureSubscribeWildcards} } func (a *bus) Init(metadata pubsub.Metadata) error { - a.bus = EventBus.New() + a.bus = eventbus.New(true) return nil } diff --git a/pubsub/in-memory/in-memory_test.go b/pubsub/in-memory/in-memory_test.go index 72f91e23e..c06f7cd2f 100644 --- a/pubsub/in-memory/in-memory_test.go +++ b/pubsub/in-memory/in-memory_test.go @@ -57,6 +57,30 @@ func TestMultipleSubscribers(t *testing.T) { assert.Equal(t, "ABCD", string(<-ch2)) } +func TestWildcards(t *testing.T) { + bus := New(logger.NewLogger("test")) + bus.Init(pubsub.Metadata{}) + + ch1 := make(chan []byte) + ch2 := make(chan []byte) + bus.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: "mytopic"}, func(ctx context.Context, msg *pubsub.NewMessage) error { + return publish(ch1, msg) + }) + + bus.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: "topic*"}, func(ctx context.Context, msg *pubsub.NewMessage) error { + return publish(ch2, msg) + }) + + bus.Publish(&pubsub.PublishRequest{Data: []byte("1"), Topic: "mytopic"}) + assert.Equal(t, "1", string(<-ch1)) + + bus.Publish(&pubsub.PublishRequest{Data: []byte("2"), Topic: "topic1"}) + assert.Equal(t, "2", string(<-ch2)) + + bus.Publish(&pubsub.PublishRequest{Data: []byte("3"), Topic: "topicX"}) + assert.Equal(t, "3", string(<-ch2)) +} + func TestRetry(t *testing.T) { bus := New(logger.NewLogger("test")) bus.Init(pubsub.Metadata{})