Fix existing integration tests (#1173)
* Fix servicebusqueues_integration_test.go compile errors * Fix postgresql_integration_test.go deleteItemThatDoesNotExist expectations * Fix mysql_integration_test.go deleteItemThatDoesNotExist expectations * Fix mysql_integration_test.go for bindings * Increase TTL in ServiceBusQueues tests for stability
This commit is contained in:
parent
e41acc494c
commit
9a075cb43a
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:build integration_test
|
||||||
// +build integration_test
|
// +build integration_test
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
@ -25,6 +26,7 @@ import (
|
||||||
const (
|
const (
|
||||||
// Environment variable containing the connection string to Azure Service Bus
|
// Environment variable containing the connection string to Azure Service Bus
|
||||||
testServiceBusEnvKey = "DAPR_TEST_AZURE_SERVICEBUS"
|
testServiceBusEnvKey = "DAPR_TEST_AZURE_SERVICEBUS"
|
||||||
|
ttlInSeconds = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
func getTestServiceBusConnectionString() string {
|
func getTestServiceBusConnectionString() string {
|
||||||
|
|
@ -60,12 +62,12 @@ func getMessageWithRetries(queue *servicebus.Queue, maxDuration time.Duration) (
|
||||||
|
|
||||||
func TestQueueWithTTL(t *testing.T) {
|
func TestQueueWithTTL(t *testing.T) {
|
||||||
serviceBusConnectionString := getTestServiceBusConnectionString()
|
serviceBusConnectionString := getTestServiceBusConnectionString()
|
||||||
assert.NotEmpty(serviceBusConnectionString, fmt.Sprintf("Azure ServiceBus connection string must set in environment variable '%s'", testServiceBusEnvKey))
|
assert.NotEmpty(t, serviceBusConnectionString, fmt.Sprintf("Azure ServiceBus connection string must set in environment variable '%s'", testServiceBusEnvKey))
|
||||||
|
|
||||||
queueName := uuid.New().String()
|
queueName := uuid.New().String()
|
||||||
a := NewAzureServiceBusQueues(logger.NewLogger("test"))
|
a := NewAzureServiceBusQueues(logger.NewLogger("test"))
|
||||||
m := bindings.Metadata{}
|
m := bindings.Metadata{}
|
||||||
m.Properties = map[string]string{"connectionString": serviceBusConnectionString, "queueName": queueName, metadata.TTLMetadataKey: "1"}
|
m.Properties = map[string]string{"connectionString": serviceBusConnectionString, "queueName": queueName, metadata.TTLMetadataKey: fmt.Sprintf("%d", ttlInSeconds)}
|
||||||
err := a.Init(m)
|
err := a.Init(m)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
|
@ -80,16 +82,15 @@ func TestQueueWithTTL(t *testing.T) {
|
||||||
|
|
||||||
queueEntity, err := qmr.Get(context.Background(), queueName)
|
queueEntity, err := qmr.Get(context.Background(), queueName)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, "PT1S", *queueEntity.DefaultMessageTimeToLive)
|
assert.Equal(t, fmt.Sprintf("PT%dS", ttlInSeconds), *queueEntity.DefaultMessageTimeToLive)
|
||||||
|
|
||||||
// Assert that if waited too long, we won't see any message
|
// Assert that if waited too long, we won't see any message
|
||||||
const tooLateMsgContent = "too_late_msg"
|
const tooLateMsgContent = "too_late_msg"
|
||||||
err = a.Write(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)})
|
_, err = a.Invoke(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Second * 2)
|
time.Sleep(time.Second * (ttlInSeconds + 2))
|
||||||
|
|
||||||
const ttlInSeconds = 1
|
|
||||||
const maxGetDuration = ttlInSeconds * time.Second
|
const maxGetDuration = ttlInSeconds * time.Second
|
||||||
|
|
||||||
_, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
_, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
||||||
|
|
@ -98,7 +99,7 @@ func TestQueueWithTTL(t *testing.T) {
|
||||||
|
|
||||||
// Getting before it is expired, should return it
|
// Getting before it is expired, should return it
|
||||||
const testMsgContent = "test_msg"
|
const testMsgContent = "test_msg"
|
||||||
err = a.Write(&bindings.InvokeRequest{Data: []byte(testMsgContent)})
|
_, err = a.Invoke(&bindings.InvokeRequest{Data: []byte(testMsgContent)})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
msg, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
msg, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
||||||
|
|
@ -107,12 +108,12 @@ func TestQueueWithTTL(t *testing.T) {
|
||||||
msgBody := string(msg.Data)
|
msgBody := string(msg.Data)
|
||||||
assert.Equal(t, testMsgContent, msgBody)
|
assert.Equal(t, testMsgContent, msgBody)
|
||||||
assert.NotNil(t, msg.TTL)
|
assert.NotNil(t, msg.TTL)
|
||||||
assert.Equal(t, time.Second, *msg.TTL)
|
assert.Equal(t, ttlInSeconds*time.Second, *msg.TTL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPublishingWithTTL(t *testing.T) {
|
func TestPublishingWithTTL(t *testing.T) {
|
||||||
serviceBusConnectionString := getTestServiceBusConnectionString()
|
serviceBusConnectionString := getTestServiceBusConnectionString()
|
||||||
assert.NotEmpty(serviceBusConnectionString, fmt.Sprintf("Azure ServiceBus connection string must set in environment variable '%s'", testServiceBusEnvKey))
|
assert.NotEmpty(t, serviceBusConnectionString, fmt.Sprintf("Azure ServiceBus connection string must set in environment variable '%s'", testServiceBusEnvKey))
|
||||||
|
|
||||||
queueName := uuid.New().String()
|
queueName := uuid.New().String()
|
||||||
queueBinding1 := NewAzureServiceBusQueues(logger.NewLogger("test"))
|
queueBinding1 := NewAzureServiceBusQueues(logger.NewLogger("test"))
|
||||||
|
|
@ -140,15 +141,14 @@ func TestPublishingWithTTL(t *testing.T) {
|
||||||
writeRequest := bindings.InvokeRequest{
|
writeRequest := bindings.InvokeRequest{
|
||||||
Data: []byte(tooLateMsgContent),
|
Data: []byte(tooLateMsgContent),
|
||||||
Metadata: map[string]string{
|
Metadata: map[string]string{
|
||||||
metadata.TTLMetadataKey: "1",
|
metadata.TTLMetadataKey: fmt.Sprintf("%d", ttlInSeconds),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = queueBinding1.Write(&writeRequest)
|
_, err = queueBinding1.Invoke(&writeRequest)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * (ttlInSeconds + 2))
|
||||||
|
|
||||||
const ttlInSeconds = 1
|
|
||||||
const maxGetDuration = ttlInSeconds * time.Second
|
const maxGetDuration = ttlInSeconds * time.Second
|
||||||
|
|
||||||
_, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
_, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
||||||
|
|
@ -164,10 +164,10 @@ func TestPublishingWithTTL(t *testing.T) {
|
||||||
writeRequest = bindings.InvokeRequest{
|
writeRequest = bindings.InvokeRequest{
|
||||||
Data: []byte(testMsgContent),
|
Data: []byte(testMsgContent),
|
||||||
Metadata: map[string]string{
|
Metadata: map[string]string{
|
||||||
metadata.TTLMetadataKey: "1",
|
metadata.TTLMetadataKey: fmt.Sprintf("%d", ttlInSeconds),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = queueBinding2.Write(&writeRequest)
|
_, err = queueBinding2.Invoke(&writeRequest)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
msg, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
msg, ok, err := getMessageWithRetries(queue, maxGetDuration)
|
||||||
|
|
@ -177,5 +177,5 @@ func TestPublishingWithTTL(t *testing.T) {
|
||||||
assert.Equal(t, testMsgContent, msgBody)
|
assert.Equal(t, testMsgContent, msgBody)
|
||||||
assert.NotNil(t, msg.TTL)
|
assert.NotNil(t, msg.TTL)
|
||||||
|
|
||||||
assert.Equal(t, time.Second, *msg.TTL)
|
assert.Equal(t, ttlInSeconds*time.Second, *msg.TTL)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// MySQL doesn't accept RFC3339 formatted time, rejects trailing 'Z' for UTC indicator.
|
||||||
|
mySQLDateTimeFormat = "2006-01-02 15:04:05"
|
||||||
|
|
||||||
testCreateTable = `CREATE TABLE IF NOT EXISTS foo (
|
testCreateTable = `CREATE TABLE IF NOT EXISTS foo (
|
||||||
id bigint NOT NULL,
|
id bigint NOT NULL,
|
||||||
v1 character varying(50) NOT NULL,
|
v1 character varying(50) NOT NULL,
|
||||||
|
|
@ -85,7 +88,7 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
t.Run("Invoke insert", func(t *testing.T) {
|
t.Run("Invoke insert", func(t *testing.T) {
|
||||||
req.Operation = execOperation
|
req.Operation = execOperation
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
req.Metadata[commandSQLKey] = fmt.Sprintf(testInsert, i, i, true, time.Now().Format(time.RFC3339))
|
req.Metadata[commandSQLKey] = fmt.Sprintf(testInsert, i, i, true, time.Now().Format(mySQLDateTimeFormat))
|
||||||
res, err := b.Invoke(req)
|
res, err := b.Invoke(req)
|
||||||
assertResponse(t, res, err)
|
assertResponse(t, res, err)
|
||||||
}
|
}
|
||||||
|
|
@ -94,7 +97,7 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
t.Run("Invoke update", func(t *testing.T) {
|
t.Run("Invoke update", func(t *testing.T) {
|
||||||
req.Operation = execOperation
|
req.Operation = execOperation
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
req.Metadata[commandSQLKey] = fmt.Sprintf(testUpdate, time.Now().Format(time.RFC3339), i)
|
req.Metadata[commandSQLKey] = fmt.Sprintf(testUpdate, time.Now().Format(mySQLDateTimeFormat), i)
|
||||||
res, err := b.Invoke(req)
|
res, err := b.Invoke(req)
|
||||||
assertResponse(t, res, err)
|
assertResponse(t, res, err)
|
||||||
}
|
}
|
||||||
|
|
@ -154,5 +157,7 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
func assertResponse(t *testing.T, res *bindings.InvokeResponse, err error) {
|
func assertResponse(t *testing.T, res *bindings.InvokeResponse, err error) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, res)
|
assert.NotNil(t, res)
|
||||||
assert.NotNil(t, res.Metadata)
|
if res != nil {
|
||||||
|
assert.NotNil(t, res.Metadata)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -255,13 +255,13 @@ func multiWithDeleteAndSet(t *testing.T, mys *MySQL) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteItemThatDoesNotExist(t *testing.T, mys *MySQL) {
|
func deleteItemThatDoesNotExist(t *testing.T, mys *MySQL) {
|
||||||
// Delete the item with a fake eTag
|
// Delete the item with a key not in the store
|
||||||
deleteReq := &state.DeleteRequest{
|
deleteReq := &state.DeleteRequest{
|
||||||
Key: randomKey(),
|
Key: randomKey(),
|
||||||
}
|
}
|
||||||
|
|
||||||
err := mys.Delete(deleteReq)
|
err := mys.Delete(deleteReq)
|
||||||
assert.NotNil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteWithNoKeyFails(t *testing.T, mys *MySQL) {
|
func deleteWithNoKeyFails(t *testing.T, mys *MySQL) {
|
||||||
|
|
|
||||||
|
|
@ -177,12 +177,12 @@ func dropTable(t *testing.T, db *sql.DB, tableName string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteItemThatDoesNotExist(t *testing.T, pgs *PostgreSQL) {
|
func deleteItemThatDoesNotExist(t *testing.T, pgs *PostgreSQL) {
|
||||||
// Delete the item with a fake etag
|
// Delete the item with a key not in the store
|
||||||
deleteReq := &state.DeleteRequest{
|
deleteReq := &state.DeleteRequest{
|
||||||
Key: randomKey(),
|
Key: randomKey(),
|
||||||
}
|
}
|
||||||
err := pgs.Delete(deleteReq)
|
err := pgs.Delete(deleteReq)
|
||||||
assert.NotNil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func multiWithSetOnly(t *testing.T, pgs *PostgreSQL) {
|
func multiWithSetOnly(t *testing.T, pgs *PostgreSQL) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue