Add files via upload

This commit is contained in:
Ivan Nikittin 2018-07-19 17:11:09 -07:00 committed by GitHub
parent 73a2cae3e4
commit 2bb9073b0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 685 additions and 0 deletions

View File

@ -0,0 +1,117 @@
# Demo: Binding running services to an IoT core (PubSub)
> For the ease of the demo, a few variables here are hard-coded.
## Setup
Define environment variables:
```shell
export IOTCORE_PROJECT="s9-demo"
export IOTCORE_REG="next18-demo"
export IOTCORE_DEVICE="next18-demo-client"
export IOTCORE_REGION="us-central1"
export IOTCORE_TOPIC_DATA="iot-demo"
export IOTCORE_TOPIC_DEVICE="iot-demo-device"
```
## Creating a device registry
Run the following command to create a device registry:
```shell
gcloud iot registries create $IOTCORE_REG \
--project=$IOTCORE_PROJECT \
--region=$IOTCORE_REGION \
--event-notification-config=$IOTCORE_TOPIC_DATA \
--state-pubsub-topic=$IOTCORE_TOPIC_DEVICE
```
## Creating device certificates
Create certificates to connect the device to the IoT Core gateway:
```shell
openssl genrsa -out rsa_private.pem 2048
openssl rsa -in rsa_private.pem -pubout -out rsa_public.pem
```
## Registering the IoT device
Once created, add the public key to the IoT core registry:
```shell
gcloud iot devices create $IOTCORE_DEVICE \
--project=$IOTCORE_PROJECT \
--region=$IOTCORE_REGION \
--registry=$IOTCORE_REG \
--public-key path=./rsa_public.pem,type=rs256
```
## Generating Data
To mimic an IoT device sending data to the IoT gateway, run the provided
Node.js client with the following parameters:
```shell
node send-data.js \
--projectId=$IOTCORE_PROJECT \
--cloudRegion=$IOTCORE_REGION \
--registryId=$IOTCORE_REG \
--deviceId=$IOTCORE_DEVICE \
--privateKeyFile=./iot_demo_private.pem \
--algorithm=RS256
```
This "device" will publish one event per second to the IoT Core gateway.
The gateway will automatically publish the received events to the configured
PubSub topic (`iot-demo`).
The following payload is sent by this simulated IoT client:
```shell
{
source_id: 'next18-demo-client',
event_id: '41e13421-25aa-4e93-bca8-0ffeb5c040c8',
event_ts: 1531515192370,
metric: 9
}
```
The `event_id` value here is a unique UUIDv4 ID, `event_ts` is UNIX Epoch time, and `metric`
is a random number 1-10.
## Create a function that handles events
Now we want to consume these IoT events, so let's create the function to handle the events:
```shell
kubectl apply -f event-flow/route.yaml
kubectl apply -f event-flow/configuration.yaml
```
## Create an event source
Before we can bind an action to an event source, we have to create an event source
that knows how to wire events into actions for that particular event type.
First let's create a ServiceAccount, so that we can run the local receive adapter
in Pull mode to poll for the events from this topic.
Then let's create a GCP PubSub as an event source that we can bind to.
```shell
kubectl apply -f event-flow/serviceaccount.yaml
kubectl apply -f event-flow/serviceaccountbinding.yaml
kubectl apply -f event-flow/eventsource.yaml
kubectl apply -f event-flow/eventtype.yaml
```
## Bind IoT events to our function
We have now created a function that we want to consume our IoT events, and we have an event
source that's sending events via GCP PubSub, so let's wire the two together:
```shell
kubectl apply -f event-flow/flow.yaml
```

View File

@ -0,0 +1,27 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: serving.knative.dev/v1alpha1
kind: Configuration
metadata:
name: iot-function
namespace: default
spec:
revisionTemplate:
metadata:
labels:
knative.dev/type: container
spec:
container:
image: github.com/mchmarny/next18/event-flow

View File

@ -0,0 +1,20 @@
{
"name": "demo-device",
"version": "0.0.1",
"description": "MQTT Demo Device for Google Cloud IoT Core using NodeJS",
"main": "device.js",
"license": "Apache-2.0",
"author": "Google Inc.",
"dependencies": {
"@google-cloud/nodejs-repo-tools": "2.2.1",
"@google-cloud/pubsub": "0.16.4",
"MD5": "^1.3.0",
"ava": "0.25.0",
"jsonwebtoken": "8.2.0",
"mqtt": "2.16.0",
"node-uuid": "1.4.8",
"uuid": "3.2.1",
"yargs": "11.0.0"
},
"devDependencies": {}
}

View File

