Conformance Test: PubSub - Ignore already processed messages (#722)

* Conformance Test: PubSub - Ignore already processed messages

* Fixing a race condition in StartHTTPServer
This commit is contained in:
Phil Kedy 2021-02-26 04:32:00 -05:00 committed by GitHub
parent e8efbca072
commit df1d80d411
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 21 deletions

View File

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -112,13 +113,15 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
// from being considered as part of this test. // from being considered as part of this test.
runID := uuid.Must(uuid.NewRandom()).String() runID := uuid.Must(uuid.NewRandom()).String()
awaitingMessages := make(map[string]struct{}, 20) awaitingMessages := make(map[string]struct{}, 20)
var mu sync.Mutex
processedMessages := make(map[int]struct{}, 20)
processedC := make(chan string, config.messageCount*2) processedC := make(chan string, config.messageCount*2)
errorCount := 0 errorCount := 0
dataPrefix := "message-" + runID + "-" dataPrefix := "message-" + runID + "-"
var outOfOrder bool var outOfOrder bool
// Subscribe // Subscribe
if config.HasOperation("subscribe") { if config.HasOperation("subscribe") { // nolint: nestif
t.Run("subscribe", func(t *testing.T) { t.Run("subscribe", func(t *testing.T) {
var counter int var counter int
var lastSequence int var lastSequence int
@ -133,8 +136,6 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
return nil return nil
} }
counter++
sequence, err := strconv.Atoi(dataString[len(dataPrefix):]) sequence, err := strconv.Atoi(dataString[len(dataPrefix):])
if err != nil { if err != nil {
t.Logf("Message did not contain a sequence number") t.Logf("Message did not contain a sequence number")
@ -143,6 +144,20 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
return err return err
} }
// Ignore already processed messages
// in case we receive a redelivery from the broker
// during retries.
mu.Lock()
_, alreadyProcessed := processedMessages[sequence]
mu.Unlock()
if alreadyProcessed {
t.Logf("Message was already processed: %d", sequence)
return nil
}
counter++
if sequence < lastSequence { if sequence < lastSequence {
outOfOrder = true outOfOrder = true
t.Logf("Message received out of order: expected sequence >= %d, got %d", lastSequence, sequence) t.Logf("Message received out of order: expected sequence >= %d, got %d", lastSequence, sequence)
@ -165,6 +180,10 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
t.Logf("Simulating subscriber success") t.Logf("Simulating subscriber success")
actualReadCount++ actualReadCount++
mu.Lock()
processedMessages[sequence] = struct{}{}
mu.Unlock()
processedC <- dataString processedC <- dataString
return nil return nil

View File

@ -6,13 +6,13 @@
package utils package utils
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/http/httptest"
"os" "os"
"os/signal" "os/signal"
"time"
"github.com/dapr/dapr/pkg/logger" "github.com/dapr/dapr/pkg/logger"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -50,32 +50,33 @@ func (cc CommonConfig) CopyMap(config map[string]string) map[string]string {
} }
func StartHTTPServer(port int, ready chan bool) { func StartHTTPServer(port int, ready chan bool) {
s = server{} l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
srv := http.Server{ if err != nil {
Addr: fmt.Sprintf(":%d", port), testLogger.Errorf("Error starting test HTTP serer: %v", err)
Handler: appRouter(),
return
} }
testLogger.Info(("Starting HTTP Server")) testLogger.Info(("Starting HTTP Server"))
go func() { ts := httptest.NewUnstartedServer(appRouter())
if err := srv.ListenAndServe(); err != nil { // NewUnstartedServer creates a listener. Close that listener and replace
testLogger.Errorf("Error with http server: %s", err.Error()) // with the one we created.
} ts.Listener.Close()
}() ts.Listener = l
// Start the server.
ts.Start()
defer ts.Close()
ready <- true
testLogger.Info(("Registering Signal")) testLogger.Info(("Registering Signal"))
stop := make(chan os.Signal, 1) stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt) signal.Notify(stop, os.Interrupt)
ready <- true
testLogger.Info(("Waiting to stop Server")) testLogger.Info(("Waiting to stop Server"))
<-stop <-stop
testLogger.Info(("Stopping Server")) testLogger.Info(("Stopping Server"))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
testLogger.Errorf("Error shutting down http server: %s", err.Error())
}
} }
func appRouter() *mux.Router { func appRouter() *mux.Router {