Merge branch 'master' into fix_mqtt_bindings_deadlock

This commit is contained in:
Long Dai 2021-10-27 12:04:03 +08:00 committed by GitHub
commit 5073f82e65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 143 additions and 46 deletions

View File

@ -9,13 +9,16 @@ import (
"context"
"encoding/json"
"strings"
"sync/atomic"
"time"
servicebus "github.com/Azure/azure-service-bus-go"
"github.com/cenkalti/backoff/v4"
"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
const (
@ -29,10 +32,13 @@ const (
// AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues.
type AzureServiceBusQueues struct {
metadata *serviceBusQueuesMetadata
client *servicebus.Queue
logger logger.Logger
metadata *serviceBusQueuesMetadata
ns *servicebus.Namespace
queue *servicebus.QueueEntity
shutdownSignal int32
logger logger.Logger
ctx context.Context
cancel context.CancelFunc
}
type serviceBusQueuesMetadata struct {
@ -60,6 +66,7 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
if err != nil {
return err
}
a.ns = ns
qm := ns.NewQueueManager()
@ -91,18 +98,16 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
if !ok {
ttl = a.metadata.ttl
}
entity, err = qm.Put(ctx, a.metadata.QueueName, servicebus.QueueEntityWithMessageTimeToLive(&ttl))
if err != nil {
return err
}
}
a.queue = entity
client, err := ns.NewQueue(entity.Name)
if err != nil {
return err
}
a.client = client
a.clearShutdown()
a.ctx, a.cancel = context.WithCancel(context.Background())
return nil
}
@ -145,6 +150,12 @@ func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.I
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
client, err := a.ns.NewQueue(a.queue.Name)
if err != nil {
return nil, err
}
defer client.Close(context.Background())
msg := servicebus.NewMessage(req.Data)
if val, ok := req.Metadata[id]; ok && val != "" {
msg.ID = val
@ -162,7 +173,7 @@ func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.I
msg.TTL = &ttl
}
return nil, a.client.Send(ctx, msg)
return nil, client.Send(ctx, msg)
}
func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
@ -178,13 +189,64 @@ func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) ([]byt
return msg.Abandon(ctx)
}
if err := a.client.Receive(context.Background(), sbHandler); err != nil {
return err
}
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
connConfig := retry.DefaultConfig()
connConfig.Policy = retry.PolicyExponential
connConfig.MaxInterval, _ = time.ParseDuration("5m")
connBackoff := connConfig.NewBackOffWithContext(a.ctx)
for !a.isShutdown() {
client := a.attemptConnectionForever(connBackoff)
if client == nil {
a.logger.Errorf("Failed to connect to Azure Service Bus Queue.")
continue
}
defer client.Close(context.Background())
if err := client.Receive(a.ctx, sbHandler); err != nil {
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
}
}
return nil
}
func (a *AzureServiceBusQueues) Close() error {
return a.client.Close(context.Background())
func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Queue {
var client *servicebus.Queue
retry.NotifyRecover(func() error {
clientAttempt, err := a.ns.NewQueue(a.queue.Name)
if err != nil {
return err
}
client = clientAttempt
return nil
}, backoff,
func(err error, d time.Duration) {
a.logger.Debugf("Failed to connect to Azure Service Bus Queue Binding with error: %s", err.Error())
},
func() {
a.logger.Debug("Successfully reconnected to Azure Service Bus.")
backoff.Reset()
})
return client
}
func (a *AzureServiceBusQueues) Close() error {
defer a.cancel()
a.logger.Info("Shutdown called!")
a.setShutdown()
return nil
}
func (a *AzureServiceBusQueues) setShutdown() {
atomic.CompareAndSwapInt32(&a.shutdownSignal, 0, 1)
}
func (a *AzureServiceBusQueues) clearShutdown() {
atomic.CompareAndSwapInt32(&a.shutdownSignal, 1, 0)
}
func (a *AzureServiceBusQueues) isShutdown() bool {
val := atomic.LoadInt32(&a.shutdownSignal)
return val == 1
}

View File

@ -103,7 +103,7 @@ func (m *Middleware) GetHandler(metadata middleware.Metadata) (func(h fasthttp.R
return func(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) {
if handled := m.evalRequest(ctx, meta, &query); handled {
if allow := m.evalRequest(ctx, meta, &query); !allow {
return
}
h(ctx)

View File

@ -12,7 +12,11 @@ import (
"github.com/dapr/kit/logger"
)
func mockedRequestHandler(ctx *fh.RequestCtx) {}
// mockedRequestHandler acts like an upstream service returns success status code 200 and a fixed response body.
func mockedRequestHandler(ctx *fh.RequestCtx) {
ctx.Response.SetStatusCode(200)
ctx.Response.SetBody([]byte("from mock"))
}
type RequestConfiguator func(*fh.RequestCtx)

View File

@ -228,7 +228,38 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
// this is for backward compatibility where it might have come from http request
blobHTTPHeaders, err := createBlobHTTPHeadersFromRequest(req)
if err != nil {
return err
}
_, err = azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
Parallelism: 16,
Metadata: req.Metadata,
AccessConditions: accessConditions,
BlobHTTPHeaders: blobHTTPHeaders,
})
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
return err
}
return nil
}
func createBlobHTTPHeadersFromRequest(req *state.SetRequest) (azblob.BlobHTTPHeaders, error) {
var blobHTTPHeaders azblob.BlobHTTPHeaders
if req.ContentType != "" {
blobHTTPHeaders.ContentType = req.ContentType
}
if val, ok := req.Metadata[contentType]; ok && val != "" {
blobHTTPHeaders.ContentType = val
delete(req.Metadata, contentType)
@ -236,7 +267,7 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
if val, ok := req.Metadata[contentMD5]; ok && val != "" {
sDec, err := b64.StdEncoding.DecodeString(val)
if err != nil || len(sDec) != 16 {
return fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
return azblob.BlobHTTPHeaders{}, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
}
blobHTTPHeaders.ContentMD5 = sDec
delete(req.Metadata, contentMD5)
@ -257,24 +288,7 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
blobHTTPHeaders.CacheControl = val
delete(req.Metadata, cacheControl)
}
_, err := azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
Parallelism: 16,
Metadata: req.Metadata,
AccessConditions: accessConditions,
BlobHTTPHeaders: blobHTTPHeaders,
})
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
return err
}
return nil
return blobHTTPHeaders, nil
}
func (r *StateStore) deleteFile(req *state.DeleteRequest) error {

View File

@ -71,3 +71,15 @@ func TestFileName(t *testing.T) {
assert.Equal(t, "key", key)
})
}
func TestBlobHTTPHeaderGeneration(t *testing.T) {
t.Run("Content type is set from request", func(t *testing.T) {
req := &state.SetRequest{
ContentType: "application/json",
}
blobHeaders, err := createBlobHTTPHeadersFromRequest(req)
assert.Nil(t, err)
assert.Equal(t, "application/json", blobHeaders.ContentType)
})
}

