Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-12-12 20:06:04 +00:00
parent 385d0480a7
commit 456caaef6a
8 changed files with 116 additions and 67 deletions

View File

@ -14,6 +14,7 @@ limitations under the License.
package cfqueues
import (
"bytes"
"context"
"crypto/ed25519"
"crypto/x509"
@ -37,11 +38,14 @@ import (
"github.com/dapr/kit/logger"
)
// Minimum version required for the running Worker.
const minWorkerVersion = 20221209
// Issuer for JWTs
const tokenIssuer = "dapr.io/cloudflare"
const (
// Minimum version required for the running Worker.
minWorkerVersion = 20221209
// Issuer for JWTs
tokenIssuer = "dapr.io/cloudflare"
// JWT token expiration
tokenExpiration = 5 * time.Minute
)
// CFQueues is a binding for publishing messages on Cloudflare Queues
type CFQueues struct {
@ -169,7 +173,12 @@ func (q *CFQueues) invokePublish(parentCtx context.Context, ir *bindings.InvokeR
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"publish/"+md.QueueName, nil)
d, err := strconv.Unquote(string(ir.Data))
if err == nil {
ir.Data = []byte(d)
}
req, err := http.NewRequestWithContext(ctx, "POST", q.metadata.WorkerURL+"publish/"+md.QueueName, bytes.NewReader(ir.Data))
if err != nil {
return nil, fmt.Errorf("error creating network request: %w", err)
}
@ -198,7 +207,7 @@ func (q CFQueues) createToken() (string, error) {
Audience([]string{q.metadata.WorkerName}).
Issuer(tokenIssuer).
IssuedAt(now).
Expiration(now.Add(5 * time.Minute)).
Expiration(now.Add(tokenExpiration)).
Build()
if err != nil {
return "", fmt.Errorf("failed to build token: %w", err)

View File

@ -0,0 +1,19 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export type Environment = {
PUBLIC_KEY: string
TOKEN_AUDIENCE: string
// Other values are assumed to be bindings: Queues, KV, R2
readonly [x: string]: string | Queue<string> | KVNamespace | R2Bucket
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { importSPKI, jwtVerify } from 'jose'
import type { Environment } from '$lib/environment'
const tokenHeaderMatch =
/^(?:Bearer )?([A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+)/i
export async function AuthorizeRequest(
req: Request,
env: Environment
): Promise<boolean> {
// Ensure we have an Authorization header with a bearer JWT token
const match = tokenHeaderMatch.exec(req.headers.get('authorization') || '')
if (!match || !match[1]) {
return false
}
// Validate the JWT
const pk = await importSPKI(env.PUBLIC_KEY, 'EdDSA')
try {
await jwtVerify(match[1], pk, {
issuer: 'dapr.io/cloudflare',
audience: env.TOKEN_AUDIENCE,
algorithms: ['EdDSA'],
// Allow 5 mins of clock skew
clockTolerance: 300,
})
} catch (err) {
console.error('Failed to validate JWT: ' + err)
return false
}
return true
}

View File

@ -16,7 +16,7 @@
"@cloudflare/workers-types": "^4.20221111.1",
"prettier": "^2.8.1",
"typescript": "^4.9.4",
"wrangler": "^2.6.1"
"wrangler": "^2.6.2"
}
},
"node_modules/@cloudflare/kv-asset-handler": {
@ -1506,9 +1506,9 @@
}
},
"node_modules/wrangler": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.1.tgz",
"integrity": "sha512-v0Kh8KQC33xP82pGXZ+67pJMQIx0CHI+gF2nLIcdZbrB0wAQLQ6lsQYS9UpuTupQg4+RetHToDwk+lzRM0J0cQ==",
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.2.tgz",
"integrity": "sha512-+in4oEQXDs6+vE+1c6niBd3IrW1DMRTbauR6G0u3TpD6UaXOLwLdBxRLEbN3m82dN+WNm7l1MbFZrKc/TnWjhw==",
"dev": true,
"dependencies": {
"@cloudflare/kv-asset-handler": "^0.2.0",
@ -2589,9 +2589,9 @@
}
},
"wrangler": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.1.tgz",
"integrity": "sha512-v0Kh8KQC33xP82pGXZ+67pJMQIx0CHI+gF2nLIcdZbrB0wAQLQ6lsQYS9UpuTupQg4+RetHToDwk+lzRM0J0cQ==",
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/wrangler/-/wrangler-2.6.2.tgz",
"integrity": "sha512-+in4oEQXDs6+vE+1c6niBd3IrW1DMRTbauR6G0u3TpD6UaXOLwLdBxRLEbN3m82dN+WNm7l1MbFZrKc/TnWjhw==",
"dev": true,
"requires": {
"@cloudflare/kv-asset-handler": "^0.2.0",

View File

@ -14,7 +14,7 @@
"@cloudflare/workers-types": "^4.20221111.1",
"prettier": "^2.8.1",
"typescript": "^4.9.4",
"wrangler": "^2.6.1"
"wrangler": "^2.6.2"
},
"dependencies": {
"itty-router": "^2.6.6",

View File

@ -11,6 +11,10 @@
"noEmit": true,
"isolatedModules": true,
"allowSyntheticDefaultImports": true,
"strict": true
"strict": true,
"baseUrl": ".",
"paths": {
"$lib/*": ["lib/*"]
}
}
}

View File

@ -1,12 +1,30 @@
import { Router, type Request as RequestI } from 'itty-router'
import { importSPKI, jwtVerify } from 'jose'
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { Router, type Request as RequestI } from 'itty-router'
import type { Environment } from '$lib/environment'
import { AuthorizeRequest } from '$lib/jwt-auth'
import { version } from './package.json'
const router = Router()
// Handle the info endpoint
.get(
'/.well-known/dapr/info',
async (req: Request & RequestI, env: Environment): Promise<Response> => {
async (
req: Request & RequestI,
env: Environment
): Promise<Response> => {
const auth = await AuthorizeRequest(req, env)
if (!auth) {
return new Response('Unauthorized', { status: 401 })
@ -90,45 +108,4 @@ const router = Router()
export default {
fetch: router.handle,
async queue(batch: MessageBatch<string>, env: Environment) {
for (let i = 0; i < batch.messages.length; i++) {
console.log(`Received message ${JSON.stringify(batch.messages[i])}`)
}
},
}
const tokenHeaderMatch =
/^(?:Bearer )?([A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+)/i
async function AuthorizeRequest(
req: Request,
env: Environment
): Promise<boolean> {
// Ensure we have an Authorization header with a bearer JWT token
const match = tokenHeaderMatch.exec(req.headers.get('authorization') || '')
if (!match || !match[1]) {
return false
}
// Validate the JWT
const pk = await importSPKI(env.PUBLIC_KEY, 'EdDSA')
try {
await jwtVerify(match[1], pk, {
issuer: 'dapr.io/cloudflare',
audience: env.TOKEN_AUDIENCE,
})
} catch (err) {
console.error('Failed to validate JWT: ' + err)
return false
}
return true
}
type Environment = {
PUBLIC_KEY: string
TOKEN_AUDIENCE: string
// Other values are assumed to be bindings: Queues, KV, R2
readonly [x: string]: string | Queue<string> | KVNamespace | R2Bucket
}

View File

@ -15,10 +15,3 @@ TOKEN_AUDIENCE = "daprdemo"
[[queues.producers]]
queue = "daprdemo"
binding = "daprdemo"
# Worker also receives messages from the Queue, named "daprdemo".
[[queues.consumers]]
queue = "daprdemo"
max_batch_size = 10 # Max messages per batch
max_batch_timeout = 1 # Max seconds to wait before batch is full
max_retries = 3 # Max retries per batch