@ -0,0 +1,296 @@
/**
* Copyright 2017, Google, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
// [START iot_mqtt_include]
const fs = require('fs');
const jwt = require('jsonwebtoken');
const mqtt = require('mqtt');
const uuid = require('node-uuid');
const os = require('os');
// [END iot_mqtt_include]
// The initial backoff time after a disconnection occurs, in seconds.
var MINIMUM_BACKOFF_TIME = 1;
// The maximum backoff time before giving up, in seconds.
var MAXIMUM_BACKOFF_TIME = 59;
// Whether to wait with exponential backoff before publishing.
var shouldBackoff = false;
// The current backoff time.
var backoffTime = 1;
// Whether an asynchronous publish chain is in progress.
var publishChainInProgress = false;
console.log('Knative IoT Example');
var argv = require(`yargs`)
.options({
projectId: {
default: process.env.GCLOUD_PROJECT || process.env.GOOGLE_CLOUD_PROJECT,
description: 'The Project ID to use. Defaults to the value of the GCLOUD_PROJECT or GOOGLE_CLOUD_PROJECT environment variables.',
requiresArg: true,
type: 'string'
},
cloudRegion: {
default: 'us-central1',
description: 'GCP cloud region.',
requiresArg: true,
type: 'string'
},
registryId: {
description: 'Cloud IoT registry ID.',
requiresArg: true,
demandOption: true,
type: 'string'
},
deviceId: {
description: 'Cloud IoT device ID.',
requiresArg: true,
demandOption: true,
type: 'string'
},
privateKeyFile: {
description: 'Path to private key file.',
requiresArg: true,
demandOption: true,
type: 'string'
},
algorithm: {
description: 'Encryption algorithm to generate the JWT.',
requiresArg: true,
demandOption: true,
choices: ['RS256', 'ES256'],
type: 'string'
},
numMessages: {
default: 100,
description: 'Number of messages to publish.',
requiresArg: true,
type: 'number'
},
tokenExpMins: {
default: 20,
description: 'Minutes to JWT token expiration.',
requiresArg: true,
type: 'number'
},
mqttBridgeHostname: {
default: 'mqtt.googleapis.com',
description: 'MQTT bridge hostname.',
requiresArg: true,
type: 'string'
},
mqttBridgePort: {
default: 8883,
description: 'MQTT bridge port.',
requiresArg: true,
type: 'number'
},
messageType: {
default: 'events',
description: 'Message type to publish.',
requiresArg: true,
choices: ['events', 'state'],
type: 'string'
}
})
.example(`node $0 send-data.js --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t --cloudRegion=us-west1`)
.wrap(120)
.recommendCommands()
.epilogue(`For more information, see https://cloud.google.com/iot-core/docs`)
.help()
.strict()
.argv;
// Create a Cloud IoT Core JWT for the given project id, signed with the given
// private key.
// [START iot_mqtt_jwt]
function createJwt(projectId, privateKeyFile, algorithm) {
// Create a JWT to authenticate this device. The device will be disconnected
// after the token expires, and will have to reconnect with a new token. The
// audience field should always be set to the GCP project id.
const token = {
'iat': parseInt(Date.now() / 1000),
'exp': parseInt(Date.now() / 1000) + 20 * 60, // 20 minutes
'aud': projectId
};
const privateKey = fs.readFileSync(privateKeyFile);
return jwt.sign(token, privateKey, { algorithm: algorithm });
}
// [END iot_mqtt_jwt]
// Mock some data
var mockData = function () {
var load = os.loadavg();
var msg = {
'source_id': "next18-demo-client",
'event_id': uuid.v4(),
'event_ts': new Date().getTime(),
'metric': Math.floor(Math.random() * 10) + 1
};
console.dir(msg);
return Buffer.from(JSON.stringify(msg)).toString('base64');
}
// Publish numMessages messages asynchronously, starting from message
// messagesSent.
// [START iot_mqtt_publish]
function publishAsync(messagesSent, numMessages) {
// If we have published enough messages or backed off too many times, stop.
if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
console.log('Backoff time is too high. Giving up.');
}
console.log('Closing connection to MQTT. Goodbye!');
client.end();
publishChainInProgress = false;
return;
}
// Publish and schedule the next publish.
publishChainInProgress = true;
var publishDelayMs = 0;
if (shouldBackoff) {
publishDelayMs = 1000 * (backoffTime + Math.random());
backoffTime *= 2;
console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
}
setTimeout(function () {
// const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;
const payload = mockData();
// Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
// Cloud IoT Core also supports qos=0 for at most once delivery.
console.log('Publishing message:', payload);
client.publish(mqttTopic, payload, { qos: 1 }, function (err) {
if (!err) {
shouldBackoff = false;
backoffTime = MINIMUM_BACKOFF_TIME;
}
});
var schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
setTimeout(function () {
// [START iot_mqtt_jwt_refresh]
let secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
if (secsFromIssue > argv.tokenExpMins * 60) {
iatTime = parseInt(Date.now() / 1000);
console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);
client.end();
connectionArgs.password = createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm);
client = mqtt.connect(connectionArgs);
client.on('connect', (success) => {
console.log('connect');
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
publishAsync(1, argv.numMessages);
}
});
client.on('close', () => {
console.log('close');
shouldBackoff = true;
});
client.on('error', (err) => {
console.log('error', err);
});
client.on('message', (topic, message, packet) => {
console.log('message received: ', Buffer.from(message, 'base64').toString('ascii'));
});
client.on('packetsend', () => {
// Note: logging packet send is very verbose
});
}
// [END iot_mqtt_jwt_refresh]
publishAsync(messagesSent + 1, numMessages);
}, schedulePublishDelayMs);
}, publishDelayMs);
}
// [END iot_mqtt_publish]
// [START iot_mqtt_run]
// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${argv.projectId}/locations/${argv.cloudRegion}/registries/${argv.registryId}/devices/${argv.deviceId}`;
// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
let connectionArgs = {
host: argv.mqttBridgeHostname,
port: argv.mqttBridgePort,
clientId: mqttClientId,
username: 'unused',
password: createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm),
protocol: 'mqtts',
secureProtocol: 'TLSv1_2_method'
};
// Create a client, and connect to the Google MQTT bridge.
let iatTime = parseInt(Date.now() / 1000);
let client = mqtt.connect(connectionArgs);
// Subscribe to the /devices/{device-id}/config topic to receive config updates.
client.subscribe(`/devices/${argv.deviceId}/config`);
// The MQTT topic that this device will publish data to. The MQTT
// topic name is required to be in the format below. The topic name must end in
// 'state' to publish state and 'events' to publish telemetry. Note that this is
// not the same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`;
client.on('connect', (success) => {
console.log('connect');
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
publishAsync(1, argv.numMessages);
}
});
client.on('close', () => {
console.log('close');
shouldBackoff = true;
});
client.on('error', (err) => {
console.log('error', err);
});
client.on('message', (topic, message, packet) => {
console.log('message received: ', Buffer.from(message, 'base64').toString('ascii'));
});
client.on('packetsend', () => {
// Note: logging packet send is very verbose
});
// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.
// [END iot_mqtt_run]

View File

@ -0,0 +1,25 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: feeds.knative.dev/v1alpha1
kind: EventSource
metadata:
name: gcppubsub
namespace: default
spec:
type: gcppubsub
source: gcppubsub
image: github.com/knative/eventing/pkg/sources/gcppubsub
parameters:
image: github.com/knative/eventing/pkg/sources/gcppubsub/receive_adapter

View File

@ -0,0 +1,22 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: feeds.knative.dev/v1alpha1
kind: EventType
metadata:
name: receive
namespace: default
spec:
eventSource: gcppubsub
description: "subscription for receiving pubsub messages"

View File

@ -0,0 +1,34 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: flows.knative.dev/v1alpha1
kind: Flow
metadata:
name: iot-example
namespace: default
spec:
serviceAccountName: binder
trigger:
service: gcppubsub
eventType: receive
resource: gcppubsub/receive
parameters:
projectID: s9-demo
topic: iot-demo
action:
target:
kind: Route
apiVersion: serving.knative.dev/v1alpha1
name: iot-function

View File

@ -0,0 +1,78 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"encoding/base64"
"encoding/json"
"io/ioutil"
"log"
"net/http"
)
// Event represents PubSub payload
type Event struct {
ID string `json:"ID"`
Data string `json:"Data"`
Attributes map[string]string `json:"Attributes"`
}
// EventPayload represents PubSub Data payload
type EventPayload struct {
ID string `json:"event_id"`
SourceID string `json:"source_id"`
SentOn int `json:"event_ts"`
Metric int `json:"metric"`
}
func handlePost(rw http.ResponseWriter, req *http.Request) {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
panic(err)
}
//log.Println(string(body))
// decode the pubsub message
// var event map[string]string
var event Event
if err := json.Unmarshal(body, &event); err != nil {
log.Printf("Failed to unmarshal event: %s", err)
return
}
// decode pubsub payload
rawEvent, _ := base64.StdEncoding.DecodeString(event.Data)
// decode iot data
data, _ := base64.StdEncoding.DecodeString(string(rawEvent))
// decode the pubsub message payload
var payload EventPayload
if err := json.Unmarshal(data, &payload); err != nil {
log.Printf("Failed to unmarshal payload: %s", err)
return
}
log.Printf("Data sent by device %q: [metric: %d, on: %v]",
event.Attributes["deviceId"], payload.Metric, payload.SentOn)
}
func main() {
http.HandleFunc("/", handlePost)
log.Fatal(http.ListenAndServe(":8080", nil))
}

View File

@ -0,0 +1,23 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: serving.knative.dev/v1alpha1
kind: Route
metadata:
name: iot-function
namespace: default
spec:
traffic:
- configurationName: iot-function
percent: 100

View File

@ -0,0 +1,18 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: v1
kind: ServiceAccount
metadata:
name: binder
namespace: default

View File

@ -0,0 +1,25 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
name: bind-admin
subjects:
- kind: ServiceAccount
name: binder
namespace: default
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io