pubsub.in-memory: add support for wildcard topics (#1966)
* pubsub.in-memory: add support for wildcard topics Fixes #1964 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * make modtidy-all Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
parent
cd740b4a7b
commit
b3e18bb11d
3
go.mod
3
go.mod
|
|
@ -37,7 +37,6 @@ require (
|
||||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||||
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
|
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
|
||||||
github.com/apache/thrift v0.16.0 // indirect
|
github.com/apache/thrift v0.16.0 // indirect
|
||||||
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
|
|
||||||
github.com/aws/aws-sdk-go v1.43.16
|
github.com/aws/aws-sdk-go v1.43.16
|
||||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
|
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
|
||||||
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
|
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
|
||||||
|
|
@ -521,7 +520,7 @@ require (
|
||||||
github.com/xdg-go/stringprep v1.0.2 // indirect
|
github.com/xdg-go/stringprep v1.0.2 // indirect
|
||||||
github.com/yashtewari/glob-intersection v0.1.0 // indirect
|
github.com/yashtewari/glob-intersection v0.1.0 // indirect
|
||||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
||||||
go.uber.org/atomic v1.9.0 // indirect
|
go.uber.org/atomic v1.9.0
|
||||||
go.uber.org/multierr v1.8.0 // indirect
|
go.uber.org/multierr v1.8.0 // indirect
|
||||||
go.uber.org/zap v1.21.0 // indirect
|
go.uber.org/zap v1.21.0 // indirect
|
||||||
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
|
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -369,8 +369,6 @@ github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb
|
||||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
|
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
|
||||||
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
|
|
||||||
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
|
|
||||||
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
||||||
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 h1:zV3ejI06GQ59hwDQAvmK1qxOQGB3WuVTRoY0okPTAv0=
|
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 h1:zV3ejI06GQ59hwDQAvmK1qxOQGB3WuVTRoY0okPTAv0=
|
||||||
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg=
|
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg=
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
# eventbus
|
||||||
|
|
||||||
|
This is a based on [github.com/asaskevich/EventBus](https://github.com/asaskevich/EventBus) (commit: 49d423059eefd67a7243331db83e16d347139b4a), picking only the in-memory event bus and with modifications to add support for wildcard topics.
|
||||||
|
|
||||||
|
License for the original code:
|
||||||
|
|
||||||
|
```
|
||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
Copyright (c) 2014 Alex Saskevich
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,223 @@
|
||||||
|
/*
|
||||||
|
Copyright 2021 The Dapr Authors
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
The MIT License (MIT)
|
||||||
|
Copyright (c) 2014 Alex Saskevich
|
||||||
|
*/
|
||||||
|
|
||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BusSubscriber defines subscription-related bus behavior
|
||||||
|
type BusSubscriber interface {
|
||||||
|
Subscribe(topic string, fn interface{}) error
|
||||||
|
SubscribeAsync(topic string, fn interface{}, transactional bool) error
|
||||||
|
Unsubscribe(topic string, handler interface{}) error
|
||||||
|
WaitAsync()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BusPublisher defines publishing-related bus behavior
|
||||||
|
type BusPublisher interface {
|
||||||
|
Publish(topic string, args ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bus englobes global (subscribe, publish, control) bus behavior
|
||||||
|
type Bus interface {
|
||||||
|
BusSubscriber
|
||||||
|
BusPublisher
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventBus - box for handlers and callbacks.
|
||||||
|
type EventBus struct {
|
||||||
|
enableWildcards bool
|
||||||
|
handlers map[string][]*eventHandler
|
||||||
|
lock sync.Mutex // a lock for the map
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventHandler struct {
|
||||||
|
callBack reflect.Value
|
||||||
|
async bool
|
||||||
|
transactional bool
|
||||||
|
sync.Mutex // lock for an event handler - useful for running async callbacks serially
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns new EventBus with empty handlers.
|
||||||
|
func New(enableWildcards bool) Bus {
|
||||||
|
b := &EventBus{
|
||||||
|
enableWildcards,
|
||||||
|
make(map[string][]*eventHandler),
|
||||||
|
sync.Mutex{},
|
||||||
|
sync.WaitGroup{},
|
||||||
|
}
|
||||||
|
return Bus(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions
|
||||||
|
func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error {
|
||||||
|
bus.lock.Lock()
|
||||||
|
defer bus.lock.Unlock()
|
||||||
|
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
|
||||||
|
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
|
||||||
|
}
|
||||||
|
bus.handlers[topic] = append(bus.handlers[topic], handler)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe subscribes to a topic.
|
||||||
|
// Returns error if `fn` is not a function.
|
||||||
|
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
|
||||||
|
return bus.doSubscribe(topic, fn, &eventHandler{
|
||||||
|
reflect.ValueOf(fn), false, false, sync.Mutex{},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeAsync subscribes to a topic with an asynchronous callback
|
||||||
|
// Transactional determines whether subsequent callbacks for a topic are
|
||||||
|
// run serially (true) or concurrently (false)
|
||||||
|
// Returns error if `fn` is not a function.
|
||||||
|
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
|
||||||
|
return bus.doSubscribe(topic, fn, &eventHandler{
|
||||||
|
reflect.ValueOf(fn), true, transactional, sync.Mutex{},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// getCallbacks returns the callback(s) registered for the topic
|
||||||
|
func (bus *EventBus) getCallbacks(topic string) []*eventHandler {
|
||||||
|
if !bus.enableWildcards {
|
||||||
|
handlers, ok := bus.handlers[topic]
|
||||||
|
if ok && len(handlers) > 0 {
|
||||||
|
// Make a hard copy to prevent the slice to be changed after this method
|
||||||
|
copyHandlers := make([]*eventHandler, len(handlers))
|
||||||
|
copy(copyHandlers, handlers)
|
||||||
|
return copyHandlers
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers := []*eventHandler{}
|
||||||
|
for k, h := range bus.handlers {
|
||||||
|
if k == topic ||
|
||||||
|
(strings.HasSuffix(k, "*") && strings.HasPrefix(topic, k[0:len(k)-1]) && topic != k[0:len(k)-1]) {
|
||||||
|
handlers = append(handlers, h...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(handlers) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return handlers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe removes callback defined for a topic.
|
||||||
|
// Returns error if there are no callbacks subscribed to the topic.
|
||||||
|
func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
|
||||||
|
bus.lock.Lock()
|
||||||
|
defer bus.lock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 {
|
||||||
|
bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("topic %s doesn't exist", topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
|
||||||
|
func (bus *EventBus) Publish(topic string, args ...interface{}) {
|
||||||
|
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
|
||||||
|
defer bus.lock.Unlock()
|
||||||
|
|
||||||
|
handlers := bus.getCallbacks(topic)
|
||||||
|
if len(handlers) > 0 {
|
||||||
|
for _, handler := range handlers {
|
||||||
|
if !handler.async {
|
||||||
|
bus.doPublish(handler, topic, args...)
|
||||||
|
} else {
|
||||||
|
bus.wg.Add(1)
|
||||||
|
if handler.transactional {
|
||||||
|
bus.lock.Unlock()
|
||||||
|
handler.Lock()
|
||||||
|
bus.lock.Lock()
|
||||||
|
}
|
||||||
|
go bus.doPublishAsync(handler, topic, args...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
|
||||||
|
passedArguments := bus.setUpPublish(handler, args...)
|
||||||
|
handler.callBack.Call(passedArguments)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {
|
||||||
|
defer bus.wg.Done()
|
||||||
|
if handler.transactional {
|
||||||
|
defer handler.Unlock()
|
||||||
|
}
|
||||||
|
bus.doPublish(handler, topic, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bus *EventBus) removeHandler(topic string, idx int) {
|
||||||
|
if _, ok := bus.handlers[topic]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
l := len(bus.handlers[topic])
|
||||||
|
|
||||||
|
if !(0 <= idx && idx < l) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
|
||||||
|
bus.handlers[topic][l-1] = nil // or the zero value of T
|
||||||
|
bus.handlers[topic] = bus.handlers[topic][:l-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
|
||||||
|
if _, ok := bus.handlers[topic]; ok {
|
||||||
|
for idx, handler := range bus.handlers[topic] {
|
||||||
|
if handler.callBack.Type() == callback.Type() &&
|
||||||
|
handler.callBack.Pointer() == callback.Pointer() {
|
||||||
|
return idx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bus *EventBus) setUpPublish(callback *eventHandler, args ...interface{}) []reflect.Value {
|
||||||
|
funcType := callback.callBack.Type()
|
||||||
|
passedArguments := make([]reflect.Value, len(args))
|
||||||
|
for i, v := range args {
|
||||||
|
if v == nil {
|
||||||
|
passedArguments[i] = reflect.New(funcType.In(i)).Elem()
|
||||||
|
} else {
|
||||||
|
passedArguments[i] = reflect.ValueOf(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return passedArguments
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitAsync waits for all async callbacks to complete
|
||||||
|
func (bus *EventBus) WaitAsync() {
|
||||||
|
bus.wg.Wait()
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,218 @@
|
||||||
|
/*
|
||||||
|
Copyright 2021 The Dapr Authors
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
The MIT License (MIT)
|
||||||
|
Copyright (c) 2014 Alex Saskevich
|
||||||
|
*/
|
||||||
|
|
||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNew(t *testing.T) {
|
||||||
|
bus := New(false)
|
||||||
|
if bus == nil {
|
||||||
|
t.Log("New EventBus not created!")
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetCallbacks(t *testing.T) {
|
||||||
|
t.Run("no wildcards", func(t *testing.T) {
|
||||||
|
bus := New(false).(*EventBus)
|
||||||
|
bus.Subscribe("topic", func() {})
|
||||||
|
bus.Subscribe("topic*", func() {})
|
||||||
|
if cbs := bus.getCallbacks("my_topic"); len(cbs) > 0 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topic"); len(cbs) != 1 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topic*"); len(cbs) != 1 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topic1"); len(cbs) > 0 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with wildcards", func(t *testing.T) {
|
||||||
|
bus := New(true).(*EventBus)
|
||||||
|
bus.Subscribe("topic", func() {})
|
||||||
|
bus.Subscribe("topic*", func() {})
|
||||||
|
bus.Subscribe("topi*", func() {})
|
||||||
|
if cbs := bus.getCallbacks("my_topic"); len(cbs) > 0 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topic"); len(cbs) != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topic1"); len(cbs) != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cbs := bus.getCallbacks("topicfoobar"); len(cbs) != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribe(t *testing.T) {
|
||||||
|
bus := New(false)
|
||||||
|
if bus.Subscribe("topic", func() {}) != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if bus.Subscribe("topic", "String") == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribe(t *testing.T) {
|
||||||
|
bus := New(false)
|
||||||
|
handler := func() {}
|
||||||
|
bus.Subscribe("topic*", handler)
|
||||||
|
if bus.Unsubscribe("topic*", handler) != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if bus.Unsubscribe("topic*", handler) == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
val int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) Handle() {
|
||||||
|
h.val++
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnsubscribeMethod(t *testing.T) {
|
||||||
|
bus := New(false)
|
||||||
|
h := &handler{val: 0}
|
||||||
|
|
||||||
|
bus.Subscribe("topic", h.Handle)
|
||||||
|
bus.Publish("topic")
|
||||||
|
if bus.Unsubscribe("topic", h.Handle) != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if bus.Unsubscribe("topic", h.Handle) == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
bus.Publish("topic")
|
||||||
|
bus.WaitAsync()
|
||||||
|
|
||||||
|
if h.val != 1 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublish(t *testing.T) {
|
||||||
|
bus := New(false)
|
||||||
|
bus.Subscribe("topic", func(a int, err error) {
|
||||||
|
if a != 10 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
bus.Publish("topic", 10, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeAsyncTransactional(t *testing.T) {
|
||||||
|
results := make([]int, 0)
|
||||||
|
|
||||||
|
bus := New(false)
|
||||||
|
bus.SubscribeAsync("topic", func(a int, out *[]int, dur string) {
|
||||||
|
sleep, _ := time.ParseDuration(dur)
|
||||||
|
time.Sleep(sleep)
|
||||||
|
*out = append(*out, a)
|
||||||
|
}, true)
|
||||||
|
|
||||||
|
bus.Publish("topic", 1, &results, "1s")
|
||||||
|
bus.Publish("topic", 2, &results, "0s")
|
||||||
|
|
||||||
|
bus.WaitAsync()
|
||||||
|
|
||||||
|
if len(results) != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if results[0] != 1 || results[1] != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeAsync(t *testing.T) {
|
||||||
|
results := make(chan int)
|
||||||
|
|
||||||
|
bus := New(false)
|
||||||
|
bus.SubscribeAsync("topic", func(a int, out chan<- int) {
|
||||||
|
out <- a
|
||||||
|
}, false)
|
||||||
|
|
||||||
|
numResults := atomic.NewInt32(0)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for range results {
|
||||||
|
numResults.Inc()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
bus.Publish("topic", 1, results)
|
||||||
|
bus.Publish("topic", 2, results)
|
||||||
|
|
||||||
|
bus.WaitAsync()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
if numResults.Load() != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWildcards(t *testing.T) {
|
||||||
|
results := make(chan int)
|
||||||
|
|
||||||
|
bus := New(true)
|
||||||
|
bus.SubscribeAsync("topic/*", func(a int, out chan<- int) {
|
||||||
|
out <- a
|
||||||
|
}, false)
|
||||||
|
|
||||||
|
numResults := atomic.NewInt32(0)
|
||||||
|
go func() {
|
||||||
|
for range results {
|
||||||
|
numResults.Inc()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
bus.Publish("topic", 1, results)
|
||||||
|
bus.Publish("topic", 2, results)
|
||||||
|
bus.Publish("topic/1", 3, results)
|
||||||
|
bus.Publish("topic/2", 4, results)
|
||||||
|
|
||||||
|
bus.WaitAsync()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
if numResults.Load() != 2 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -17,14 +17,13 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/asaskevich/EventBus"
|
"github.com/dapr/components-contrib/internal/eventbus"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type bus struct {
|
type bus struct {
|
||||||
bus EventBus.Bus
|
bus eventbus.Bus
|
||||||
log logger.Logger
|
log logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -39,11 +38,11 @@ func (a *bus) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *bus) Features() []pubsub.Feature {
|
func (a *bus) Features() []pubsub.Feature {
|
||||||
return nil
|
return []pubsub.Feature{pubsub.FeatureSubscribeWildcards}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *bus) Init(metadata pubsub.Metadata) error {
|
func (a *bus) Init(metadata pubsub.Metadata) error {
|
||||||
a.bus = EventBus.New()
|
a.bus = eventbus.New(true)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,30 @@ func TestMultipleSubscribers(t *testing.T) {
|
||||||
assert.Equal(t, "ABCD", string(<-ch2))
|
assert.Equal(t, "ABCD", string(<-ch2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWildcards(t *testing.T) {
|
||||||
|
bus := New(logger.NewLogger("test"))
|
||||||
|
bus.Init(pubsub.Metadata{})
|
||||||
|
|
||||||
|
ch1 := make(chan []byte)
|
||||||
|
ch2 := make(chan []byte)
|
||||||
|
bus.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: "mytopic"}, func(ctx context.Context, msg *pubsub.NewMessage) error {
|
||||||
|
return publish(ch1, msg)
|
||||||
|
})
|
||||||
|
|
||||||
|
bus.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: "topic*"}, func(ctx context.Context, msg *pubsub.NewMessage) error {
|
||||||
|
return publish(ch2, msg)
|
||||||
|
})
|
||||||
|
|
||||||
|
bus.Publish(&pubsub.PublishRequest{Data: []byte("1"), Topic: "mytopic"})
|
||||||
|
assert.Equal(t, "1", string(<-ch1))
|
||||||
|
|
||||||
|
bus.Publish(&pubsub.PublishRequest{Data: []byte("2"), Topic: "topic1"})
|
||||||
|
assert.Equal(t, "2", string(<-ch2))
|
||||||
|
|
||||||
|
bus.Publish(&pubsub.PublishRequest{Data: []byte("3"), Topic: "topicX"})
|
||||||
|
assert.Equal(t, "3", string(<-ch2))
|
||||||
|
}
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
bus := New(logger.NewLogger("test"))
|
bus := New(logger.NewLogger("test"))
|
||||||
bus.Init(pubsub.Metadata{})
|
bus.Init(pubsub.Metadata{})
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue