diff --git a/pkg/search/proxy/store/util.go b/pkg/search/proxy/store/util.go index d8faad7a4..dbb765959 100644 --- a/pkg/search/proxy/store/util.go +++ b/pkg/search/proxy/store/util.go @@ -195,9 +195,21 @@ func (w *watchMux) AddSource(watcher watch.Interface, decorator func(watch.Event // Start run the watcher func (w *watchMux) Start() { - for _, source := range w.sources { - go w.startWatchSource(source.watcher, source.decorator) + wg := sync.WaitGroup{} + for i := range w.sources { + source := w.sources[i] + wg.Add(1) + go func() { + defer wg.Done() + w.startWatchSource(source.watcher, source.decorator) + }() } + + go func() { + // close result chan after all goroutines exit, avoiding data race. + defer close(w.result) + wg.Wait() + }() } // ResultChan implements watch.Interface @@ -220,7 +232,6 @@ func (w *watchMux) Stop() { case <-w.done: default: close(w.done) - close(w.result) } } @@ -246,19 +257,8 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch select { case <-w.done: return - default: + case w.result <- copyEvent: } - - func() { - w.lock.RLock() - defer w.lock.RUnlock() - select { - case <-w.done: - return - default: - w.result <- copyEvent - } - }() } }