224 lines
6.3 KiB
Go
224 lines
6.3 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
/*
|
|
The MIT License (MIT)
|
|
Copyright (c) 2014 Alex Saskevich
|
|
*/
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
// BusSubscriber defines subscription-related bus behavior
|
|
type BusSubscriber interface {
|
|
Subscribe(topic string, fn interface{}) error
|
|
SubscribeAsync(topic string, fn interface{}, transactional bool) error
|
|
Unsubscribe(topic string, handler interface{}) error
|
|
WaitAsync()
|
|
}
|
|
|
|
// BusPublisher defines publishing-related bus behavior
|
|
type BusPublisher interface {
|
|
Publish(topic string, args ...interface{})
|
|
}
|
|
|
|
// Bus englobes global (subscribe, publish, control) bus behavior
|
|
type Bus interface {
|
|
BusSubscriber
|
|
BusPublisher
|
|
}
|
|
|
|
// EventBus - box for handlers and callbacks.
|
|
type EventBus struct {
|
|
enableWildcards bool
|
|
handlers map[string][]*eventHandler
|
|
lock sync.Mutex // a lock for the map
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type eventHandler struct {
|
|
callBack reflect.Value
|
|
async bool
|
|
transactional bool
|
|
sync.Mutex // lock for an event handler - useful for running async callbacks serially
|
|
}
|
|
|
|
// New returns new EventBus with empty handlers.
|
|
func New(enableWildcards bool) Bus {
|
|
b := &EventBus{
|
|
enableWildcards,
|
|
make(map[string][]*eventHandler),
|
|
sync.Mutex{},
|
|
sync.WaitGroup{},
|
|
}
|
|
return Bus(b)
|
|
}
|
|
|
|
// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions
|
|
func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error {
|
|
bus.lock.Lock()
|
|
defer bus.lock.Unlock()
|
|
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
|
|
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
|
|
}
|
|
bus.handlers[topic] = append(bus.handlers[topic], handler)
|
|
return nil
|
|
}
|
|
|
|
// Subscribe subscribes to a topic.
|
|
// Returns error if `fn` is not a function.
|
|
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
|
|
return bus.doSubscribe(topic, fn, &eventHandler{
|
|
reflect.ValueOf(fn), false, false, sync.Mutex{},
|
|
})
|
|
}
|
|
|
|
// SubscribeAsync subscribes to a topic with an asynchronous callback
|
|
// Transactional determines whether subsequent callbacks for a topic are
|
|
// run serially (true) or concurrently (false)
|
|
// Returns error if `fn` is not a function.
|
|
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
|
|
return bus.doSubscribe(topic, fn, &eventHandler{
|
|
reflect.ValueOf(fn), true, transactional, sync.Mutex{},
|
|
})
|
|
}
|
|
|
|
// getCallbacks returns the callback(s) registered for the topic
|
|
func (bus *EventBus) getCallbacks(topic string) []*eventHandler {
|
|
if !bus.enableWildcards {
|
|
handlers, ok := bus.handlers[topic]
|
|
if ok && len(handlers) > 0 {
|
|
// Make a hard copy to prevent the slice to be changed after this method
|
|
copyHandlers := make([]*eventHandler, len(handlers))
|
|
copy(copyHandlers, handlers)
|
|
return copyHandlers
|
|
}
|
|
return nil
|
|
}
|
|
|
|
handlers := []*eventHandler{}
|
|
for k, h := range bus.handlers {
|
|
if k == topic ||
|
|
(strings.HasSuffix(k, "*") && strings.HasPrefix(topic, k[0:len(k)-1]) && topic != k[0:len(k)-1]) {
|
|
handlers = append(handlers, h...)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(handlers) == 0 {
|
|
return nil
|
|
}
|
|
return handlers
|
|
}
|
|
|
|
// Unsubscribe removes callback defined for a topic.
|
|
// Returns error if there are no callbacks subscribed to the topic.
|
|
func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
|
|
bus.lock.Lock()
|
|
defer bus.lock.Unlock()
|
|
|
|
if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 {
|
|
bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler)))
|
|
return nil
|
|
}
|
|
return fmt.Errorf("topic %s doesn't exist", topic)
|
|
}
|
|
|
|
// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
|
|
func (bus *EventBus) Publish(topic string, args ...interface{}) {
|
|
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
|
|
defer bus.lock.Unlock()
|
|
|
|
handlers := bus.getCallbacks(topic)
|
|
if len(handlers) > 0 {
|
|
for _, handler := range handlers {
|
|
if !handler.async {
|
|
bus.doPublish(handler, topic, args...)
|
|
} else {
|
|
bus.wg.Add(1)
|
|
if handler.transactional {
|
|
bus.lock.Unlock()
|
|
handler.Lock()
|
|
bus.lock.Lock()
|
|
}
|
|
go bus.doPublishAsync(handler, topic, args...)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
|
|
passedArguments := bus.setUpPublish(handler, args...)
|
|
handler.callBack.Call(passedArguments)
|
|
}
|
|
|
|
func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {
|
|
defer bus.wg.Done()
|
|
if handler.transactional {
|
|
defer handler.Unlock()
|
|
}
|
|
bus.doPublish(handler, topic, args...)
|
|
}
|
|
|
|
func (bus *EventBus) removeHandler(topic string, idx int) {
|
|
if _, ok := bus.handlers[topic]; !ok {
|
|
return
|
|
}
|
|
l := len(bus.handlers[topic])
|
|
|
|
if !(0 <= idx && idx < l) {
|
|
return
|
|
}
|
|
|
|
copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
|
|
bus.handlers[topic][l-1] = nil // or the zero value of T
|
|
bus.handlers[topic] = bus.handlers[topic][:l-1]
|
|
}
|
|
|
|
func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
|
|
if _, ok := bus.handlers[topic]; ok {
|
|
for idx, handler := range bus.handlers[topic] {
|
|
if handler.callBack.Type() == callback.Type() &&
|
|
handler.callBack.Pointer() == callback.Pointer() {
|
|
return idx
|
|
}
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (bus *EventBus) setUpPublish(callback *eventHandler, args ...interface{}) []reflect.Value {
|
|
funcType := callback.callBack.Type()
|
|
passedArguments := make([]reflect.Value, len(args))
|
|
for i, v := range args {
|
|
if v == nil {
|
|
passedArguments[i] = reflect.New(funcType.In(i)).Elem()
|
|
} else {
|
|
passedArguments[i] = reflect.ValueOf(v)
|
|
}
|
|
}
|
|
|
|
return passedArguments
|
|
}
|
|
|
|
// WaitAsync waits for all async callbacks to complete
|
|
func (bus *EventBus) WaitAsync() {
|
|
bus.wg.Wait()
|
|
}
|