diff --git a/bindings/cloudflare/queues/cfqueues.go b/bindings/cloudflare/queues/cfqueues.go index 3cc823886..db8c89a62 100644 --- a/bindings/cloudflare/queues/cfqueues.go +++ b/bindings/cloudflare/queues/cfqueues.go @@ -20,7 +20,6 @@ import ( "io" "net/http" "strconv" - "time" "github.com/mitchellh/mapstructure" "golang.org/x/exp/slices" @@ -31,7 +30,6 @@ import ( ) // Link to the documentation for the component -// TODO: Add link to docs const componentDocsURL = "https://docs.dapr.io/reference/components-reference/supported-bindings/cfqueues/" // CFQueues is a binding for publishing messages on Cloudflare Queues @@ -97,7 +95,7 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR return nil, fmt.Errorf("failed to create authorization token: %w", err) } - ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout) defer cancel() d, err := strconv.Unquote(string(ir.Data)) diff --git a/internal/component/cloudflare/workers/metadata.go b/internal/component/cloudflare/workers/metadata.go index 443b98364..65356c2d5 100644 --- a/internal/component/cloudflare/workers/metadata.go +++ b/internal/component/cloudflare/workers/metadata.go @@ -27,16 +27,20 @@ import ( "github.com/lestrrat-go/jwx/v2/jwt" ) +// Default timeout for network requests. +const defaultTimeout = 20 * time.Second + // Base metadata struct, common to all components // The components can be initialized in two ways: // - Instantiate the component with a "workerURL": assumes a worker that has been pre-deployed and it's ready to be used; we will not need API tokens // - Instantiate the component with a "cfAPIToken" and "cfAccountID": Dapr will take care of creating the worker if it doesn't exist (or upgrade it if needed) type BaseMetadata struct { - WorkerURL string `mapstructure:"workerUrl"` - CfAPIToken string `mapstructure:"cfAPIToken"` - CfAccountID string `mapstructure:"cfAccountID"` - Key string `mapstructure:"key"` - WorkerName string `mapstructure:"workerName"` + WorkerURL string `mapstructure:"workerUrl"` + CfAPIToken string `mapstructure:"cfAPIToken"` + CfAccountID string `mapstructure:"cfAccountID"` + Key string `mapstructure:"key"` + WorkerName string `mapstructure:"workerName"` + Timeout time.Duration `mapstructure:"timeout"` privKey ed25519.PrivateKey } @@ -62,6 +66,11 @@ func (m *BaseMetadata) Validate() error { return errors.New("invalid component metadata: either 'workerUrl' or the combination of 'cfAPIToken'/'cfAccountID' is required") } + // Timeout + if m.Timeout < time.Second { + m.Timeout = defaultTimeout + } + // WorkerName if m.WorkerName == "" { return errors.New("property 'workerName' is required") diff --git a/internal/component/cloudflare/workers/workers.go b/internal/component/cloudflare/workers/workers.go index 2200f0b57..7859467d2 100644 --- a/internal/component/cloudflare/workers/workers.go +++ b/internal/component/cloudflare/workers/workers.go @@ -59,7 +59,7 @@ type Base struct { func (w *Base) Init(workerBindings []CFBinding, componentDocsURL string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) { w.ctx, w.cancel = context.WithCancel(context.Background()) w.client = &http.Client{ - Timeout: time.Second * 30, + Timeout: w.metadata.Timeout, } w.componentDocsURL = componentDocsURL w.infoResponseValidate = infoResponseValidate @@ -343,7 +343,7 @@ func (w *Base) checkWorker(workerURL string) error { return fmt.Errorf("failed to create authorization token: %w", err) } - ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(w.ctx, w.metadata.Timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, workerURL+".well-known/dapr/info", nil) diff --git a/state/cloudflare/workerskv/workerskv.go b/state/cloudflare/workerskv/workerskv.go index 1aecd64d4..efc9d06ba 100644 --- a/state/cloudflare/workerskv/workerskv.go +++ b/state/cloudflare/workerskv/workerskv.go @@ -22,7 +22,6 @@ import ( "net/http" "net/url" "reflect" - "time" "github.com/mitchellh/mapstructure" "golang.org/x/exp/slices" @@ -97,7 +96,7 @@ func (q *CFWorkersKV) Delete(parentCtx context.Context, stateReq *state.DeleteRe return fmt.Errorf("failed to create authorization token: %w", err) } - ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout) defer cancel() u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key) @@ -128,7 +127,7 @@ func (q *CFWorkersKV) Get(parentCtx context.Context, stateReq *state.GetRequest) return nil, fmt.Errorf("failed to create authorization token: %w", err) } - ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout) defer cancel() u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key) @@ -171,7 +170,7 @@ func (q *CFWorkersKV) Set(parentCtx context.Context, stateReq *state.SetRequest) return fmt.Errorf("failed to create authorization token: %w", err) } - ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + ctx, cancel := context.WithTimeout(parentCtx, q.metadata.Timeout) defer cancel() u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)