View File

@ -45,11 +45,12 @@ type DeleteStateOption struct {
// SetRequest is the object describing an upsert request.
type SetRequest struct {
Key string `json:"key"`
Value interface{} `json:"value"`
ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Options SetStateOption `json:"options,omitempty"`
Key string `json:"key"`
Value interface{} `json:"value"`
ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Options SetStateOption `json:"options,omitempty"`
ContentType string `json:"contentType,omitempty"`
}
// GetKey gets the Key on a SetRequest.

View File

@ -7,9 +7,10 @@ package state
// GetResponse is the response object for getting state.
type GetResponse struct {
Data []byte `json:"data"`
ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata"`
Data []byte `json:"data"`
ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata"`
ContentType string `json:"contentType,omitempty"`
}
// BulkGetResponse is the response object for bulk get response.

View File

@ -78,9 +78,11 @@ func InterruptNetwork(duration time.Duration, ipv4s []string, ipv6s []string, po
DryRun: false,
})
alreadyCleanedUp := false
t := time.NewTimer(duration)
defer func() {
if !t.Stop() {
if !t.Stop() && !alreadyCleanedUp {
<-t.C
}
}()
@ -89,6 +91,7 @@ func InterruptNetwork(duration time.Duration, ipv4s []string, ipv6s []string, po
case <-ctx.Done():
case <-t.C:
}
alreadyCleanedUp = true
throttler.Run(&throttler.Config{
Device: "",
Stop: true,