Configurable timeouts
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
6aa9ccc364
commit
135d460b05
|
@ -20,7 +20,6 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"golang.org/x/exp/slices"
|
||||
|
@ -31,7 +30,6 @@ import (
|
|||
)
|
||||
|
||||
// Link to the documentation for the component
|
||||
// TODO: Add link to docs
|
||||
const componentDocsURL = "https://docs.dapr.io/reference/components-reference/supported-bindings/cfqueues/"
|
||||
|
||||
// CFQueues is a binding for publishing messages on Cloudflare Queues
|
||||
|
@ -97,7 +95,7 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR
|
|||
return nil, fmt.Errorf("failed to create authorization token: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout)
|
||||
defer cancel()
|
||||
|
||||
d, err := strconv.Unquote(string(ir.Data))
|
||||
|
|
|
@ -27,16 +27,20 @@ import (
|
|||
"github.com/lestrrat-go/jwx/v2/jwt"
|
||||
)
|
||||
|
||||
// Default timeout for network requests.
|
||||
const defaultTimeout = 20 * time.Second
|
||||
|
||||
// Base metadata struct, common to all components
|
||||
// The components can be initialized in two ways:
|
||||
// - Instantiate the component with a "workerURL": assumes a worker that has been pre-deployed and it's ready to be used; we will not need API tokens
|
||||
// - Instantiate the component with a "cfAPIToken" and "cfAccountID": Dapr will take care of creating the worker if it doesn't exist (or upgrade it if needed)
|
||||
type BaseMetadata struct {
|
||||
WorkerURL string `mapstructure:"workerUrl"`
|
||||
CfAPIToken string `mapstructure:"cfAPIToken"`
|
||||
CfAccountID string `mapstructure:"cfAccountID"`
|
||||
Key string `mapstructure:"key"`
|
||||
WorkerName string `mapstructure:"workerName"`
|
||||
WorkerURL string `mapstructure:"workerUrl"`
|
||||
CfAPIToken string `mapstructure:"cfAPIToken"`
|
||||
CfAccountID string `mapstructure:"cfAccountID"`
|
||||
Key string `mapstructure:"key"`
|
||||
WorkerName string `mapstructure:"workerName"`
|
||||
Timeout time.Duration `mapstructure:"timeout"`
|
||||
|
||||
privKey ed25519.PrivateKey
|
||||
}
|
||||
|
@ -62,6 +66,11 @@ func (m *BaseMetadata) Validate() error {
|
|||
return errors.New("invalid component metadata: either 'workerUrl' or the combination of 'cfAPIToken'/'cfAccountID' is required")
|
||||
}
|
||||
|
||||
// Timeout
|
||||
if m.Timeout < time.Second {
|
||||
m.Timeout = defaultTimeout
|
||||
}
|
||||
|
||||
// WorkerName
|
||||
if m.WorkerName == "" {
|
||||
return errors.New("property 'workerName' is required")
|
||||
|
|
|
@ -59,7 +59,7 @@ type Base struct {
|
|||
func (w *Base) Init(workerBindings []CFBinding, componentDocsURL string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) {
|
||||
w.ctx, w.cancel = context.WithCancel(context.Background())
|
||||
w.client = &http.Client{
|
||||
Timeout: time.Second * 30,
|
||||
Timeout: w.metadata.Timeout,
|
||||
}
|
||||
w.componentDocsURL = componentDocsURL
|
||||
w.infoResponseValidate = infoResponseValidate
|
||||
|
@ -343,7 +343,7 @@ func (w *Base) checkWorker(workerURL string) error {
|
|||
return fmt.Errorf("failed to create authorization token: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(w.ctx, w.metadata.Timeout)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, workerURL+".well-known/dapr/info", nil)
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"golang.org/x/exp/slices"
|
||||
|
@ -97,7 +96,7 @@ func (q *CFWorkersKV) Delete(parentCtx context.Context, stateReq *state.DeleteRe
|
|||
return fmt.Errorf("failed to create authorization token: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout)
|
||||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
|
@ -128,7 +127,7 @@ func (q *CFWorkersKV) Get(parentCtx context.Context, stateReq *state.GetRequest)
|
|||
return nil, fmt.Errorf("failed to create authorization token: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout)
|
||||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
|
@ -171,7 +170,7 @@ func (q *CFWorkersKV) Set(parentCtx context.Context, stateReq *state.SetRequest)
|
|||
return fmt.Errorf("failed to create authorization token: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout)
|
||||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
|
|
Loading…
Reference in New Issue