Some fixes
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
5ad0388de3
commit
a84ab2d0f6
|
@ -78,7 +78,7 @@ func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.L
|
|||
if isBinding {
|
||||
if m.ConnectionString == "" {
|
||||
if m.EventHub == "" {
|
||||
return nil, errors.New("property eventHub is required when connecting with a connection string")
|
||||
return nil, errors.New("property 'eventHub' is required when connecting with Azure AD")
|
||||
}
|
||||
m.hubName = m.EventHub
|
||||
} else {
|
||||
|
@ -88,7 +88,7 @@ func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.L
|
|||
} else if m.EventHub != "" {
|
||||
m.hubName = m.EventHub
|
||||
} else {
|
||||
return nil, errors.New("the provided connection string does not contain a value for 'EntityPath' and no eventHub property was passed")
|
||||
return nil, errors.New("the provided connection string does not contain a value for 'EntityPath' and no 'eventHub' property was passed")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,5 @@ spec:
|
|||
secretKeyRef:
|
||||
name: AzureEventHubsBindingsContainer
|
||||
value: AzureEventHubsBindingsContainer
|
||||
- name: partitionID
|
||||
value: 0
|
||||
auth:
|
||||
secretStore: envvar-secret-store
|
|
@ -29,6 +29,8 @@ components:
|
|||
outputData: '{"id": "$((uuid))", "orderid": "abcdef-test", "partitionKey": "partitionValue", "nestedproperty": {"subproperty": "something of value for testing"}, "description": "conformance test item"}'
|
||||
- component: azure.eventhubs
|
||||
operations: ["create", "operations", "read"]
|
||||
config:
|
||||
readBindingWait: 15s
|
||||
- component: azure.eventgrid
|
||||
operations: ["create", "operations", "read"]
|
||||
config:
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -189,16 +190,17 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin
|
|||
})
|
||||
}
|
||||
|
||||
inputBindingCall := 0
|
||||
readChan := make(chan int)
|
||||
inputBindingCall := atomic.Int32{}
|
||||
readChan := make(chan int, 1)
|
||||
readCtx, readCancel := context.WithCancel(context.Background())
|
||||
defer readCancel()
|
||||
if config.HasOperation("read") {
|
||||
t.Run("read", func(t *testing.T) {
|
||||
testLogger.Info("Read test running ...")
|
||||
err := inputBinding.Read(readCtx, func(ctx context.Context, r *bindings.ReadResponse) ([]byte, error) {
|
||||
inputBindingCall++
|
||||
readChan <- inputBindingCall
|
||||
t.Logf("Read message: %s", string(r.Data))
|
||||
v := inputBindingCall.Add(1)
|
||||
readChan <- int(v)
|
||||
|
||||
return nil, nil
|
||||
})
|
||||
|
@ -208,6 +210,7 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin
|
|||
// Need a small wait here because with brokers like MQTT
|
||||
// if you publish before there is a consumer, the message is thrown out
|
||||
// Currently, there is no way to know when Read is successfully subscribed.
|
||||
t.Logf("Sleeping for %v", config.ReadBindingWait)
|
||||
time.Sleep(config.ReadBindingWait)
|
||||
}
|
||||
|
||||
|
@ -262,7 +265,7 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin
|
|||
assert.Greater(t, inputBindingCall, 0)
|
||||
testLogger.Info("Read channel signalled.")
|
||||
case <-time.After(config.ReadBindingTimeout):
|
||||
assert.Greater(t, inputBindingCall, 0)
|
||||
assert.Greaterf(t, inputBindingCall, 0, "Timed out after %v while reading", config.ReadBindingTimeout)
|
||||
testLogger.Info("Read timeout.")
|
||||
}
|
||||
testLogger.Info("Verify Read test done.")
|
||||
|
|
Loading…
Reference in New Issue