--- title: "Event-driven Image and BigQuery processing pipelines with Knative on Google Cloud" date: 2020-06-19 description: "Using Knative Eventing to build event-driven image and BigQuery processing pipelines on Google Cloud" type: "blog" --- In this blog post, I will outline two event-driven processing pipelines that I recently built with Knative Eventing. Along the way, I will explain event sources, custom events and other components provided by Knative, that greatly simplify the development of event-driven architectures. Both of these pipelines are available on GitHub, including source code, configurations, and detailed instructions, as part of my [Knative Tutorial](https://github.com/meteatamel/knative-tutorial). ## Knative components used When creating these example pipelines, I relied on a few Knative components that greatly simplified my development. More specifially: 1. [Event sources](https://knative.dev/docs/eventing/sources/) allow you to read external events in your cluster. [Knative-GCP Sources](https://github.com/google/knative-gcp#knative-gcp-sources) provide a number of eventing sources ready to read events from various Google Cloud sources. 2. [Broker and triggers](https://knative.dev/docs/eventing/broker/) provide event delivery without producers or consumers needing to know about how the events are routed. 3. **Custom events and event replies**: In Knative, all events are [CloudEvents](https://cloudevents.io/), so it's useful to have a standard format for events and various SDKs to read/write them. Knative supports custom events and event replies. Any service can receive an event, do some processing, create a custom event with new data, and reply back to the broker so that other services can read the custom event. This is useful in pipelines, where each service does a little bit of work and passes the message forward to the next service. ## Image Processing Pipeline In this image processing pipeline example, users upload an image to a storage bucket on Google Cloud, process the image with a number of different Knative services, and save the processed image to an output bucket. I defined two requirements for the pipeline: 1. Uploaded images are filtered before they are sent through the pipeline. For example, no adult themed or violent images are allowed. 2. The pipeline can contain any number of processing services that can be added or removed as needed. ### Architecture This section explains the architecture of the image processing pipeline. The pipeline is deployed to Google Kubernetes Engine (GKE) on Google Cloud. ![Image processing pipeline architecture](https://atamel.dev/img/2020/image-processing-pipeline.png) 1. An image is saved to an input Cloud Storage bucket. 2. A Cloud Storage update event is read into Knative by [CloudStorageSource](https://github.com/google/knative-gcp/blob/master/docs/examples/cloudstoragesource/README.md). 3. A filter service receives the Cloud Storage event. It uses the Vision API to determine whether the image is safe or should be filtered. If the image is safe, the filter service creates a custom CloudEvent of type `dev.knative.samples.fileuploaded` and passes it back to the broker. 4. The resizer service receives the `fileuploaded` event, and then resizes the image using the [ImageSharp](https://github.com/SixLabors/ImageSharp) library. The service then saves the resized image to the output bucket, creates a custom CloudEvent of type `dev.knative.samples.fileresized`, and passes the event back to the broker. 5. The watermark service receives the `fileresized` event, adds a watermark to the image using the [ImageSharp](https://github.com/SixLabors/ImageSharp) library, and saves the image to the output bucket. 6. The labeler receives the `fileuploaded` event, extracts labels from the image using the Vision API, and saves the labels to the output bucket. ### Test the pipeline To test the pipeline, I uploaded a picture from my favorite beach, Ipanema in Rio de Janeiro, to the bucket: ![Beach with sunset](https://atamel.dev/img/2020/beach.jpg) After a few seconds, I saw 3 files in my output bucket: ```sh gsutil ls gs://knative-atamel-images-output gs://knative-atamel-images-output/beach-400x400-watermark.jpeg gs://knative-atamel-images-output/beach-400x400.png gs://knative-atamel-images-output/beach-labels.txt ``` We can see the labels `Sky,Body of water,Sea,Nature,Coast,Water,Sunset,Horizon,Cloud,Shore` in the text file, and the resized and watermarked image: ![Beach with sunset](https://atamel.dev/img/2020/beach-400x400-watermark.jpeg) ## BigQuery Processing Pipeline This pipeline example is a schedule driven pipeline, which queries and finds the daily number of COVID-19 cases for the UK and Cyprus. I used a public COVID-19 dataset on BigQuery to get the data, generate charts, and send myself one email for each country, once a day, containing those charts. ### Architecture Here's the architecture of the pipeline. ![BigQuery processing pipeline architecture](https://atamel.dev/img/2020/bigquery-processing-pipeline.png) 1. I setup two `CloudSchedulerSources` for two countries (United Kingdom and Cyprus) to call the `QueryRunner` service once a day. 2. The QueryRunner service receives the scheduler events for the UK and Cyprus, queries COVID-19 cases for each of them using BigQuery's public COVID-19 data set, and saves the results in a separate BigQuery table. After this is done, the QueryRunner service returns a custom CloudEvent of type `dev.knative.samples.querycompleted`. 3. The ChartCreator service receives the `querycompleted` CloudEvent, creates a chart from BigQuery data using `Matplotlib`, and saves it to a Cloud Storage bucket. 4. The notifier service is another receives the `com.google.cloud.storage.object.finalize` CloudEvent from the bucket through a CloudStorageSource, and sends an email notification to users using SendGrid. ### Test the pipeline The CloudSchedulerSource creates CloudScheduler jobs: ```bash gcloud scheduler jobs list ID LOCATION SCHEDULE (TZ) TARGET_TYPE STATE cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2 europe-west1 0 16 * * * (UTC) Pub/Sub ENABLED cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869 europe-west1 0 17 * * * (UTC) Pub/Sub ENABLED ``` Trigger the jobs: ```bash gcloud scheduler jobs run cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2 ``` You should get an email with with a chart similar to this in a few minutes: ![Chart - United Kingdom](https://atamel.dev/img/2020/chart-unitedkingdom.png) This wraps up my post. As I already mentioned, if you want more detailed instructions, you can check out [image-processing-pipeline](https://github.com/meteatamel/knative-tutorial/blob/master/docs/image-processing-pipeline.md) and [bigquery-processing-pipeline](https://github.com/meteatamel/knative-tutorial/blob/master/docs/bigquery-processing-pipeline.md) as part of my [Knative Tutorial](https://github.com/meteatamel/knative-tutorial) If you have questions/comments, feel free to reach out to me on Twitter [@meteatamel](https://twitter.com/meteatamel)). --- By [Mete Atamel](https://twitter.com/meteatamel) - Developer Advocate, Google Cloud