Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
fabf7fe5a7
commit
308f3c987c
|
@ -32,7 +32,7 @@ import (
|
|||
|
||||
// Link to the documentation for the component
|
||||
// TODO: Add link to docs
|
||||
const componentDocsUrl = "https://TODO"
|
||||
const componentDocsURL = "https://TODO"
|
||||
|
||||
// CFQueues is a binding for publishing messages on Cloudflare Queues
|
||||
type CFQueues struct {
|
||||
|
@ -68,11 +68,11 @@ func (q *CFQueues) Init(metadata bindings.Metadata) error {
|
|||
}
|
||||
infoResponseValidate := func(data *workers.InfoEndpointResponse) error {
|
||||
if !slices.Contains(data.Queues, q.metadata.QueueName) {
|
||||
return fmt.Errorf("the worker is not bound to the queue '%s'; please re-deploy the worker with the correct bindings per instructions in the documentation at %s", q.metadata.QueueName, componentDocsUrl)
|
||||
return fmt.Errorf("the worker is not bound to the queue '%s'; please re-deploy the worker with the correct bindings per instructions in the documentation at %s", q.metadata.QueueName, componentDocsURL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return q.Base.Init(workerBindings, componentDocsUrl, infoResponseValidate)
|
||||
return q.Base.Init(workerBindings, componentDocsURL, infoResponseValidate)
|
||||
}
|
||||
|
||||
// Operations returns the supported operations for this binding.
|
||||
|
@ -105,7 +105,7 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR
|
|||
ir.Data = []byte(d)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"queues/"+q.metadata.QueueName, bytes.NewReader(ir.Data))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, q.metadata.WorkerURL+"queues/"+q.metadata.QueueName, bytes.NewReader(ir.Data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
|
|
@ -18,13 +18,13 @@ import (
|
|||
_ "embed"
|
||||
)
|
||||
|
||||
// Contains the source code for the worker.
|
||||
// WorkerScript contains the source code for the worker.
|
||||
//
|
||||
//go:embed worker.js
|
||||
var WorkerScript []byte
|
||||
|
||||
// Value for compatibility_date.
|
||||
// CompatibilityDate is the value for compatibility_date.
|
||||
const CompatibilityDate = "2022-12-09"
|
||||
|
||||
// Value for usage_model.
|
||||
// UsageModel is the value for usage_model.
|
||||
const UsageModel = "bundled"
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -42,8 +41,6 @@ type BaseMetadata struct {
|
|||
privKey ed25519.PrivateKey
|
||||
}
|
||||
|
||||
var queueNameValidation = regexp.MustCompile("^([a-zA-Z0-9_\\-\\.]+)$")
|
||||
|
||||
// Validate the metadata object.
|
||||
func (m *BaseMetadata) Validate() error {
|
||||
// Option 1: check if we have a workerURL
|
||||
|
|
|
@ -38,7 +38,7 @@ const (
|
|||
// Minimum version required for the running Worker.
|
||||
minWorkerVersion = 20221209
|
||||
// Issuer for JWTs
|
||||
tokenIssuer = "dapr.io/cloudflare"
|
||||
tokenIssuer = "dapr.io/cloudflare" //nolint:gosec
|
||||
// JWT token expiration
|
||||
tokenExpiration = 5 * time.Minute
|
||||
)
|
||||
|
@ -47,7 +47,7 @@ const (
|
|||
type Base struct {
|
||||
metadata *BaseMetadata
|
||||
infoResponseValidate func(*InfoEndpointResponse) error
|
||||
componentDocsUrl string
|
||||
componentDocsURL string
|
||||
client *http.Client
|
||||
logger logger.Logger
|
||||
ctx context.Context
|
||||
|
@ -55,12 +55,12 @@ type Base struct {
|
|||
}
|
||||
|
||||
// Init the base class.
|
||||
func (w *Base) Init(workerBindings []Binding, componentDocsUrl string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) {
|
||||
func (w *Base) Init(workerBindings []Binding, componentDocsURL string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) {
|
||||
w.ctx, w.cancel = context.WithCancel(context.Background())
|
||||
w.client = &http.Client{
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
w.componentDocsUrl = componentDocsUrl
|
||||
w.componentDocsURL = componentDocsURL
|
||||
w.infoResponseValidate = infoResponseValidate
|
||||
|
||||
// Check if we're using an externally-managed worker
|
||||
|
@ -109,10 +109,10 @@ func (w *Base) setupWorker(workerBindings []Binding) error {
|
|||
|
||||
// Check if the worker exists and it's the supported version
|
||||
// In case of error, any error, we will re-deploy the worker
|
||||
workerUrl := fmt.Sprintf("https://%s.%s.workers.dev/", w.metadata.WorkerName, subdomain)
|
||||
err = w.checkWorker(workerUrl)
|
||||
workerURL := fmt.Sprintf("https://%s.%s.workers.dev/", w.metadata.WorkerName, subdomain)
|
||||
err = w.checkWorker(workerURL)
|
||||
if err != nil {
|
||||
w.logger.Infof("Deploying updated worker at URL '%s'", workerUrl)
|
||||
w.logger.Infof("Deploying updated worker at URL '%s'", workerURL)
|
||||
err = w.deployWorker(workerBindings)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deploying or updating the worker with the Cloudflare APIs: %w", err)
|
||||
|
@ -125,10 +125,10 @@ func (w *Base) setupWorker(workerBindings []Binding) error {
|
|||
}
|
||||
|
||||
// Wait for the worker to be deplopyed, which can take up to 30 seconds (but let's give it 1 minute)
|
||||
w.logger.Debugf("Deployed a new version of the worker at '%s' - waiting for propagation", workerUrl)
|
||||
w.logger.Debugf("Deployed a new version of the worker at '%s' - waiting for propagation", workerURL)
|
||||
start := time.Now()
|
||||
for time.Since(start) < time.Minute {
|
||||
err = w.checkWorker(workerUrl)
|
||||
err = w.checkWorker(workerURL)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -140,11 +140,11 @@ func (w *Base) setupWorker(workerBindings []Binding) error {
|
|||
}
|
||||
w.logger.Debug("Worker is ready")
|
||||
} else {
|
||||
w.logger.Infof("Using worker at URL '%s'", workerUrl)
|
||||
w.logger.Infof("Using worker at URL '%s'", workerURL)
|
||||
}
|
||||
|
||||
// Update the URL of the worker
|
||||
w.metadata.WorkerURL = workerUrl
|
||||
w.metadata.WorkerURL = workerURL
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -160,7 +160,7 @@ func (w *Base) getWorkersSubdomain() (string, error) {
|
|||
defer cancel()
|
||||
|
||||
u := fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/workers/subdomain", w.metadata.CfAccountID)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ func (w *Base) deployWorker(workerBindings []Binding) error {
|
|||
defer cancel()
|
||||
|
||||
u := fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/workers/scripts/%s", w.metadata.CfAccountID, w.metadata.WorkerName)
|
||||
req, err := http.NewRequestWithContext(ctx, "PUT", u, buf)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -305,7 +305,7 @@ func (w *Base) enableWorkersDevRoute() error {
|
|||
defer cancel()
|
||||
|
||||
u := fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/workers/scripts/%s/subdomain", w.metadata.CfAccountID, w.metadata.WorkerName)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", u, strings.NewReader(`{"enabled": true}`))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, strings.NewReader(`{"enabled": true}`))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -345,7 +345,7 @@ func (w *Base) checkWorker(workerURL string) error {
|
|||
ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", workerURL+".well-known/dapr/info", nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, workerURL+".well-known/dapr/info", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ func (w *Base) checkWorker(workerURL string) error {
|
|||
|
||||
version, _ := strconv.Atoi(data.Version)
|
||||
if version < minWorkerVersion {
|
||||
return fmt.Errorf("the worker is running an outdated version '%d'; please upgrade the worker per instructions in the documentation at %s", version, w.componentDocsUrl)
|
||||
return fmt.Errorf("the worker is running an outdated version '%d'; please upgrade the worker per instructions in the documentation at %s", version, w.componentDocsURL)
|
||||
}
|
||||
|
||||
if w.infoResponseValidate != nil {
|
||||
|
@ -386,10 +386,10 @@ func (w *Base) checkWorker(workerURL string) error {
|
|||
}
|
||||
|
||||
// Close the base component.
|
||||
func (q *Base) Close() error {
|
||||
if q.cancel != nil {
|
||||
q.cancel()
|
||||
q.cancel = nil
|
||||
func (w *Base) Close() error {
|
||||
if w.cancel != nil {
|
||||
w.cancel()
|
||||
w.cancel = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ import (
|
|||
|
||||
// Link to the documentation for the component
|
||||
// TODO: Add link to docs
|
||||
const componentDocsUrl = "https://TODO"
|
||||
const componentDocsURL = "https://TODO"
|
||||
|
||||
// CFKV is a state store backed by Cloudflare KV.
|
||||
type CFKV struct {
|
||||
|
@ -73,11 +73,11 @@ func (q *CFKV) Init(metadata state.Metadata) error {
|
|||
}
|
||||
infoResponseValidate := func(data *workers.InfoEndpointResponse) error {
|
||||
if !slices.Contains(data.KV, q.metadata.KVNamespaceID) {
|
||||
return fmt.Errorf("the worker is not bound to the namespace with ID '%s'; please re-deploy the worker with the correct bindings per instructions in the documentation at %s", q.metadata.KVNamespaceID, componentDocsUrl)
|
||||
return fmt.Errorf("the worker is not bound to the namespace with ID '%s'; please re-deploy the worker with the correct bindings per instructions in the documentation at %s", q.metadata.KVNamespaceID, componentDocsURL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return q.Base.Init(workerBindings, componentDocsUrl, infoResponseValidate)
|
||||
return q.Base.Init(workerBindings, componentDocsURL, infoResponseValidate)
|
||||
}
|
||||
|
||||
func (q *CFKV) GetComponentMetadata() map[string]string {
|
||||
|
@ -102,7 +102,7 @@ func (q *CFKV) Delete(stateReq *state.DeleteRequest) error {
|
|||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
req, err := http.NewRequestWithContext(ctx, "DELETE", u, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ func (q *CFKV) Get(stateReq *state.GetRequest) (*state.GetResponse, error) {
|
|||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ func (q *CFKV) Set(stateReq *state.SetRequest) error {
|
|||
defer cancel()
|
||||
|
||||
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceID + "/" + url.PathEscape(stateReq.Key)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewReader(q.marshalData(stateReq.Value)))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, bytes.NewReader(q.marshalData(stateReq.Value)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating network request: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue