Add Cloudflare KV state store

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-12-13 01:52:06 +00:00
parent d4b0980cec
commit 47db769066
13 changed files with 514 additions and 43 deletions

View File

@ -72,7 +72,7 @@ func (q *CFQueues) Init(metadata bindings.Metadata) error {
}
return nil
}
return q.Base.Init(metadata, workerBindings, componentDocsUrl, infoResponseValidate)
return q.Base.Init(workerBindings, componentDocsUrl, infoResponseValidate)
}
// Operations returns the supported operations for this binding.
@ -105,7 +105,7 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR
ir.Data = []byte(d)
}
req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"publish/"+q.metadata.QueueName, bytes.NewReader(ir.Data))
req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"queues/"+q.metadata.QueueName, bytes.NewReader(ir.Data))
if err != nil {
return nil, fmt.Errorf("error creating network request: %w", err)
}

View File

@ -21,9 +21,6 @@ import (
)
// Component metadata struct.
// The component can be initialized in two ways:
// - Instantiate the component with a "workerURL": assumes a worker that has been pre-deployed and it's ready to be used; we will not need API tokens
// - Instantiate the component with a "cfAPIToken" and "cfAccountID": Dapr will take care of creating the worker if it doesn't exist (or upgrade it if needed)
type componentMetadata struct {
workers.BaseMetadata `mapstructure:",squash"`
QueueName string `mapstructure:"queueName"`

View File

@ -1,20 +1,98 @@
# Dapr connector for Cloudflare Workers
This folder contains the source code for the Worker that is used by Dapr components to interact with Cloudflare services such as KV and Queues.
## Build code
The built Worker resides in `../workers/code`. You can build it with:
```sh
npm ci
npm run build
```
> Important: do not publish this worker (e.g. with `npx wrangler publish`), as it should not use the config in the `wrangler.toml` file!
## Develop locally
Note that when running locally, authorization is not required and all Authorization headers are ignored. Settings for development are read from the `wrangler.toml` file, which is not used by Dapr.
### Create a Queue
The default configuration in `wrangler.toml` (used for development only) includes a binding to a Queue called `daprdemo`. If you don't have it already, make sure to create it with:
```sh
npm install
npx wrangler queues create daprdemo
```
### Create a KV namespace
To test with KV, you need to first create a namespace with Wrangler, for example:
```sh
npx wrangler kv:namespace create daprkv
npx wrangler kv:namespace create daprkv --preview
```
The output contains something like:
```text
Add the following to your configuration file in your kv_namespaces array:
{ binding = "daprkv", id = "......" }
Add the following to your configuration file in your kv_namespaces array:
{ binding = "daprkv", preview_id = "......" }
```
Make sure to add the values of `id` and `preview_id` above to the `wrangler.toml` file.
### Start the application locally
Start the application locally, using Wrangler
```sh
npm ci
npm run start
```
## Info endpoint
### Info endpoint
```sh
curl "http://localhost:8787/.well-known/dapr/info"
```
## Publish a message
### Using KV
Store a value:
```sh
curl -X POST -d 'hello world' "http://localhost:8787/publish/daprdemo"
# Format is /kv/<KV namespace>/<key>
curl -X POST -d 'Hello world!' "http://localhost:8787/kv/daprkv/mykey"
# Success: 201 (Created), empty body
```
Retrieve a value:
```sh
# Format is /kv/<KV namespace>/<key>
curl "http://localhost:8787/kv/daprkv/mykey"
# Success: 200 (OK), value in body
# No key: 404 (Not found), empty body
```
Delete a value:
```sh
# Format is /kv/<KV namespace>/<key>
curl -X DELETE "http://localhost:8787/kv/daprkv/mykey"
# Success: 204 (No content), empty body
```
### Using Queues
Publish a message:
```sh
# Format is /queues/<queue name>
curl -X POST -d 'orders.42' "http://localhost:8787/queues/daprdemo"
# Success: 201 (Accepted), empty body
```

View File

@ -12,8 +12,12 @@ limitations under the License.
*/
export type Environment = {
// PEM-encoded Ed25519 public key used to verify JWT tokens
PUBLIC_KEY: string
// Audience for the token - this is normally the worker's name
TOKEN_AUDIENCE: string
// Skips authorization - used for development
SKIP_AUTH: string
// Other values are assumed to be bindings: Queues, KV, R2
readonly [x: string]: string | Queue<string> | KVNamespace | R2Bucket
}

View File

@ -22,6 +22,11 @@ export async function AuthorizeRequest(
req: Request,
env: Environment
): Promise<boolean> {
// If "SKIP_AUTH" is set, we can allow skipping authorization
if (env.SKIP_AUTH === 'true') {
return true
}
// Ensure we have an Authorization header with a bearer JWT token
const match = tokenHeaderMatch.exec(req.headers.get('authorization') || '')
if (!match || !match[1]) {

View File

@ -1,12 +1,12 @@
{
"name": "dapr-cfworkers-client",
"version": "20221209",
"version": "20221212",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "dapr-cfworkers-client",
"version": "20221209",
"version": "20221212",
"license": "Apache2",
"dependencies": {
"itty-router": "^2.6.6",

View File

@ -74,38 +74,143 @@ const router = Router()
})
}
)
.post(
'/publish/:queue',
// Retrieve a value from KV
.get(
'/kv/:namespace/:key',
async (
req: Request & RequestI,
env: Environment
): Promise<Response> => {
if (!req?.text || !req.params?.queue) {
return new Response('Bad request', { status: 400 })
}
const queue = env[req.params.queue] as Queue<string>
if (!queue || typeof queue.send != 'function') {
return new Response(
`Not subscribed to queue '${req.params.queue}'`,
{ status: 412 }
)
const { namespace, key, errorRes } = await setupKVRequest(req, env)
if (errorRes) {
return errorRes
}
const auth = await AuthorizeRequest(req, env)
if (!auth) {
return new Response('Unauthorized', { status: 401 })
const val = await namespace!.get(key!, 'stream')
if (!val) {
return new Response('', { status: 404 })
}
let message = await req.text()
await queue.send(message)
return new Response(val, { status: 200 })
}
)
// Store a value in KV
.post(
'/kv/:namespace/:key',
async (
req: Request & RequestI,
env: Environment
): Promise<Response> => {
const { namespace, key, errorRes } = await setupKVRequest(req, env)
if (errorRes) {
return errorRes
}
await namespace!.put(key!, req.body!)
return new Response('', { status: 201 })
}
)
// Delete a value from KV
.delete(
'/kv/:namespace/:key',
async (
req: Request & RequestI,
env: Environment
): Promise<Response> => {
const { namespace, key, errorRes } = await setupKVRequest(req, env)
if (errorRes) {
return errorRes
}
await namespace!.delete(key!)
return new Response('', { status: 204 })
}
)
// Publish a message in a queue
.post(
'/queues/:queue',
async (
req: Request & RequestI,
env: Environment
): Promise<Response> => {
const { queue, errorRes } = await setupQueueRequest(req, env)
if (errorRes) {
return errorRes
}
let message = await req.text()
await queue!.send(message)
return new Response('', { status: 201 })
}
)
// Catch-all route to handle 404s
.all('*', (): Response => {
return new Response('Not found', { status: 404 })
})
// Performs the init setps for a KV request. Returns a Response object in case of error.
async function setupKVRequest(
req: Request & RequestI,
env: Environment
): Promise<{
namespace?: KVNamespace<string>
key?: string
errorRes?: Response
}> {
if (!req?.text || !req.params?.namespace || !req.params?.key) {
return { errorRes: new Response('Bad request', { status: 400 }) }
}
const namespace = env[req.params.namespace] as KVNamespace<string>
if (!namespace || typeof namespace.getWithMetadata != 'function') {
return {
errorRes: new Response(
`Worker is not bound to KV '${req.params.kv}'`,
{ status: 412 }
),
}
}
const auth = await AuthorizeRequest(req, env)
if (!auth) {
return { errorRes: new Response('Unauthorized', { status: 401 }) }
}
return { namespace, key: req.params.key }
}
// Performs the init setps for a Queue request. Returns a Response object in case of error.
async function setupQueueRequest(
req: Request & RequestI,
env: Environment
): Promise<{ queue?: Queue<string>; errorRes?: Response }> {
if (!req?.text || !req.params?.queue) {
return { errorRes: new Response('Bad request', { status: 400 }) }
}
const queue = env[req.params.queue] as Queue<string>
if (!queue || typeof queue.send != 'function') {
return {
errorRes: new Response(
`Worker is not bound to queue '${req.params.queue}'`,
{ status: 412 }
),
}
}
const auth = await AuthorizeRequest(req, env)
if (!auth) {
return { errorRes: new Response('Unauthorized', { status: 401 }) }
}
return { queue }
}
export default {
fetch: router.handle,
}

View File

@ -1,17 +1,21 @@
# This wrangler.toml is only used during local development
# Dapr interacts with the Cloudflare APIs directly and doesn't use Wrangler, hence it doesn't use this file
name = "daprdemo"
main = "worker.ts"
compatibility_date = "2022-12-09"
usage_model = "bundled"
[vars]
PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAyutaN0GSzNkI3N/E/J9aCqpBE/UvLD4pd7tirq5f4tw=
-----END PUBLIC KEY-----
"""
TOKEN_AUDIENCE = "daprdemo"
PUBLIC_KEY = ""
TOKEN_AUDIENCE = ""
SKIP_AUTH = "true"
[[kv_namespaces]]
binding = "daprkv"
# Fill these with the namespace you create
id = "..."
preview_id = "..."
# Worker defines a binding, named "QUEUE", which gives it a capability
# to send messages to a Queue, named "daprdemo".
[[queues.producers]]
queue = "daprdemo"
binding = "daprdemo"

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -29,7 +29,6 @@ import (
"strings"
"time"
"github.com/dapr/components-contrib/bindings"
cfworkerscode "github.com/dapr/components-contrib/internal/component/cloudflare/workers/code"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
@ -56,7 +55,7 @@ type Base struct {
}
// Init the base class.
func (w *Base) Init(metadata bindings.Metadata, workerBindings []Binding, componentDocsUrl string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) {
func (w *Base) Init(workerBindings []Binding, componentDocsUrl string, infoResponseValidate func(*InfoEndpointResponse) error) (err error) {
w.ctx, w.cancel = context.WithCancel(context.Background())
w.client = &http.Client{
Timeout: time.Second * 30,
@ -203,9 +202,13 @@ type deployWorkerMetadata struct {
// Binding contains a binding that is attached to the worker
type Binding struct {
Name string `json:"name"`
Type string `json:"type"`
Text *string `json:"text,omitempty"`
Name string `json:"name"`
Type string `json:"type"`
// For variables
Text *string `json:"text,omitempty"`
// For KV namespaces
KVNamespaceID *string `json:"namespace_id,omitempty"`
// For queues
QueueName *string `json:"queue_name,omitempty"`
}
@ -329,6 +332,7 @@ func (w *Base) enableWorkersDevRoute() error {
type InfoEndpointResponse struct {
Version string `json:"version"`
Queues []string `json:"queues"`
KV []string `json:"kv"`
}
// Check a worker to ensure it's available and it's using a supported version.

217
state/cloudflare/kv/cfkv.go Normal file
View File

@ -0,0 +1,217 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cfkv
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"reflect"
"time"
"github.com/mitchellh/mapstructure"
"golang.org/x/exp/slices"
"github.com/dapr/components-contrib/internal/component/cloudflare/workers"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
// Link to the documentation for the component
// TODO: Add link to docs
const componentDocsUrl = "https://TODO"
// CFKV is a state store backed by Cloudflare KV.
type CFKV struct {
*workers.Base
state.DefaultBulkStore
metadata componentMetadata
}
// NewCFKV returns a new CFKV.
func NewCFKV(logger logger.Logger) state.Store {
q := &CFKV{
Base: &workers.Base{},
}
q.DefaultBulkStore = state.NewDefaultBulkStore(q)
q.SetLogger(logger)
return q
}
// Init the component.
func (q *CFKV) Init(metadata state.Metadata) error {
// Decode the metadata
err := mapstructure.Decode(metadata.Properties, &q.metadata)
if err != nil {
return fmt.Errorf("failed to parse metadata: %w", err)
}
err = q.metadata.Validate()
if err != nil {
return fmt.Errorf("metadata is invalid: %w", err)
}
q.SetMetadata(&q.metadata.BaseMetadata)
// Init the base component
workerBindings := []workers.Binding{
{Type: "kv_namespace", Name: q.metadata.KVNamespaceName, KVNamespaceID: &q.metadata.KVNamespaceID},
}
infoResponseValidate := func(data *workers.InfoEndpointResponse) error {
if !slices.Contains(data.KV, q.metadata.KVNamespaceName) {
return fmt.Errorf("the worker is not bound to the namespace with name '%s'; please re-deploy the worker with the correct bindings per instructions in the documentation at %s", q.metadata.KVNamespaceName, componentDocsUrl)
}
return nil
}
return q.Base.Init(workerBindings, componentDocsUrl, infoResponseValidate)
}
func (q *CFKV) GetComponentMetadata() map[string]string {
metadataStruct := componentMetadata{}
metadataInfo := map[string]string{}
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
return metadataInfo
}
// Features returns the features supported by this state store.
func (q CFKV) Features() []state.Feature {
return []state.Feature{}
}
func (q *CFKV) Delete(stateReq *state.DeleteRequest) error {
token, err := q.metadata.CreateToken()
if err != nil {
return fmt.Errorf("failed to create authorization token: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceName + "/" + url.PathEscape(stateReq.Key)
req, err := http.NewRequestWithContext(ctx, "DELETE", u, nil)
if err != nil {
return fmt.Errorf("error creating network request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
res, err := q.Client().Do(req)
if err != nil {
return fmt.Errorf("error invoking the worker: %w", err)
}
defer func() {
// Drain the body before closing it
_, _ = io.ReadAll(res.Body)
res.Body.Close()
}()
if res.StatusCode != http.StatusNoContent {
return fmt.Errorf("invalid response status code: %d", res.StatusCode)
}
return nil
}
func (q *CFKV) Get(stateReq *state.GetRequest) (*state.GetResponse, error) {
token, err := q.metadata.CreateToken()
if err != nil {
return nil, fmt.Errorf("failed to create authorization token: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceName + "/" + url.PathEscape(stateReq.Key)
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return nil, fmt.Errorf("error creating network request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
res, err := q.Client().Do(req)
if err != nil {
return nil, fmt.Errorf("error invoking the worker: %w", err)
}
defer func() {
// Drain the body before closing it
_, _ = io.ReadAll(res.Body)
res.Body.Close()
}()
if res.StatusCode == http.StatusNotFound {
return &state.GetResponse{}, nil
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("invalid response status code: %d", res.StatusCode)
}
// Read the response
data, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response data: %w", err)
}
return &state.GetResponse{
Data: data,
}, nil
}
func (q *CFKV) Set(stateReq *state.SetRequest) error {
token, err := q.metadata.CreateToken()
if err != nil {
return fmt.Errorf("failed to create authorization token: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
u := q.metadata.WorkerURL + "kv/" + q.metadata.KVNamespaceName + "/" + url.PathEscape(stateReq.Key)
req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewReader(q.marshalData(stateReq.Value)))
if err != nil {
return fmt.Errorf("error creating network request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
res, err := q.Client().Do(req)
if err != nil {
return fmt.Errorf("error invoking the worker: %w", err)
}
defer func() {
// Drain the body before closing it
_, _ = io.ReadAll(res.Body)
res.Body.Close()
}()
if res.StatusCode != http.StatusCreated {
return fmt.Errorf("invalid response status code: %d", res.StatusCode)
}
return nil
}
func (q *CFKV) marshalData(value any) []byte {
switch x := value.(type) {
case []byte:
return x
default:
b, _ := json.Marshal(x)
return b
}
}
// Close the component
func (q *CFKV) Close() error {
err := q.Base.Close()
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cfkv
import (
"errors"
"regexp"
"github.com/dapr/components-contrib/internal/component/cloudflare/workers"
)
// Component metadata struct.
type componentMetadata struct {
workers.BaseMetadata `mapstructure:",squash"`
KVNamespaceName string `mapstructure:"kvNamespaceName"`
KVNamespaceID string `mapstructure:"kvNamespaceID"`
}
var kvNamespaceValidation = regexp.MustCompile("^([a-zA-Z0-9_\\-\\.]+)$")
// Validate the metadata object.
func (m *componentMetadata) Validate() error {
// Start by validating the base metadata, then validate the properties specific to this component
err := m.BaseMetadata.Validate()
if err != nil {
return err
}
// KVNamespaceName
if m.KVNamespaceName == "" {
return errors.New("property 'kvNamespaceName' is required")
}
if !kvNamespaceValidation.MatchString(m.KVNamespaceName) {
return errors.New("metadata property 'kvNamespaceName' is invalid")
}
// KVNamespaceID
if m.KVNamespaceID == "" {
return errors.New("property 'kvNamespaceID' is required")
}
if !kvNamespaceValidation.MatchString(m.KVNamespaceID) {
return errors.New("metadata property 'kvNamespaceID' is invalid")
}
return nil
}