diff --git a/bindings/cloudflare/queues/cfqueues.go b/bindings/cloudflare/queues/cfqueues.go index 840bf3143..5c0bbb4b6 100644 --- a/bindings/cloudflare/queues/cfqueues.go +++ b/bindings/cloudflare/queues/cfqueues.go @@ -14,6 +14,7 @@ limitations under the License. package cfqueues import ( + "bytes" "context" "crypto/ed25519" "crypto/x509" @@ -37,11 +38,14 @@ import ( "github.com/dapr/kit/logger" ) -// Minimum version required for the running Worker. -const minWorkerVersion = 20221209 - -// Issuer for JWTs -const tokenIssuer = "dapr.io/cloudflare" +const ( + // Minimum version required for the running Worker. + minWorkerVersion = 20221209 + // Issuer for JWTs + tokenIssuer = "dapr.io/cloudflare" + // JWT token expiration + tokenExpiration = 5 * time.Minute +) // CFQueues is a binding for publishing messages on Cloudflare Queues type CFQueues struct { @@ -169,7 +173,12 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"publish/"+md.QueueName, nil) + d, err := strconv.Unquote(string(ir.Data)) + if err == nil { + ir.Data = []byte(d) + } + + req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"publish/"+md.QueueName, bytes.NewReader(ir.Data)) if err != nil { return nil, fmt.Errorf("error creating network request: %w", err) } @@ -198,7 +207,7 @@ func (q CFQueues) createToken() (string, error) { Audience([]string{q.metadata.WorkerName}). Issuer(tokenIssuer). IssuedAt(now). - Expiration(now.Add(5 * time.Minute)). + Expiration(now.Add(tokenExpiration)). Build() if err != nil { return "", fmt.Errorf("failed to build token: %w", err) diff --git a/internal/component/cloudflare/worker-src/lib/environment.ts b/internal/component/cloudflare/worker-src/lib/environment.ts new file mode 100644 index 000000000..9e2ba182b --- /dev/null +++ b/internal/component/cloudflare/worker-src/lib/environment.ts @@ -0,0 +1,19 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export type Environment = { + PUBLIC_KEY: string + TOKEN_AUDIENCE: string + // Other values are assumed to be bindings: Queues, KV, R2 + readonly [x: string]: string | Queue | KVNamespace | R2Bucket +} diff --git a/internal/component/cloudflare/worker-src/lib/jwt-auth.ts b/internal/component/cloudflare/worker-src/lib/jwt-auth.ts new file mode 100644 index 000000000..227589708 --- /dev/null +++ b/internal/component/cloudflare/worker-src/lib/jwt-auth.ts @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { importSPKI, jwtVerify } from 'jose' + +import type { Environment } from '$lib/environment' + +const tokenHeaderMatch = + /^(?:Bearer )?([A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+)/i + +export async function AuthorizeRequest( + req: Request, + env: Environment +): Promise { + // Ensure we have an Authorization header with a bearer JWT token + const match = tokenHeaderMatch.exec(req.headers.get('authorization') || '') + if (!match || !match[1]) { + return false + } + + // Validate the JWT + const pk = await importSPKI(env.PUBLIC_KEY, 'EdDSA') + try { + await jwtVerify(match[1], pk, { + issuer: 'dapr.io/cloudflare', + audience: env.TOKEN_AUDIENCE, + algorithms: ['EdDSA'], + // Allow 5 mins of clock skew + clockTolerance: 300, + }) + } catch (err) { + console.error('Failed to validate JWT: ' + err) + return false + } + + return true +} diff --git a/internal/component/cloudflare/worker-src/package-lock.json b/internal/component/cloudflare/worker-src/package-lock.json index 7cb9a2708..f29969b63 100644 --- a/internal/component/cloudflare/worker-src/package-lock.json +++ b/internal/component/cloudflare/worker-src/package-lock.json @@ -16,7 +16,7 @@ "@cloudflare/workers-types": "^4.20221111.1", "prettier": "^2.8.1", "typescript": "^4.9.4", - "wrangler": "^2.6.1" + "wrangler": "^2.6.2" } }, "node_modules/@cloudflare/kv-asset-handler": { @@ -1506,9 +1506,9 @@ } }, "node_modules/wrangler": { - "version": "2.6.1", - "resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.1.tgz", - "integrity": "sha512-v0Kh8KQC33xP82pGXZ+67pJMQIx0CHI+gF2nLIcdZbrB0wAQLQ6lsQYS9UpuTupQg4+RetHToDwk+lzRM0J0cQ==", + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.2.tgz", + "integrity": "sha512-+in4oEQXDs6+vE+1c6niBd3IrW1DMRTbauR6G0u3TpD6UaXOLwLdBxRLEbN3m82dN+WNm7l1MbFZrKc/TnWjhw==", "dev": true, "dependencies": { "@cloudflare/kv-asset-handler": "^0.2.0", @@ -2589,9 +2589,9 @@ } }, "wrangler": { - "version": "2.6.1", - "resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.1.tgz", - "integrity": "sha512-v0Kh8KQC33xP82pGXZ+67pJMQIx0CHI+gF2nLIcdZbrB0wAQLQ6lsQYS9UpuTupQg4+RetHToDwk+lzRM0J0cQ==", + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.2.tgz", + "integrity": "sha512-+in4oEQXDs6+vE+1c6niBd3IrW1DMRTbauR6G0u3TpD6UaXOLwLdBxRLEbN3m82dN+WNm7l1MbFZrKc/TnWjhw==", "dev": true, "requires": { "@cloudflare/kv-asset-handler": "^0.2.0", diff --git a/internal/component/cloudflare/worker-src/package.json b/internal/component/cloudflare/worker-src/package.json index d43e82bf6..a634f9887 100644 --- a/internal/component/cloudflare/worker-src/package.json +++ b/internal/component/cloudflare/worker-src/package.json @@ -14,7 +14,7 @@ "@cloudflare/workers-types": "^4.20221111.1", "prettier": "^2.8.1", "typescript": "^4.9.4", - "wrangler": "^2.6.1" + "wrangler": "^2.6.2" }, "dependencies": { "itty-router": "^2.6.6", diff --git a/internal/component/cloudflare/worker-src/tsconfig.json b/internal/component/cloudflare/worker-src/tsconfig.json index fde6272c9..24f6ddfde 100644 --- a/internal/component/cloudflare/worker-src/tsconfig.json +++ b/internal/component/cloudflare/worker-src/tsconfig.json @@ -11,6 +11,10 @@ "noEmit": true, "isolatedModules": true, "allowSyntheticDefaultImports": true, - "strict": true + "strict": true, + "baseUrl": ".", + "paths": { + "$lib/*": ["lib/*"] + } } } diff --git a/internal/component/cloudflare/worker-src/worker.ts b/internal/component/cloudflare/worker-src/worker.ts index f2d3b1b7c..6a6d936c4 100644 --- a/internal/component/cloudflare/worker-src/worker.ts +++ b/internal/component/cloudflare/worker-src/worker.ts @@ -1,12 +1,30 @@ -import { Router, type Request as RequestI } from 'itty-router' -import { importSPKI, jwtVerify } from 'jose' +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +import { Router, type Request as RequestI } from 'itty-router' + +import type { Environment } from '$lib/environment' +import { AuthorizeRequest } from '$lib/jwt-auth' import { version } from './package.json' const router = Router() + // Handle the info endpoint .get( '/.well-known/dapr/info', - async (req: Request & RequestI, env: Environment): Promise => { + async ( + req: Request & RequestI, + env: Environment + ): Promise => { const auth = await AuthorizeRequest(req, env) if (!auth) { return new Response('Unauthorized', { status: 401 }) @@ -90,45 +108,4 @@ const router = Router() export default { fetch: router.handle, - - async queue(batch: MessageBatch, env: Environment) { - for (let i = 0; i < batch.messages.length; i++) { - console.log(`Received message ${JSON.stringify(batch.messages[i])}`) - } - }, -} - -const tokenHeaderMatch = - /^(?:Bearer )?([A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+)/i - -async function AuthorizeRequest( - req: Request, - env: Environment -): Promise { - // Ensure we have an Authorization header with a bearer JWT token - const match = tokenHeaderMatch.exec(req.headers.get('authorization') || '') - if (!match || !match[1]) { - return false - } - - // Validate the JWT - const pk = await importSPKI(env.PUBLIC_KEY, 'EdDSA') - try { - await jwtVerify(match[1], pk, { - issuer: 'dapr.io/cloudflare', - audience: env.TOKEN_AUDIENCE, - }) - } catch (err) { - console.error('Failed to validate JWT: ' + err) - return false - } - - return true -} - -type Environment = { - PUBLIC_KEY: string - TOKEN_AUDIENCE: string - // Other values are assumed to be bindings: Queues, KV, R2 - readonly [x: string]: string | Queue | KVNamespace | R2Bucket } diff --git a/internal/component/cloudflare/worker-src/wrangler.toml b/internal/component/cloudflare/worker-src/wrangler.toml index b5f64e684..a84b662ce 100644 --- a/internal/component/cloudflare/worker-src/wrangler.toml +++ b/internal/component/cloudflare/worker-src/wrangler.toml @@ -15,10 +15,3 @@ TOKEN_AUDIENCE = "daprdemo" [[queues.producers]] queue = "daprdemo" binding = "daprdemo" - -# Worker also receives messages from the Queue, named "daprdemo". -[[queues.consumers]] - queue = "daprdemo" - max_batch_size = 10 # Max messages per batch - max_batch_timeout = 1 # Max seconds to wait before batch is full - max_retries = 3 # Max retries per batch