Auto-update dependencies (#128)

Produced via:
  `dep ensure -update knative.dev/test-infra knative.dev/pkg`
/assign n3wscott
Matt Moore 2019-11-13 17:19:29 -08:00 committed by Knative Prow Robot
parent 313aaa60e5
commit 0fc39123e1
69 changed files with 1245 additions and 2206 deletions

Gopkg.lock generated

@ -933,7 +933,7 @@
[[projects]]
branch = "master"
digest = "1:5ca4167264dc0212a6d38d700ec305125f38af525371465b5ad3367178bc20b4"
digest = "1:0b3cb53ae4d837bb424680941581ec7a2828c9d530db0b11e0cf365ead4e6403"
name = "knative.dev/pkg"
packages = [
"apis",
@ -952,18 +952,18 @@
"metrics/metricskey",
]
pruneopts = "T"
revision = "340e3aefcd4b731170fc27a72ebdc273c87ae93a"
revision = "4deb5d83d26170faeef8e54e9ae4cd9b04ed81f8"
[[projects]]
branch = "master"
digest = "1:5299d75a2b08a91c54ffb8b76afe21eb5f1ecf7bc4d67b859fc081f9415452bc"
digest = "1:e4a2fd481bd2d6dcac5ae03c55d6104a0953d0a78cb45fdd4d20a116ed2a5218"
name = "knative.dev/test-infra"
packages = [
"scripts",
"tools/dep-collector",
]
pruneopts = "UT"
revision = "6c8da588aaa3d2bff76a9c37fd6ac5c9a6c02c09"
revision = "e381f11dc722330fe082158e2f22cd39f8fe8375"
[[projects]]
digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c"

vendor/knative.dev/pkg/Gopkg.lock generated vendored

@ -1260,14 +1260,14 @@
[[projects]]
branch = "master"
digest = "1:d98cbeb7a88cc866d8353ffec3116baf6397669bde5ceb4caccf540e8cacc496"
digest = "1:5299d75a2b08a91c54ffb8b76afe21eb5f1ecf7bc4d67b859fc081f9415452bc"
name = "knative.dev/test-infra"
packages = [
"scripts",
"tools/dep-collector",
]
pruneopts = "UT"
revision = "027b4ff36fdb3a671f12e4fb1d386a742ee12183"
revision = "6c8da588aaa3d2bff76a9c37fd6ac5c9a6c02c09"
[[projects]]
digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c"
@ -1326,7 +1326,6 @@
"go.uber.org/zap/zaptest",
"golang.org/x/net/context",
"golang.org/x/oauth2",
"golang.org/x/oauth2/google",
"golang.org/x/sync/errgroup",
"google.golang.org/api/container/v1beta1",
"google.golang.org/api/iterator",
@ -1383,6 +1382,7 @@
"k8s.io/client-go/kubernetes/fake",
"k8s.io/client-go/kubernetes/scheme",
"k8s.io/client-go/kubernetes/typed/core/v1",
"k8s.io/client-go/listers/admissionregistration/v1beta1",
"k8s.io/client-go/listers/apps/v1",
"k8s.io/client-go/listers/autoscaling/v2beta1",
"k8s.io/client-go/listers/core/v1",


@ -2,12 +2,10 @@ aliases:
pkg-approvers:
- evankanderson
- mattmoor
- vaikas-google
- vaikas
apis-approvers:
- mattmoor
- vaikas-google
- vaikas
- n3wscott
@ -16,12 +14,6 @@ aliases:
apis-duck-approvers:
- mattmoor
- vaikas-google
- vaikas
cloudevents-approvers:
- n3wscott
- vaikas-google
- vaikas
configmap-approvers:
@ -68,7 +60,6 @@ aliases:
source-approvers:
- n3wscott
- vaikas-google
- vaikas
webhook-approvers:


@ -88,3 +88,13 @@ their own release branches, so to update the `knative/pkg` dependency we run:
dep ensure -update knative.dev/pkg
./hack/update-deps.sh
```
## Revert to Master
Post release, reverse the process. `Gopkg.toml` should look like:
```toml
[[override]]
name = "knative.dev/pkg"
branch = "master"
```


@ -55,7 +55,6 @@ const (
// Conditions defines a readiness condition for a Knative resource.
// See: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties
// +k8s:deepcopy-gen=true
// +k8s:openapi-gen=true
type Condition struct {
// Type of condition.
// +required


@ -0,0 +1,93 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"context"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)
// Destination represents a target of an invocation over HTTP.
type Destination struct {
// Ref points to an Addressable.
// +optional
Ref *corev1.ObjectReference `json:"ref,omitempty"`
// URI can be an absolute URL (non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
// +optional
URI *apis.URL `json:"uri,omitempty"`
}
func (dest *Destination) Validate(ctx context.Context) *apis.FieldError {
if dest == nil {
return nil
}
return ValidateDestination(*dest).ViaField(apis.CurrentField)
}
// ValidateDestination validates Destination.
func ValidateDestination(dest Destination) *apis.FieldError {
var ref *corev1.ObjectReference
if dest.Ref != nil {
ref = dest.Ref
}
if ref == nil && dest.URI == nil {
return apis.ErrGeneric("expected at least one, got none", "ref", "uri")
}
if ref != nil && dest.URI != nil && dest.URI.URL().IsAbs() {
return apis.ErrGeneric("Absolute URI is not allowed when Ref or [apiVersion, kind, name] is present", "[apiVersion, kind, name]", "ref", "uri")
}
// IsAbs() checks whether the URL has a non-empty scheme. Besides the non-empty scheme, we also require dest.URI to have a non-empty host
if ref == nil && dest.URI != nil && (!dest.URI.URL().IsAbs() || dest.URI.Host == "") {
return apis.ErrInvalidValue("Relative URI is not allowed when Ref and [apiVersion, kind, name] is absent", "uri")
}
if ref != nil && dest.URI == nil {
if dest.Ref != nil {
return validateDestinationRef(*ref).ViaField("ref")
}
}
return nil
}
// GetRef gets the ObjectReference from this Destination, if one is present. If no ref is present,
// then nil is returned.
func (dest *Destination) GetRef() *corev1.ObjectReference {
if dest == nil {
return nil
}
return dest.Ref
}
func validateDestinationRef(ref corev1.ObjectReference) *apis.FieldError {
// Check the object.
var errs *apis.FieldError
// Required Fields
if ref.Name == "" {
errs = errs.Also(apis.ErrMissingField("name"))
}
if ref.APIVersion == "" {
errs = errs.Also(apis.ErrMissingField("apiVersion"))
}
if ref.Kind == "" {
errs = errs.Also(apis.ErrMissingField("kind"))
}
return errs
}
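To make the validation rules above concrete, here is a minimal sketch (not part of this commit) that exercises the new `Destination.Validate`; it assumes the file lives in the `knative.dev/pkg/apis/duck/v1` package, as the neighboring diffs suggest:

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"knative.dev/pkg/apis"
	duckv1 "knative.dev/pkg/apis/duck/v1"
)

func main() {
	ctx := context.Background()

	// Rejected: a relative URI (no scheme/host) without a Ref to resolve against.
	rel := &duckv1.Destination{URI: &apis.URL{Path: "/handler"}}
	fmt.Println(rel.Validate(ctx))

	// Accepted: an absolute URI (non-empty scheme and host) on its own.
	abs := &duckv1.Destination{URI: &apis.URL{Scheme: "https", Host: "example.com"}}
	fmt.Println(abs.Validate(ctx)) // <nil>

	// Accepted: a Ref carrying the required name, kind, and apiVersion.
	ref := &duckv1.Destination{Ref: &corev1.ObjectReference{
		APIVersion: "v1", Kind: "Service", Name: "my-svc",
	}}
	fmt.Println(ref.Validate(ctx)) // <nil>
}
```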


@ -25,7 +25,6 @@ import (
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)
// Source is an Implementable "duck type".
@ -51,7 +50,7 @@ type Source struct {
type SourceSpec struct {
// Sink is a reference to an object that will resolve to a domain name or a
// URI directly to use as the sink.
Sink apisv1alpha1.Destination `json:"sink,omitempty"`
Sink Destination `json:"sink,omitempty"`
// CloudEventOverrides defines overrides to control the output format and
// modifications of the event sent to the sink.
@ -117,7 +116,7 @@ func (*Source) GetFullType() duck.Populatable {
// Populate implements duck.Populatable
func (s *Source) Populate() {
s.Spec.Sink = apisv1alpha1.Destination{
s.Spec.Sink = Destination{
URI: &apis.URL{
Scheme: "https",
Host: "tableflip.dev",


@ -36,7 +36,6 @@ var _ duck.Implementable = (*Conditions)(nil)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
// KResource is a skeleton type wrapping Conditions in the manner we expect
// resource writers defining compatible resources to embed it. We will


@ -21,6 +21,7 @@ limitations under the License.
package v1
import (
corev1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
apis "knative.dev/pkg/apis"
)
@ -172,6 +173,32 @@ func (in Conditions) DeepCopy() Conditions {
return *out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Destination) DeepCopyInto(out *Destination) {
*out = *in
if in.Ref != nil {
in, out := &in.Ref, &out.Ref
*out = new(corev1.ObjectReference)
**out = **in
}
if in.URI != nil {
in, out := &in.URI, &out.URI
*out = new(apis.URL)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Destination.
func (in *Destination) DeepCopy() *Destination {
if in == nil {
return nil
}
out := new(Destination)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KResource) DeepCopyInto(out *KResource) {
*out = *in


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
package v1beta1
import (
"context"


@ -25,7 +25,6 @@ import (
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)
// Source is an Implementable "duck type".
@ -51,7 +50,7 @@ type Source struct {
type SourceSpec struct {
// Sink is a reference to an object that will resolve to a domain name or a
// URI directly to use as the sink.
Sink apisv1alpha1.Destination `json:"sink,omitempty"`
Sink Destination `json:"sink,omitempty"`
// CloudEventOverrides defines overrides to control the output format and
// modifications of the event sent to the sink.
@ -117,7 +116,7 @@ func (*Source) GetFullType() duck.Populatable {
// Populate implements duck.Populatable
func (s *Source) Populate() {
s.Spec.Sink = apisv1alpha1.Destination{
s.Spec.Sink = Destination{
URI: &apis.URL{
Scheme: "https",
Host: "tableflip.dev",


@ -36,7 +36,6 @@ var _ duck.Implementable = (*Conditions)(nil)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
// KResource is a skeleton type wrapping Conditions in the manner we expect
// resource writers defining compatible resources to embed it. We will


@ -21,6 +21,7 @@ limitations under the License.
package v1beta1
import (
v1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
apis "knative.dev/pkg/apis"
)
@ -172,6 +173,32 @@ func (in Conditions) DeepCopy() Conditions {
return *out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Destination) DeepCopyInto(out *Destination) {
*out = *in
if in.Ref != nil {
in, out := &in.Ref, &out.Ref
*out = new(v1.ObjectReference)
**out = **in
}
if in.URI != nil {
in, out := &in.URI, &out.URI
*out = new(apis.URL)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Destination.
func (in *Destination) DeepCopy() *Destination {
if in == nil {
return nil
}
out := new(Destination)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KResource) DeepCopyInto(out *KResource) {
*out = *in


@ -1,18 +0,0 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
package v1alpha1


@ -22,7 +22,6 @@ import (
)
// VolatileTime wraps metav1.Time
// +k8s:openapi-gen=true
type VolatileTime struct {
Inner metav1.Time
}


@ -1,151 +0,0 @@
# Knative CloudEvents SDK
This library produces CloudEvents in version 0.1 compatible form. To learn more
about CloudEvents, see the [Specification](https://github.com/cloudevents/spec).
There are two roles the SDK fulfills: the [producer](#producer) and the
[consumer](#consumer). The producer creates a cloud event in either
[Binary](#binary) or [Structured](#structured) request format. The producer
assembles and sends the event through an HTTP endpoint. The consumer will
inspect the incoming HTTP request and select the correct decode format.
This SDK should be wire-compatible with any other producer or consumer of the
supported versions of CloudEvents.
## Getting Started
CloudEvents acts as the envelope in which to send a custom object. Define a
CloudEvent type for the events you will be producing.
Example CloudEvent Type: `dev.knative.cloudevent.example`
Select a source to identify the originator of this CloudEvent. It should be a
valid URI which represents the subject which created the CloudEvent (cloud
bucket, git repo, etc).
Example CloudEvent Source: `https://github.com/knative/pkg#cloudevents-example`
And finally, create a struct that will be the data inside the CloudEvent,
example:
```go
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}
```
### Producer
The producer creates a new `cloudevents.Client`, and then sends 10 `Example`
events to `"http://localhost:8080"`.
```go
package main
import (
"knative.dev/pkg/cloudevents"
"log"
)
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}
func main() {
c := cloudevents.NewClient(
"http://localhost:8080",
cloudevents.Builder{
Source: "https://github.com/knative/pkg#cloudevents-example",
EventType: "dev.knative.cloudevent.example",
},
)
for i := 0; i < 10; i++ {
data := Example{
Message: "hello, world!",
Sequence: i,
}
if err := c.Send(data); err != nil {
log.Printf("error sending: %v", err)
}
}
}
```
### Consumer
The consumer will listen for a post and then inspect the headers to understand
how to decode the request.
```go
package main
import (
"context"
"log"
"net/http"
"time"
"knative.dev/pkg/cloudevents"
)
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}
func handler(ctx context.Context, data *Example) {
metadata := cloudevents.FromContext(ctx)
log.Printf("[%s] %s %s: %d, %q", metadata.EventTime.Format(time.RFC3339), metadata.ContentType, metadata.Source, data.Sequence, data.Message)
}
func main() {
log.Print("listening on port 8080")
log.Fatal(http.ListenAndServe(":8080", cloudevents.Handler(handler)))
}
```
## Request Formats
### CloudEvents Version 0.1
#### Binary
This is the default, but to explicitly select the binary request format:
```go
c := cloudevents.NewClient(
"http://localhost:8080",
cloudevents.Builder{
Source: "https://github.com/knative/pkg#cloudevents-example",
EventType: "dev.knative.cloudevent.example",
Encoding: cloudevents.BinaryV01,
},
)
```
#### Structured
To leverage structured request format:
```go
c := cloudevents.NewClient(
"http://localhost:8080",
cloudevents.Builder{
Source: "https://github.com/knative/pkg#cloudevents-example",
EventType: "dev.knative.cloudevent.example",
Encoding: cloudevents.StructuredV01,
},
)
```


@ -1,135 +0,0 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"fmt"
"net/http"
"time"
"github.com/google/uuid"
)
// CloudEventEncoding is used to tell the builder which encoding to select.
// The default is Binary.
type CloudEventEncoding int
const (
// Binary v0.1
BinaryV01 CloudEventEncoding = iota
// Structured v0.1
StructuredV01
)
// Builder holds settings that do not change over CloudEvents. It is intended
// to represent a builder of only a single CloudEvent type.
type Builder struct {
// A URI describing the event producer.
Source string
// Type of occurrence which has happened.
EventType string
// The version of the `eventType`; this is producer-specific.
EventTypeVersion string
// A link to the schema that the `data` attribute adheres to.
SchemaURL string
// Additional metadata without a well-defined structure.
Extensions map[string]interface{}
// Encoding specifies the requested output encoding of the CloudEvent.
Encoding CloudEventEncoding
}
// Build produces an HTTP request with the constant data embedded in the builder
// merged with the new data provided in the build function. The request will
// send a pre-assembled cloud event to the given target. The target is assumed
// to be a URL with a scheme, ie: "http://localhost:8080"
func (b *Builder) Build(target string, data interface{}, overrides ...SendContext) (*http.Request, error) {
if len(overrides) > 1 {
return nil, fmt.Errorf("Build was called with more than one override")
}
var overridesV01 *V01EventContext
if len(overrides) == 1 {
switch t := overrides[0].(type) {
case V01EventContext:
o := overrides[0].(V01EventContext)
overridesV01 = &o
default:
return nil, fmt.Errorf("Build was called with unknown override type %v", t)
}
}
// TODO: when V02 is supported this will have to shuffle a little.
ctx := b.cloudEventsContextV01(overridesV01)
if ctx.Source == "" {
return nil, fmt.Errorf("ctx.Source resolved empty")
}
if ctx.EventType == "" {
return nil, fmt.Errorf("ctx.EventType resolved empty")
}
switch b.Encoding {
case BinaryV01:
return Binary.NewRequest(target, data, ctx)
case StructuredV01:
return Structured.NewRequest(target, data, ctx)
default:
return nil, fmt.Errorf("unsupported encoding: %v", b.Encoding)
}
}
// cloudEventsContext creates a CloudEvent context object, assumes
// application/json as the content type.
func (b *Builder) cloudEventsContextV01(overrides *V01EventContext) V01EventContext {
ctx := V01EventContext{
CloudEventsVersion: CloudEventsVersion,
EventType: b.EventType,
EventID: uuid.New().String(),
EventTypeVersion: b.EventTypeVersion,
SchemaURL: b.SchemaURL,
Source: b.Source,
ContentType: "application/json",
EventTime: time.Now(),
Extensions: b.Extensions,
}
if overrides != nil {
if overrides.Source != "" {
ctx.Source = overrides.Source
}
if overrides.EventID != "" {
ctx.EventID = overrides.EventID
}
if overrides.EventType != "" {
ctx.EventType = overrides.EventType
}
if !overrides.EventTime.IsZero() {
ctx.EventTime = overrides.EventTime
}
if overrides.ContentType != "" {
ctx.ContentType = overrides.ContentType
}
if len(overrides.Extensions) > 0 {
if ctx.Extensions == nil {
ctx.Extensions = make(map[string]interface{})
}
for k, v := range overrides.Extensions {
ctx.Extensions[k] = v
}
}
}
return ctx
}
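As the Build comment above describes, builder defaults are merged with optional per-send overrides. A rough sketch against the package being removed in this commit (the target URL and override values are illustrative):

```go
package main

import (
	"log"
	"net/http"

	"knative.dev/pkg/cloudevents"
)

func main() {
	b := cloudevents.Builder{
		Source:    "https://github.com/knative/pkg#cloudevents-example",
		EventType: "dev.knative.cloudevent.example",
		Encoding:  cloudevents.StructuredV01,
	}

	// Per-send override: pin the event ID instead of the generated UUID.
	req, err := b.Build(
		"http://localhost:8080",
		map[string]string{"message": "hello"},
		cloudevents.V01EventContext{EventID: "fixed-id-for-retries"},
	)
	if err != nil {
		log.Fatal(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Print("status: ", resp.Status)
}
```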


@ -1,81 +0,0 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"fmt"
"io/ioutil"
"net/http"
)
// Client wraps Builder, and is intended to be configured for a single event
// type and target
type Client struct {
builder Builder
Target string
}
// NewClient returns a CloudEvent Client used to send CloudEvents. It is
// intended that a user would create a new client for each tuple of eventType
// and target. This is an optional helper method to avoid the tricky creation
// of the embedded Builder struct.
func NewClient(target string, builder Builder) *Client {
c := &Client{
builder: builder,
Target: target,
}
return c
}
// Send creates a request based on the client's settings and sends the data
// struct to the target set for this client. It returns error if there was an
// issue sending the event, otherwise nil means the event was accepted.
func (c *Client) Send(data interface{}, overrides ...SendContext) error {
req, err := c.builder.Build(c.Target, data, overrides...)
if err != nil {
return err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if accepted(resp) {
return nil
}
return fmt.Errorf("error sending cloudevent: %s", status(resp))
}
// accepted is a helper method to understand if the response from the target
// accepted the CloudEvent.
func accepted(resp *http.Response) bool {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return true
}
return false
}
// status is a helper method to read the response of the target.
func status(resp *http.Response) string {
status := resp.Status
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Sprintf("Status[%s] error reading response body: %v", status, err)
}
return fmt.Sprintf("Status[%s] %s", status, body)
}


@ -1,22 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cloudevents implements utilities for handling CloudEvents.
// For information on the spec, see
// https://github.com/cloudevents/spec/blob/v0.1/http-transport-binding.md
// and
// https://github.com/cloudevents/spec/blob/v0.1/spec.md
package cloudevents


@ -1,125 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
// TODO(inlined): must add header encoding/decoding
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
)
const (
// HeaderCloudEventsVersion is the header for the version of Cloud Events
// used.
HeaderCloudEventsVersion = "CE-CloudEventsVersion"
// HeaderEventID is the header for the unique ID of this event.
HeaderEventID = "CE-EventID"
// HeaderEventTime is the OPTIONAL header for the time at which an event
// occurred.
HeaderEventTime = "CE-EventTime"
// HeaderEventType is the header for type of event represented. Value SHOULD
// be in reverse-dns form.
HeaderEventType = "CE-EventType"
// HeaderEventTypeVersion is the OPTIONAL header for the version of the
// scheme for the event type.
HeaderEventTypeVersion = "CE-EventTypeVersion"
// HeaderSchemaURL is the OPTIONAL header for the schema of the event data.
HeaderSchemaURL = "CE-SchemaURL"
// HeaderSource is the header for the source which emitted this event.
HeaderSource = "CE-Source"
// HeaderExtensionsPrefix is the OPTIONAL header prefix for CloudEvents extensions
HeaderExtensionsPrefix = "CE-X-"
// Binary implements Binary encoding/decoding
Binary binary = 0
)
type binary int
// BinarySender implements an interface for sending an EventContext
// (possibly one of several versions) as a binary encoding HTTP request.
type BinarySender interface {
// AsHeaders converts this EventContext to a set of HTTP headers.
AsHeaders() (http.Header, error)
}
// BinaryLoader implements an interface for translating a binary encoding HTTP
// request or response to an EventContext (possibly one of several versions).
type BinaryLoader interface {
// FromHeaders copies data from the supplied HTTP headers into the object.
// Values will be defaulted if necessary.
FromHeaders(in http.Header) error
}
// FromRequest parses event data and context from an HTTP request.
func (binary) FromRequest(data interface{}, r *http.Request) (LoadContext, error) {
var ec LoadContext
switch {
case r.Header.Get("CE-SpecVersion") == V02CloudEventsVersion:
ec = &V02EventContext{}
case r.Header.Get("CE-CloudEventsVersion") == V01CloudEventsVersion:
ec = &V01EventContext{}
default:
return nil, fmt.Errorf("Could not determine Cloud Events version from header: %+v", r.Header)
}
if err := ec.FromHeaders(r.Header); err != nil {
return nil, err
}
if err := unmarshalEventData(ec.DataContentType(), r.Body, data); err != nil {
return nil, err
}
return ec, nil
}
// NewRequest creates an HTTP request for Binary content encoding.
func (t binary) NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) {
url, err := url.Parse(urlString)
if err != nil {
return nil, err
}
h, err := context.AsHeaders()
if err != nil {
return nil, err
}
b, err := marshalEventData(h.Get("Content-Type"), data)
if err != nil {
return nil, err
}
return &http.Request{
Method: http.MethodPost,
URL: url,
Header: h,
Body: ioutil.NopCloser(bytes.NewReader(b)),
}, nil
}
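A quick sketch (values illustrative, not part of this commit) of the binary mapping these CE-* headers describe, using the v0.1 context's AsHeaders:

```go
package main

import (
	"fmt"
	"time"

	"knative.dev/pkg/cloudevents"
)

func main() {
	ec := cloudevents.V01EventContext{
		EventID:     "1234",
		EventType:   "dev.knative.cloudevent.example",
		Source:      "https://github.com/knative/pkg#cloudevents-example",
		EventTime:   time.Now(),
		ContentType: "application/json",
		Extensions:  map[string]interface{}{"traceid": "abc"},
	}

	h, err := ec.AsHeaders()
	if err != nil {
		panic(err)
	}
	// Expect CE-CloudEventsVersion, CE-EventID, CE-EventType, CE-Source,
	// CE-EventTime, Content-Type, and a CE-X-traceid extension header.
	for k, v := range h {
		fmt.Println(k, v)
	}
}
```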


@ -1,143 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
)
const (
// Structured implements the JSON structured encoding/decoding
Structured structured = 0
)
type structured int
// StructuredSender implements an interface for translating an EventContext
// (possibly one of several versions) to a structured encoding HTTP request.
type StructuredSender interface {
// AsJSON encodes the object into a map from string to JSON data, which
// allows additional keys to be encoded later.
AsJSON() (map[string]json.RawMessage, error)
}
// StructuredLoader implements an interface for translating a structured
// encoding HTTP request or response to an EventContext (possibly one of
// several versions).
type StructuredLoader interface {
// FromJSON assumes that the object has already been decoded into a raw map
// from string to json.RawMessage, because this is needed to extract the
// CloudEvents version.
FromJSON(map[string]json.RawMessage) error
}
// FromRequest parses a CloudEvent from structured content encoding.
func (structured) FromRequest(data interface{}, r *http.Request) (LoadContext, error) {
raw := make(map[string]json.RawMessage)
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
return nil, err
}
rawData := raw["data"]
delete(raw, "data")
var ec LoadContext
v := ""
if err := json.Unmarshal(raw["specversion"], &v); err == nil && v == V02CloudEventsVersion {
ec = &V02EventContext{}
} else if err := json.Unmarshal(raw["cloudEventsVersion"], &v); err == nil && v == V01CloudEventsVersion {
ec = &V01EventContext{}
} else {
return nil, fmt.Errorf("Could not determine Cloud Events version from payload: %q", data)
}
if err := ec.FromJSON(raw); err != nil {
return nil, err
}
contentType := ec.DataContentType()
if contentType == "" {
contentType = contentTypeJSON
}
var reader io.Reader
if !isJSONEncoding(contentType) {
var jsonDecoded string
if err := json.Unmarshal(rawData, &jsonDecoded); err != nil {
return nil, fmt.Errorf("Could not JSON decode %q value %q", contentType, rawData)
}
reader = strings.NewReader(jsonDecoded)
} else {
reader = bytes.NewReader(rawData)
}
if err := unmarshalEventData(contentType, reader, data); err != nil {
return nil, err
}
return ec, nil
}
// NewRequest creates an HTTP request for Structured content encoding.
func (structured) NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) {
url, err := url.Parse(urlString)
if err != nil {
return nil, err
}
fields, err := context.AsJSON()
if err != nil {
return nil, err
}
// TODO: remove this defaulting?
contentType := context.DataContentType()
if contentType == "" {
contentType = contentTypeJSON
}
dataBytes, err := marshalEventData(contentType, data)
if err != nil {
return nil, err
}
if isJSONEncoding(contentType) {
fields["data"] = json.RawMessage(dataBytes)
} else {
fields["data"], err = json.Marshal(string(dataBytes))
if err != nil {
return nil, err
}
}
b, err := json.Marshal(fields)
if err != nil {
return nil, err
}
h := http.Header{}
h.Set(HeaderContentType, ContentTypeStructuredJSON)
return &http.Request{
Method: http.MethodPost,
URL: url,
Header: h,
Body: ioutil.NopCloser(bytes.NewReader(b)),
}, nil
}


@ -1,205 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"context"
"encoding/json"
"encoding/xml"
"fmt"
"io"
"net/http"
"reflect"
)
const (
// ContentTypeStructuredJSON is the content-type for "Structured" encoding
// where an event envelope is written in JSON and the body is arbitrary
// data which might be an alternate encoding.
ContentTypeStructuredJSON = "application/cloudevents+json"
// ContentTypeBinaryJSON is the content-type for "Binary" encoding where
// the event context is in HTTP headers and the body is a JSON event data.
ContentTypeBinaryJSON = "application/json"
// TODO(inlined) what about charset additions?
contentTypeJSON = "application/json"
contentTypeXML = "application/xml"
// HeaderContentType is the standard HTTP header "Content-Type"
HeaderContentType = "Content-Type"
// CloudEventsVersion is a legacy alias of V01CloudEventsVersion, for compatibility.
CloudEventsVersion = V01CloudEventsVersion
)
// EventContext is a legacy un-versioned alias, from when we thought that field names would stay the same.
type EventContext = V01EventContext
// HTTPMarshaller implements a scheme for decoding CloudEvents over HTTP.
// Implementations are Binary, Structured, and Any
type HTTPMarshaller interface {
FromRequest(data interface{}, r *http.Request) (LoadContext, error)
NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error)
}
// ContextTranslator provides a set of translation methods between the
// different versions of the CloudEvents spec, which allows programs to
// interoperate with different versions of the CloudEvents spec by
// converting EventContexts to their preferred version.
type ContextTranslator interface {
// AsV01 provides a translation from whatever the "native" encoding of the
// CloudEvent was to the equivalent in v0.1 field names, moving fields to or
// from extensions as necessary.
AsV01() V01EventContext
// AsV02 provides a translation from whatever the "native" encoding of the
// CloudEvent was to the equivalent in v0.2 field names, moving fields to or
// from extensions as necessary.
AsV02() V02EventContext
// DataContentType returns the MIME content type for encoding data, which is
// needed by both encoding and decoding.
DataContentType() string
}
// SendContext provides an interface for extracting information from an
// EventContext (the set of non-data event attributes of a CloudEvent).
type SendContext interface {
ContextTranslator
StructuredSender
BinarySender
}
// LoadContext provides an interface for extracting information from an
// EventContext (the set of non-data event attributes of a CloudEvent).
type LoadContext interface {
ContextTranslator
StructuredLoader
BinaryLoader
}
// ContextType is a unified interface for both sending and loading the
// CloudEvent data across versions.
type ContextType interface {
ContextTranslator
StructuredSender
BinarySender
StructuredLoader
BinaryLoader
}
func anyError(errs ...error) error {
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// The Cloud-Events spec allows two forms of JSON encoding:
// 1. The overall message (Structured JSON encoding)
// 2. Just the event data, where the context will be in HTTP headers instead
//
// Case #1 actually includes case #2. In structured binary encoding the JSON
// HTTP body itself allows for cross-encoding of the "data" field.
// This method is only intended for checking that inner JSON encoding type.
func isJSONEncoding(encoding string) bool {
return encoding == contentTypeJSON || encoding == "text/json"
}
func isXMLEncoding(encoding string) bool {
return encoding == contentTypeXML || encoding == "text/xml"
}
func unmarshalEventData(encoding string, reader io.Reader, data interface{}) error {
// The Handler tools allow developers to not ask for event data;
// in this case, just don't unmarshal anything
if data == nil {
return nil
}
// If someone tried to marshal an event into an io.Reader, just assign our existing reader.
// (This is used by event.Mux to determine which type to unmarshal as)
readerPtrType := reflect.TypeOf((*io.Reader)(nil))
if reflect.TypeOf(data).ConvertibleTo(readerPtrType) {
reflect.ValueOf(data).Elem().Set(reflect.ValueOf(reader))
return nil
}
if isJSONEncoding(encoding) || encoding == "" {
return json.NewDecoder(reader).Decode(&data)
}
if isXMLEncoding(encoding) {
return xml.NewDecoder(reader).Decode(&data)
}
return fmt.Errorf("cannot decode content type %q", encoding)
}
func marshalEventData(encoding string, data interface{}) ([]byte, error) {
var b []byte
var err error
if isJSONEncoding(encoding) {
b, err = json.Marshal(data)
} else if isXMLEncoding(encoding) {
b, err = xml.Marshal(data)
} else {
err = fmt.Errorf("cannot encode content type %q", encoding)
}
if err != nil {
return nil, err
}
return b, nil
}
// FromRequest parses a CloudEvent from any known encoding.
func FromRequest(data interface{}, r *http.Request) (LoadContext, error) {
switch r.Header.Get(HeaderContentType) {
case ContentTypeStructuredJSON:
return Structured.FromRequest(data, r)
case ContentTypeBinaryJSON:
return Binary.FromRequest(data, r)
default:
// TODO: assume binary content mode
// (https://github.com/cloudevents/spec/blob/v0.1/http-transport-binding.md#3-http-message-mapping)
// and that data is ??? (io.Reader?, byte array?)
return nil, fmt.Errorf("Cannot handle encoding %q", r.Header.Get("Content-Type"))
}
}
// NewRequest creates an HTTP request for Structured content encoding.
func NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) {
return Structured.NewRequest(urlString, data, context)
}
// Opaque key type used to store V01EventContexts in a context.Context
type contextKeyType struct{}
var contextKey = contextKeyType{}
// FromContext loads an V01EventContext from a normal context.Context
func FromContext(ctx context.Context) LoadContext {
return ctx.Value(contextKey).(LoadContext)
}
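For a lower-level consumer than the Handler helper, FromRequest can be called directly from a plain net/http handler. A sketch (port and payload type are illustrative):

```go
package main

import (
	"log"
	"net/http"

	"knative.dev/pkg/cloudevents"
)

type Example struct {
	Sequence int    `json:"id"`
	Message  string `json:"message"`
}

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		var data Example
		// FromRequest selects Structured or Binary decoding from the
		// Content-Type header and fills in both the context and the data.
		ec, err := cloudevents.FromRequest(&data, r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		v01 := ec.AsV01()
		log.Printf("event %s (%s): %+v", v01.EventID, v01.EventType, data)
		w.WriteHeader(http.StatusNoContent)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```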


@ -1,236 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const (
// V01CloudEventsVersion is the version of the CloudEvents spec targeted
// by this library.
V01CloudEventsVersion = "0.1"
// v0.1 field names
fieldCloudEventsVersion = "CloudEventsVersion"
fieldEventID = "EventID"
fieldEventType = "EventType"
)
// V01EventContext holds standard metadata about an event. See
// https://github.com/cloudevents/spec/blob/v0.1/spec.md#context-attributes for
// details on these fields.
type V01EventContext struct {
// The version of the CloudEvents specification used by the event.
CloudEventsVersion string `json:"cloudEventsVersion,omitempty"`
// ID of the event; must be non-empty and unique within the scope of the producer.
EventID string `json:"eventID"`
// Timestamp when the event happened.
EventTime time.Time `json:"eventTime,omitempty"`
// Type of occurrence which has happened.
EventType string `json:"eventType"`
// The version of the `eventType`; this is producer-specific.
EventTypeVersion string `json:"eventTypeVersion,omitempty"`
// A link to the schema that the `data` attribute adheres to.
SchemaURL string `json:"schemaURL,omitempty"`
// A MIME (RFC 2046) string describing the media type of `data`.
// TODO: Should an empty string assume `application/json`, or auto-detect the content?
ContentType string `json:"contentType,omitempty"`
// A URI describing the event producer.
Source string `json:"source"`
// Additional metadata without a well-defined structure.
Extensions map[string]interface{} `json:"extensions,omitempty"`
}
// AsV01 implements the ContextTranslator interface.
func (ec V01EventContext) AsV01() V01EventContext {
return ec
}
// AsV02 implements the ContextTranslator interface.
func (ec V01EventContext) AsV02() V02EventContext {
ret := V02EventContext{
SpecVersion: V02CloudEventsVersion,
Type: ec.EventType,
Source: ec.Source,
ID: ec.EventID,
Time: ec.EventTime,
SchemaURL: ec.SchemaURL,
ContentType: ec.ContentType,
Extensions: make(map[string]interface{}),
}
// eventTypeVersion was retired in v0.2, so put it in an extension.
if ec.EventTypeVersion != "" {
ret.Extensions["eventtypeversion"] = ec.EventTypeVersion
}
for k, v := range ec.Extensions {
ret.Extensions[k] = v
}
return ret
}
// AsHeaders implements the BinarySender interface.
func (ec V01EventContext) AsHeaders() (http.Header, error) {
h := http.Header{}
h.Set("CE-CloudEventsVersion", ec.CloudEventsVersion)
h.Set("CE-EventID", ec.EventID)
h.Set("CE-EventType", ec.EventType)
h.Set("CE-Source", ec.Source)
if ec.CloudEventsVersion == "" {
h.Set("CE-CloudEventsVersion", V01CloudEventsVersion)
}
if !ec.EventTime.IsZero() {
h.Set("CE-EventTime", ec.EventTime.Format(time.RFC3339Nano))
}
if ec.EventTypeVersion != "" {
h.Set("CE-EventTypeVersion", ec.EventTypeVersion)
}
if ec.SchemaURL != "" {
h.Set("CE-SchemaUrl", ec.SchemaURL)
}
if ec.ContentType != "" {
h.Set("Content-Type", ec.ContentType)
}
for k, v := range ec.Extensions {
encoded, err := json.Marshal(v)
if err != nil {
return nil, err
}
// Preserve case in v0.1, even though HTTP headers are case-insensitive.
h["CE-X-"+k] = []string{string(encoded)}
}
return h, nil
}
// FromHeaders implements the BinaryLoader interface.
func (ec *V01EventContext) FromHeaders(in http.Header) error {
missingField := func(name string) error {
if in.Get("CE-"+name) == "" {
return fmt.Errorf("Missing field %q in %v: %q", "CE-"+name, in, in.Get("CE-"+name))
}
return nil
}
if err := anyError(
missingField("CloudEventsVersion"),
missingField("EventID"),
missingField("EventType"),
missingField("Source")); err != nil {
return err
}
data := V01EventContext{
CloudEventsVersion: in.Get("CE-CloudEventsVersion"),
EventID: in.Get("CE-EventID"),
EventType: in.Get("CE-EventType"),
EventTypeVersion: in.Get("CE-EventTypeVersion"),
SchemaURL: in.Get("CE-SchemaURL"),
ContentType: in.Get("Content-Type"),
Source: in.Get("CE-Source"),
Extensions: make(map[string]interface{}),
}
if timeStr := in.Get("CE-EventTime"); timeStr != "" {
var err error
if data.EventTime, err = time.Parse(time.RFC3339Nano, timeStr); err != nil {
return err
}
}
for k, v := range in {
if strings.EqualFold(k[:len("CE-X-")], "CE-X-") {
key := k[len("CE-X-"):]
var tmp interface{}
if err := json.Unmarshal([]byte(v[0]), &tmp); err == nil {
data.Extensions[key] = tmp
} else {
// If we can't unmarshal the data, treat it as a string.
data.Extensions[key] = v[0]
}
}
}
*ec = data
return nil
}
// AsJSON implements the StructuredSender interface.
func (ec V01EventContext) AsJSON() (map[string]json.RawMessage, error) {
ret := make(map[string]json.RawMessage)
err := anyError(
encodeKey(ret, "cloudEventsVersion", ec.CloudEventsVersion),
encodeKey(ret, "eventID", ec.EventID),
encodeKey(ret, "eventTime", ec.EventTime),
encodeKey(ret, "eventType", ec.EventType),
encodeKey(ret, "eventTypeVersion", ec.EventTypeVersion),
encodeKey(ret, "schemaURL", ec.SchemaURL),
encodeKey(ret, "contentType", ec.ContentType),
encodeKey(ret, "source", ec.Source),
encodeKey(ret, "extensions", ec.Extensions))
return ret, err
}
// DataContentType implements the StructuredSender interface.
func (ec V01EventContext) DataContentType() string {
return ec.ContentType
}
// FromJSON implements the StructuredLoader interface.
func (ec *V01EventContext) FromJSON(in map[string]json.RawMessage) error {
data := V01EventContext{
CloudEventsVersion: extractKey(in, "cloudEventsVersion"),
EventID: extractKey(in, "eventID"),
EventType: extractKey(in, "eventType"),
Source: extractKey(in, "source"),
}
var err error
if timeStr := extractKey(in, "eventTime"); timeStr != "" {
if data.EventTime, err = time.Parse(time.RFC3339Nano, timeStr); err != nil {
return err
}
}
extractKeyTo(in, "eventTypeVersion", &data.EventTypeVersion)
extractKeyTo(in, "schemaURL", &data.SchemaURL)
extractKeyTo(in, "contentType", &data.ContentType)
if len(in["extensions"]) == 0 {
in["extensions"] = []byte("{}")
}
if err = json.Unmarshal(in["extensions"], &data.Extensions); err != nil {
return err
}
*ec = data
return nil
}
func encodeKey(out map[string]json.RawMessage, key string, value interface{}) (err error) {
if s, ok := value.(string); ok && s == "" {
// Skip empty strings.
return nil
}
out[key], err = json.Marshal(value)
return
}
func extractKey(in map[string]json.RawMessage, key string) (s string) {
extractKeyTo(in, key, &s)
return
}
func extractKeyTo(in map[string]json.RawMessage, key string, out *string) error {
tmp := in[key]
delete(in, key)
return json.Unmarshal(tmp, out)
}


@ -1,261 +0,0 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const (
// V02CloudEventsVersion is the version of the CloudEvents spec targeted
// by this library.
V02CloudEventsVersion = "0.2"
// required attributes
fieldSpecVersion = "specversion"
fieldID = "id"
fieldType = "type"
fieldSource = "source"
fieldTime = "time"
fieldSchemaURL = "schemaurl"
fieldContentType = "contenttype"
headerContentType = "Content-Type"
)
// V02EventContext represents the non-data attributes of a CloudEvents v0.2
// event.
type V02EventContext struct {
// The version of the CloudEvents specification used by the event.
SpecVersion string `json:"specversion"`
// The type of the occurrence which has happened.
Type string `json:"type"`
// A URI describing the event producer.
Source string `json:"source"`
// ID of the event; must be non-empty and unique within the scope of the producer.
ID string `json:"id"`
// Timestamp when the event happened.
Time time.Time `json:"time,omitempty"`
// A link to the schema that the `data` attribute adheres to.
SchemaURL string `json:"schemaurl,omitempty"`
// A MIME (RFC2046) string describing the media type of `data`.
// TODO: Should an empty string assume `application/json`, `application/octet-stream`, or auto-detect the content?
ContentType string `json:"contenttype,omitempty"`
// Additional extension metadata beyond the base spec.
Extensions map[string]interface{} `json:"-,omitempty"`
}
// AsV01 implements the ContextTranslator interface.
func (ec V02EventContext) AsV01() V01EventContext {
ret := V01EventContext{
CloudEventsVersion: V01CloudEventsVersion,
EventID: ec.ID,
EventTime: ec.Time,
EventType: ec.Type,
SchemaURL: ec.SchemaURL,
ContentType: ec.ContentType,
Source: ec.Source,
Extensions: make(map[string]interface{}),
}
for k, v := range ec.Extensions {
// eventTypeVersion was retired in v0.2
if strings.EqualFold(k, "eventTypeVersion") {
etv, ok := v.(string)
if ok {
ret.EventTypeVersion = etv
}
continue
}
ret.Extensions[k] = v
}
return ret
}
// AsV02 implements the ContextTranslator interface.
func (ec V02EventContext) AsV02() V02EventContext {
return ec
}
// AsHeaders implements the BinarySender interface.
func (ec V02EventContext) AsHeaders() (http.Header, error) {
h := http.Header{}
h.Set("CE-"+fieldSpecVersion, ec.SpecVersion)
h.Set("CE-"+fieldType, ec.Type)
h.Set("CE-"+fieldSource, ec.Source)
h.Set("CE-"+fieldID, ec.ID)
if ec.SpecVersion == "" {
h.Set("CE-"+fieldSpecVersion, V02CloudEventsVersion)
}
if !ec.Time.IsZero() {
h.Set("CE-"+fieldTime, ec.Time.Format(time.RFC3339Nano))
}
if ec.SchemaURL != "" {
h.Set("CE-"+fieldSchemaURL, ec.SchemaURL)
}
if ec.ContentType != "" {
h.Set(headerContentType, ec.ContentType)
}
for k, v := range ec.Extensions {
// Per spec, map-valued extensions are converted to a list of headers as:
// CE-attrib-key
if mapVal, ok := v.(map[string]interface{}); ok {
for subkey, subval := range mapVal {
encoded, err := json.Marshal(subval)
if err != nil {
return nil, err
}
h.Set("CE-"+k+"-"+subkey, string(encoded))
}
continue
}
encoded, err := json.Marshal(v)
if err != nil {
return nil, err
}
h.Set("CE-"+k, string(encoded))
}
return h, nil
}
// FromHeaders implements the BinaryLoader interface.
func (ec *V02EventContext) FromHeaders(in http.Header) error {
missingField := func(name string) error {
if in.Get("CE-"+name) == "" {
return fmt.Errorf("Missing field %q in %v: %q", "CE-"+name, in, in.Get("CE-"+name))
}
return nil
}
err := anyError(
missingField(fieldSpecVersion),
missingField(fieldID),
missingField(fieldType),
missingField(fieldSource),
)
if err != nil {
return err
}
data := V02EventContext{
ContentType: in.Get(headerContentType),
Extensions: make(map[string]interface{}),
}
// Extensions and top-level fields are mixed under "CE-" headers.
// Extract them all here rather than trying to clear fields in headers.
for k, v := range in {
if strings.EqualFold(k[:len("CE-")], "CE-") {
key, value := strings.ToLower(string(k[len("CE-"):])), v[0]
switch key {
case fieldSpecVersion:
data.SpecVersion = value
case fieldType:
data.Type = value
case fieldSource:
data.Source = value
case fieldID:
data.ID = value
case fieldSchemaURL:
data.SchemaURL = value
case fieldTime:
if data.Time, err = time.Parse(time.RFC3339Nano, value); err != nil {
return err
}
default:
var tmp interface{}
if err = json.Unmarshal([]byte(value), &tmp); err != nil {
tmp = value
}
// Per spec, map-valued extensions are converted to a list of headers as:
// CE-attrib-key. This is where things get a bit crazy... see
// https://github.com/cloudevents/spec/issues/367 for additional notes.
if strings.Contains(key, "-") {
items := strings.SplitN(key, "-", 2)
key, subkey := items[0], items[1]
if _, ok := data.Extensions[key]; !ok {
data.Extensions[key] = make(map[string]interface{})
}
if submap, ok := data.Extensions[key].(map[string]interface{}); ok {
submap[subkey] = tmp
}
} else {
data.Extensions[key] = tmp
}
}
}
}
*ec = data
return nil
}
// AsJSON implements the StructuredSender interface.
func (ec V02EventContext) AsJSON() (map[string]json.RawMessage, error) {
ret := make(map[string]json.RawMessage)
err := anyError(
encodeKey(ret, fieldSpecVersion, ec.SpecVersion),
encodeKey(ret, fieldType, ec.Type),
encodeKey(ret, fieldSource, ec.Source),
encodeKey(ret, fieldID, ec.ID),
encodeKey(ret, fieldTime, ec.Time),
encodeKey(ret, fieldSchemaURL, ec.SchemaURL),
encodeKey(ret, fieldContentType, ec.ContentType),
)
if err != nil {
return nil, err
}
for k, v := range ec.Extensions {
if err = encodeKey(ret, k, v); err != nil {
return nil, err
}
}
return ret, nil
}
// DataContentType implements the StructuredSender interface.
func (ec V02EventContext) DataContentType() string {
return ec.ContentType
}
// FromJSON implements the StructuredLoader interface.
func (ec *V02EventContext) FromJSON(in map[string]json.RawMessage) error {
data := V02EventContext{
SpecVersion: extractKey(in, fieldSpecVersion),
Type: extractKey(in, fieldType),
Source: extractKey(in, fieldSource),
ID: extractKey(in, fieldID),
Extensions: make(map[string]interface{}),
}
var err error
if timeStr := extractKey(in, fieldTime); timeStr != "" {
if data.Time, err = time.Parse(time.RFC3339Nano, timeStr); err != nil {
return err
}
}
extractKeyTo(in, fieldSchemaURL, &data.SchemaURL)
extractKeyTo(in, fieldContentType, &data.ContentType)
// Extract the remaining items from in by converting to JSON and then
// unpacking into Extensions. This avoids having to do funny type
// checking/testing in the loop over values.
extensionsJSON, err := json.Marshal(in)
if err != nil {
return err
}
err = json.Unmarshal(extensionsJSON, &data.Extensions)
*ec = data
return err
}
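To illustrate the translation the AsV01/AsV02 methods above perform (in particular how the retired eventTypeVersion field round-trips through an extension), a small sketch not part of this commit:

```go
package main

import (
	"fmt"

	"knative.dev/pkg/cloudevents"
)

func main() {
	v01 := cloudevents.V01EventContext{
		CloudEventsVersion: cloudevents.V01CloudEventsVersion,
		EventID:            "1234",
		EventType:          "dev.knative.cloudevent.example",
		EventTypeVersion:   "v2",
		Source:             "https://github.com/knative/pkg#cloudevents-example",
	}

	// eventTypeVersion was retired in v0.2, so AsV02 moves it to an extension.
	v02 := v01.AsV02()
	fmt.Println(v02.SpecVersion, v02.Extensions["eventtypeversion"]) // 0.2 v2

	// Translating back restores the field from the extension.
	round := v02.AsV01()
	fmt.Println(round.EventTypeVersion) // v2
}
```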


@ -1,401 +0,0 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudevents
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"reflect"
"strings"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
)
type handler struct {
numIn int
fnValue reflect.Value
dataType reflect.Type
}
type failedHandler struct {
err error
}
type errAndHandler interface {
http.Handler
error
}
const (
inParamUsage = "Expected a function taking either no parameters, a context.Context, or (context.Context, any)"
outParamUsage = "Expected a function returning either nothing, an error, (any, error), or (any, SendContext, error)"
)
var (
// FYI: Getting the type of an interface is a bit hard in Go because of nil is special:
// 1. Structs & pointers have concrete types, whereas interfaces are actually tuples of
// [implementation vtable, pointer].
// 2. Literals (such as nil) can be cast to any relevant type.
// Because TypeOf takes an interface{}, a nil interface reference would cast lossily when
// it leaves this stack frame. The workaround is to pass a pointer to an interface and then
// get the type of its reference.
// For example, see: https://play.golang.org/p/_dxLvdkvqvg
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
errorType = reflect.TypeOf((*error)(nil)).Elem()
sendContextType = reflect.TypeOf((*SendContext)(nil)).Elem()
)
// Verifies that the inputs to a function have a valid signature; panics otherwise.
// Valid input signatures:
// (), (context.Context), (context.Context, any)
func validateInParamSignature(fnType reflect.Type) error {
switch fnType.NumIn() {
case 2:
fallthrough
case 1:
if !fnType.In(0).ConvertibleTo(contextType) {
return fmt.Errorf("%s; cannot convert parameter 0 from %s to context.Context", inParamUsage, fnType.In(0))
}
fallthrough
case 0:
return nil
default:
return fmt.Errorf("%s; function has too many parameters (%d)", inParamUsage, fnType.NumIn())
}
}
// Verifies that the outputs of a function have a valid signature; panics otherwise.
// Valid output signatures:
// (), (error), (any, error)
func validateOutParamSignature(fnType reflect.Type) error {
switch fnType.NumOut() {
case 3:
contextType := fnType.Out(1)
if !contextType.ConvertibleTo(sendContextType) {
return fmt.Errorf("%s; cannot convert return type 1 from %s to SendContext", outParamUsage, contextType)
}
fallthrough
case 2:
fallthrough
case 1:
paramNo := fnType.NumOut() - 1
paramType := fnType.Out(paramNo)
if !paramType.ConvertibleTo(errorType) {
return fmt.Errorf("%s; cannot convert return type %d from %s to error", outParamUsage, paramNo, paramType)
}
fallthrough
case 0:
return nil
default:
return fmt.Errorf("%s; function has too many return types (%d)", outParamUsage, fnType.NumOut())
}
}
// Verifies that a function has the right number of in and out params and that they are
// of allowed types. If successful, returns the expected in-param type, otherwise panics.
func validateFunction(fnType reflect.Type) errAndHandler {
if fnType.Kind() != reflect.Func {
return &failedHandler{err: errors.New("must pass a function to handle events")}
}
err := anyError(
validateInParamSignature(fnType),
validateOutParamSignature(fnType))
if err != nil {
return &failedHandler{err: err}
}
return nil
}
// Allocates a new instance of type t and returns:
// asPtr is of type t if t is a pointer type and of type &t otherwise (used for unmarshalling)
// asValue is a Value of type t pointing to the same data as asPtr
func allocate(t reflect.Type) (asPtr interface{}, asValue reflect.Value) {
if t == nil {
return nil, reflect.Value{}
}
if t.Kind() == reflect.Ptr {
reflectPtr := reflect.New(t.Elem())
asPtr = reflectPtr.Interface()
asValue = reflectPtr
} else {
reflectPtr := reflect.New(t)
asPtr = reflectPtr.Interface()
asValue = reflectPtr.Elem()
}
return
}
func unwrapReturnValues(res []reflect.Value) (interface{}, SendContext, error) {
switch len(res) {
case 0:
return nil, nil, nil
case 1:
if res[0].IsNil() {
return nil, nil, nil
}
// Should be a safe cast due to assertEventHandler()
return nil, nil, res[0].Interface().(error)
case 2:
if res[1].IsNil() {
return res[0].Interface(), nil, nil
}
// Should be a safe cast due to assertEventHandler()
return nil, nil, res[1].Interface().(error)
case 3:
if res[2].IsNil() {
ec := res[1].Interface().(SendContext)
return res[0].Interface(), ec, nil
}
return nil, nil, res[2].Interface().(error)
default:
// Should never happen due to assertEventHandler()
panic("Cannot unmarshal more than 3 return values")
}
}
// Accepts the results from a handler function and translates them to an HTTP response
func respondHTTP(outparams []reflect.Value, fn reflect.Value, w http.ResponseWriter) {
res, ec, err := unwrapReturnValues(outparams)
if err != nil {
log.Print("Failed to handle event: ", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(`Internal server error`))
return
}
if ec == nil {
eventType := strings.Replace(fn.Type().PkgPath(), "/", ".", -1)
if eventType != "" {
eventType += "."
}
eventType += fn.Type().Name()
if eventType == "" {
eventType = "dev.knative.pkg.cloudevents.unknown"
}
ec = &V01EventContext{
EventID: uuid.New().String(),
EventType: eventType,
Source: "unknown", // TODO: anything useful here, maybe incoming Host header?
}
}
if res != nil {
json, err := json.Marshal(res)
if err != nil {
log.Printf("Failed to marshal return value %+v: %s", res, err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(`Internal server error`))
return
}
headers, err := ec.AsHeaders()
if err != nil {
log.Printf("Failed to marshal event context %+v: %s", res, err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal server error"))
return
}
for k, v := range headers {
w.Header()[k] = v
}
w.Write(json)
return
}
w.WriteHeader(http.StatusNoContent)
}
// Handler creates an EventHandler that implements http.Handler
// If the fn parameter is not a valid type, will produce an http.Handler that also conforms
// to error and will respond to all HTTP requests with that error. Valid types of fn are:
//
// * func()
// * func() error
// * func() (anything, error)
// * func() (anything, EventContext, error)
// * func(context.Context)
// * func(context.Context) error
// * func(context.Context) (anything, error)
// * func(context.Context) (anything, EventContext, error)
// * func(context.Context, anything)
// * func(context.Context, anything) error
// * func(context.Context, anything) (anything, error)
// * func(context.Context, anything) (anything, EventContext, error)
//
// CloudEvent contexts are available from the context.Context parameter
// CloudEvent data will be deserialized into the "anything" parameter.
// The library supports native decoding with both XML and JSON encoding.
// To accept another advanced type, pass an io.Reader as the input parameter.
//
// HTTP responses are generated based on the return value of fn:
// * any error return value will cause a StatusInternalServerError response
// * a function with no return type or a function returning nil will cause a StatusNoContent response
// * a function that returns a value will cause a StatusOK and render the response as JSON,
// with headers from an EventContext, if appropriate
func Handler(fn interface{}) http.Handler {
fnType := reflect.TypeOf(fn)
err := validateFunction(fnType)
if err != nil {
return err
}
var dataType reflect.Type
if fnType.NumIn() == 2 {
dataType = fnType.In(1)
}
return &handler{
numIn: fnType.NumIn(),
dataType: dataType,
fnValue: reflect.ValueOf(fn),
}
}
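// Usage sketch: MyData and handleEvent below are hypothetical names, assuming the
// caller defines a struct matching the incoming event payload.
//
//	func handleEvent(ctx context.Context, data MyData) error {
//		// React to the event; returning nil yields a 204 No Content response.
//		return nil
//	}
//
//	http.ListenAndServe(":8080", cloudevents.Handler(handleEvent))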
// ServeHTTP implements http.Handler
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
args := make([]reflect.Value, 0, 2)
if h.numIn > 0 {
dataPtr, dataArg := allocate(h.dataType)
eventContext, err := FromRequest(dataPtr, r)
if err != nil {
log.Printf("Failed to handle request %s; error %s", spew.Sdump(r), err)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`Invalid request`))
return
}
ctx := r.Context()
ctx = context.WithValue(ctx, contextKey, eventContext)
args = append(args, reflect.ValueOf(ctx))
if h.numIn == 2 {
args = append(args, dataArg)
}
}
res := h.fnValue.Call(args)
respondHTTP(res, h.fnValue, w)
}
func (h failedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Print("Failed to handle event: ", h.Error())
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte(`Internal server error`))
}
func (h failedHandler) Error() string {
return h.err.Error()
}
// Mux allows developers to handle logically related groups of
// functionality multiplexed based on the event type.
// TODO: Consider dropping Mux or figure out how to handle non-JSON encoding.
type Mux map[string]*handler
// NewMux creates a new Mux
func NewMux() Mux {
return make(map[string]*handler)
}
// Handle adds a new handler for a specific event type
// If the fn parameter is not a valid type, Handle returns an error and the handler
// is not registered. Valid types of fn are:
//
// * func()
// * func() error
// * func() (anything, error)
// * func(context.Context)
// * func(context.Context) error
// * func(context.Context) (anything, error)
// * func(context.Context, anything)
// * func(context.Context, anything) error
// * func(context.Context, anything) (anything, error)
//
// CloudEvent contexts are available from the context.Context parameter
// CloudEvent data will be deserialized into the "anything" parameter.
// The library supports native decoding with both XML and JSON encoding.
// To accept another advanced type, pass an io.Reader as the input parameter.
//
// HTTP responses are generated based on the return value of fn:
// * any error return value will cause a StatusInternalServerError response
// * a function with no return type or a function returning nil will cause a StatusNoContent response
// * a function that returns a value will cause a StatusOK and render the response as JSON
func (m Mux) Handle(eventType string, fn interface{}) error {
fnType := reflect.TypeOf(fn)
err := validateFunction(fnType)
if err != nil {
return err
}
var dataType reflect.Type
if fnType.NumIn() == 2 {
dataType = fnType.In(1)
}
m[eventType] = &handler{
numIn: fnType.NumIn(),
dataType: dataType,
fnValue: reflect.ValueOf(fn),
}
return nil
}
// ServeHTTP implements http.Handler
func (m Mux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var rawData io.Reader
eventContext, err := FromRequest(&rawData, r)
if err != nil {
log.Printf("Failed to handle request: %s %s", err, spew.Sdump(r))
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`Invalid request`))
return
}
c := eventContext.AsV01()
h := m[c.EventType]
if h == nil {
log.Print("Cloud not find handler for event type", c.EventType)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("Event type %q is not supported", c.EventType)))
return
}
args := make([]reflect.Value, 0, 2)
if h.numIn > 0 {
ctx := r.Context()
ctx = context.WithValue(ctx, contextKey, eventContext)
args = append(args, reflect.ValueOf(ctx))
}
if h.numIn == 2 {
dataPtr, dataArg := allocate(h.dataType)
if err := unmarshalEventData(c.ContentType, rawData, dataPtr); err != nil {
log.Print("Failed to parse event data", err)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`Invalid request`))
return
}
args = append(args, dataArg)
}
res := h.fnValue.Call(args)
respondHTTP(res, h.fnValue, w)
}
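// Usage sketch: the event type strings and the Created/Deleted payload types
// below are hypothetical, chosen by the caller.
//
//	m := cloudevents.NewMux()
//	m.Handle("dev.knative.example.created", func(ctx context.Context, d Created) error { return nil })
//	m.Handle("dev.knative.example.deleted", func(ctx context.Context, d Deleted) error { return nil })
//	http.ListenAndServe(":8080", m)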

View File

@ -32,7 +32,6 @@ type fakeInformerGenerator struct {
generator.DefaultGen
outputPackage string
imports namer.ImportTracker
filtered bool
typeToGenerate *types.Type
groupVersion clientgentypes.GroupVersion

View File

@ -186,6 +186,14 @@ func (c *Impl) Enqueue(obj interface{}) {
c.EnqueueKey(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()})
}
// EnqueueSentinel returns an Enqueue method which will always enqueue a
// predefined key instead of the object key.
func (c *Impl) EnqueueSentinel(k types.NamespacedName) func(interface{}) {
return func(interface{}) {
c.EnqueueKey(k)
}
}
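// Usage sketch: funnel every change of a watched type into one fixed key.
// The impl, informer, and key names below are hypothetical, and this assumes
// the package's HandleAll helper for building a ResourceEventHandler.
//
//	sentinel := impl.EnqueueSentinel(types.NamespacedName{Namespace: "knative-example", Name: "state"})
//	informer.Informer().AddEventHandler(controller.HandleAll(sentinel))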
// EnqueueControllerOf takes a resource, identifies its controller resource,
// converts it into a namespace/name string, and passes that to EnqueueKey.
func (c *Impl) EnqueueControllerOf(obj interface{}) {

View File

@ -174,15 +174,9 @@ func init() {
Aggregation: reconcileDistribution,
TagKeys: []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
}}
for _, view := range wp.DefaultViews() {
views = append(views, view)
}
for _, view := range rp.DefaultViews() {
views = append(views, view)
}
for _, view := range cp.DefaultViews() {
views = append(views, view)
}
views = append(views, wp.DefaultViews()...)
views = append(views, rp.DefaultViews()...)
views = append(views, cp.DefaultViews()...)
// Create views to see our measurements. This can return an error if
// a previously-registered view has the same name with a different value.

View File

@ -20,9 +20,9 @@ set -o pipefail
source $(dirname $0)/../vendor/knative.dev/test-infra/scripts/library.sh
CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${REPO_ROOT_DIR}; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${REPO_ROOT_DIR}; ls -d -1 $(dirname $0)/../vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
go install ./vendor/k8s.io/code-generator/cmd/deepcopy-gen
go install $(dirname $0)/../vendor/k8s.io/code-generator/cmd/deepcopy-gen
# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
@ -64,7 +64,7 @@ ${CODEGEN_PKG}/generate-groups.sh "deepcopy" \
# Depends on generate-groups.sh to install bin/deepcopy-gen
${GOPATH}/bin/deepcopy-gen --input-dirs \
knative.dev/pkg/apis,knative.dev/pkg/apis/v1alpha1,knative.dev/pkg/logging,knative.dev/pkg/testing \
knative.dev/pkg/apis,knative.dev/pkg/tracker,knative.dev/pkg/logging,knative.dev/pkg/metrics,knative.dev/pkg/testing \
-O zz_generated.deepcopy \
--go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt

View File

@ -37,6 +37,7 @@ cp -aR \
"${REPO_ROOT_DIR}/Gopkg.lock" \
"${REPO_ROOT_DIR}/apis" \
"${REPO_ROOT_DIR}/logging" \
"${REPO_ROOT_DIR}/metrics" \
"${REPO_ROOT_DIR}/testing" \
"${TMP_DIFFROOT}"
@ -53,6 +54,9 @@ diff -Naupr --no-dereference \
diff -Naupr --no-dereference \
"${REPO_ROOT_DIR}/logging" "${TMP_DIFFROOT}/logging" || ret=1
diff -Naupr --no-dereference \
"${REPO_ROOT_DIR}/metrics" "${TMP_DIFFROOT}/metrics" || ret=1
diff -Naupr --no-dereference \
"${REPO_ROOT_DIR}/testing" "${TMP_DIFFROOT}/testing" || ret=1
@ -61,6 +65,7 @@ rm -fr \
"${REPO_ROOT_DIR}/Gopkg.lock" \
"${REPO_ROOT_DIR}/apis" \
"${REPO_ROOT_DIR}/logging" \
"${REPO_ROOT_DIR}/metrics" \
"${REPO_ROOT_DIR}/testing"
cp -aR "${TMP_DIFFROOT}"/* "${REPO_ROOT_DIR}"

View File

@ -38,7 +38,7 @@ const (
fallbackLoggerName = "fallback-logger"
)
var emptyLoggerConfigError = errors.New("empty logger configuration")
var errEmptyLoggerConfig = errors.New("empty logger configuration")
// NewLogger creates a logger with the supplied configuration.
// In addition to the logger, it returns AtomicLevel that can
@ -112,7 +112,7 @@ func newLoggerFromConfig(configJSON string, levelOverride string, opts []zap.Opt
func zapConfigFromJSON(configJSON string) (*zap.Config, error) {
if configJSON == "" {
return nil, emptyLoggerConfigError
return nil, errEmptyLoggerConfig
}
loggingCfg := &zap.Config{}
@ -206,7 +206,7 @@ func UpdateLevelFromConfigMap(logger *zap.SugaredLogger, atomicLevel zap.AtomicL
// reset to global level
loggingCfg, err := zapConfigFromJSON(config.LoggingConfig)
switch {
case err == emptyLoggerConfigError:
case err == errEmptyLoggerConfig:
level = zap.NewAtomicLevel().Level()
case err != nil:
logger.With(zap.Error(err)).Errorf("Failed to parse logger configuration. "+

View File

@ -17,6 +17,7 @@ limitations under the License.
package metrics
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -27,11 +28,12 @@ import (
"time"
"go.uber.org/zap"
"go.opencensus.io/stats"
"knative.dev/pkg/metrics/metricskey"
)
const (
DomainEnv = "METRICS_DOMAIN"
ConfigMapNameEnv = "CONFIG_OBSERVABILITY_NAME"
DomainEnv = "METRICS_DOMAIN"
)
// metricsBackend specifies the backend to use for metrics
@ -46,11 +48,10 @@ const (
ReportingPeriodKey = "metrics.reporting-period-seconds"
StackdriverCustomMetricSubDomainKey = "metrics.stackdriver-custom-metrics-subdomain"
// Stackdriver client configuration keys
stackdriverProjectIDKey = "metrics.stackdriver-project-id"
stackdriverGCPLocationKey = "metrics.stackdriver-gcp-location"
stackdriverClusterNameKey = "metrics.stackdriver-cluster-name"
stackdriverGCPSecretNameKey = "metrics.stackdriver-gcp-secret-name"
stackdriverGCPSecretNamespaceKey = "metrics.stackdriver-gcp-secret-namespace"
StackdriverProjectIDKey = "metrics.stackdriver-project-id"
StackdriverGCPLocationKey = "metrics.stackdriver-gcp-location"
StackdriverClusterNameKey = "metrics.stackdriver-cluster-name"
StackdriverUseSecretKey = "metrics.stackdriver-use-secret"
// Stackdriver is used for Stackdriver backend
Stackdriver metricsBackend = "stackdriver"
@ -75,22 +76,16 @@ type metricsConfig struct {
// If duration is less than or equal to zero, it enables the default behavior.
reportingPeriod time.Duration
// recorder provides a hook for performing custom transformations before
// writing the metrics to the stats.RecordWithOptions interface.
recorder func(context.Context, stats.Measurement, ...stats.Options) error
// ---- Prometheus specific below ----
// prometheusPort is the port where metrics are exposed in Prometheus
// format. It defaults to 9090.
prometheusPort int
// ---- Stackdriver specific below ----
// allowStackdriverCustomMetrics indicates whether it is allowed to send metrics to
// Stackdriver using "global" resource type and custom metric type if the
// metrics are not supported by the registered monitored resource types. Setting this
// flag to "true" could cause extra Stackdriver charge.
// If backendDestination is not Stackdriver, this is ignored.
allowStackdriverCustomMetrics bool
// stackdriverCustomMetricsSubDomain is the subdomain to use when sending custom metrics to StackDriver.
// If not specified, the default is `knative.dev`.
// If backendDestination is not Stackdriver, this is ignored.
stackdriverCustomMetricsSubDomain string
// True if backendDestination equals to "stackdriver". Store this in a variable
// to reduce string comparison operations.
isStackdriverBackend bool
@ -103,11 +98,11 @@ type metricsConfig struct {
// Store this in a variable to reduce string join operations.
stackdriverCustomMetricTypePrefix string
// stackdriverClientConfig is the metadata to configure the metrics exporter's Stackdriver client.
stackdriverClientConfig stackdriverClientConfig
stackdriverClientConfig StackdriverClientConfig
}
// stackdriverClientConfig encapsulates the metadata required to configure a Stackdriver client.
type stackdriverClientConfig struct {
// StackdriverClientConfig encapsulates the metadata required to configure a Stackdriver client.
type StackdriverClientConfig struct {
// ProjectID is the stackdriver project ID to which data is uploaded.
// This is not necessarily the GCP project ID where the Kubernetes cluster is hosted.
// Required when the Kubernetes cluster is not hosted on GCE.
@ -119,27 +114,33 @@ type stackdriverClientConfig struct {
// ClusterName is the cluster name with which the data will be associated in Stackdriver.
// Required when the Kubernetes cluster is not hosted on GCE.
ClusterName string
// GCPSecretName is the optional GCP service account key which will be used to
// authenticate with Stackdriver. If not provided, Google Application Default Credentials
// UseSecret is whether the credentials stored in a Kubernetes Secret should be used to
// authenticate with Stackdriver. The Secret name and namespace can be specified by calling
// metrics.SetStackdriverSecretLocation.
// If UseSecret is false, Google Application Default Credentials
// will be used (https://cloud.google.com/docs/authentication/production).
GCPSecretName string
// GCPSecretNamespace is the Kubernetes namespace where GCPSecretName is located.
// The Kubernetes ServiceAccount used by the pod that is exporting data to
// Stackdriver should have access to Secrets in this namespace.
GCPSecretNamespace string
UseSecret bool
}
// newStackdriverClientConfigFromMap creates a stackdriverClientConfig from the given map
func newStackdriverClientConfigFromMap(config map[string]string) *stackdriverClientConfig {
return &stackdriverClientConfig{
ProjectID: config[stackdriverProjectIDKey],
GCPLocation: config[stackdriverGCPLocationKey],
ClusterName: config[stackdriverClusterNameKey],
GCPSecretName: config[stackdriverGCPSecretNameKey],
GCPSecretNamespace: config[stackdriverGCPSecretNamespaceKey],
// NewStackdriverClientConfigFromMap creates a StackdriverClientConfig from the given map
func NewStackdriverClientConfigFromMap(config map[string]string) *StackdriverClientConfig {
return &StackdriverClientConfig{
ProjectID: config[StackdriverProjectIDKey],
GCPLocation: config[StackdriverGCPLocationKey],
ClusterName: config[StackdriverClusterNameKey],
UseSecret: strings.EqualFold(config[StackdriverUseSecretKey], "true"),
}
}
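// Example with hypothetical values, using the exported keys above:
//
//	scc := NewStackdriverClientConfigFromMap(map[string]string{
//		StackdriverProjectIDKey:   "my-gcp-project",
//		StackdriverGCPLocationKey: "us-central1",
//		StackdriverClusterNameKey: "my-cluster",
//		StackdriverUseSecretKey:   "true",
//	})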
// Record applies the `ros` Options to `ms` and then records the resulting
// measurements in the metricsConfig's designated backend.
func (mc *metricsConfig) Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error {
if mc == nil || mc.recorder == nil {
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...)
}
return mc.recorder(ctx, ms, ros...)
}
func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metricsConfig, error) {
var mc metricsConfig
@ -190,22 +191,37 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri
// use the application default credentials. If that is not available, Opencensus would fail to create the
// metrics exporter.
if mc.backendDestination == Stackdriver {
scc := newStackdriverClientConfigFromMap(m)
scc := NewStackdriverClientConfigFromMap(m)
mc.stackdriverClientConfig = *scc
mc.isStackdriverBackend = true
var allowCustomMetrics bool
var err error
mc.stackdriverMetricTypePrefix = path.Join(mc.domain, mc.component)
mc.stackdriverCustomMetricsSubDomain = defaultCustomMetricSubDomain
if sdcmd, ok := m[StackdriverCustomMetricSubDomainKey]; ok && sdcmd != "" {
mc.stackdriverCustomMetricsSubDomain = sdcmd
customMetricsSubDomain := m[StackdriverCustomMetricSubDomainKey]
if customMetricsSubDomain == "" {
customMetricsSubDomain = defaultCustomMetricSubDomain
}
mc.stackdriverCustomMetricTypePrefix = path.Join(customMetricTypePrefix, mc.stackdriverCustomMetricsSubDomain, mc.component)
if ascmStr, ok := m[AllowStackdriverCustomMetricsKey]; ok && ascmStr != "" {
ascmBool, err := strconv.ParseBool(ascmStr)
mc.stackdriverCustomMetricTypePrefix = path.Join(customMetricTypePrefix, customMetricsSubDomain, mc.component)
if ascmStr := m[AllowStackdriverCustomMetricsKey]; ascmStr != "" {
allowCustomMetrics, err = strconv.ParseBool(ascmStr)
if err != nil {
return nil, fmt.Errorf("invalid %s value %q", AllowStackdriverCustomMetricsKey, ascmStr)
}
mc.allowStackdriverCustomMetrics = ascmBool
}
if !allowCustomMetrics {
servingOrEventing := metricskey.KnativeRevisionMetrics.Union(
metricskey.KnativeTriggerMetrics)
mc.recorder = func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error {
metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name())
if servingOrEventing.Has(metricType) {
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...)
}
// Otherwise, skip (because it won't be accepted)
return nil
}
}
}
@ -231,15 +247,6 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri
return &mc, nil
}
// ConfigMapName gets the name of the metrics ConfigMap
func ConfigMapName() string {
cm := os.Getenv(ConfigMapNameEnv)
if cm == "" {
return "config-observability"
}
return cm
}
// Domain holds the metrics domain to use for surfacing metrics.
func Domain() string {
if domain := os.Getenv(DomainEnv); domain != "" {

View File

@ -0,0 +1,103 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"os"
"strings"
texttemplate "text/template"
corev1 "k8s.io/api/core/v1"
)
const (
// The following is used to set the default log URL template
DefaultLogURLTemplate = "http://localhost:8001/api/v1/namespaces/knative-monitoring/services/kibana-logging/proxy/app/kibana#/discover?_a=(query:(match:(kubernetes.labels.knative-dev%2FrevisionUID:(query:'${REVISION_UID}',type:phrase))))"
// The env var name for config-observability
ConfigMapNameEnv = "CONFIG_OBSERVABILITY_NAME"
)
// ObservabilityConfig contains the configuration defined in the observability ConfigMap.
// +k8s:deepcopy-gen=true
type ObservabilityConfig struct {
// EnableVarLogCollection specifies whether the logs under /var/log/ should be available
// for collection on the host node by the fluentd daemon set.
EnableVarLogCollection bool
// LoggingURLTemplate is a string containing the logging url template where
// the variable REVISION_UID will be replaced with the created revision's UID.
LoggingURLTemplate string
// RequestLogTemplate is the go template to use to shape the request logs.
RequestLogTemplate string
// EnableProbeRequestLog enables queue-proxy to write health check probe request logs.
EnableProbeRequestLog bool
// RequestMetricsBackend specifies the request metrics destination, e.g. Prometheus,
// Stackdriver.
RequestMetricsBackend string
// EnableProfiling indicates whether it is allowed to retrieve runtime profiling data from
// the pods via an HTTP server in the format expected by the pprof visualization tool.
EnableProfiling bool
}
// NewObservabilityConfigFromConfigMap creates a ObservabilityConfig from the supplied ConfigMap
func NewObservabilityConfigFromConfigMap(configMap *corev1.ConfigMap) (*ObservabilityConfig, error) {
oc := &ObservabilityConfig{}
if evlc, ok := configMap.Data["logging.enable-var-log-collection"]; ok {
oc.EnableVarLogCollection = strings.EqualFold(evlc, "true")
}
if rut, ok := configMap.Data["logging.revision-url-template"]; ok {
oc.LoggingURLTemplate = rut
} else {
oc.LoggingURLTemplate = DefaultLogURLTemplate
}
if rlt, ok := configMap.Data["logging.request-log-template"]; ok {
// Verify that we get valid templates.
if _, err := texttemplate.New("requestLog").Parse(rlt); err != nil {
return nil, err
}
oc.RequestLogTemplate = rlt
}
if eprl, ok := configMap.Data["logging.enable-probe-request-log"]; ok {
oc.EnableProbeRequestLog = strings.EqualFold(eprl, "true")
}
if mb, ok := configMap.Data["metrics.request-metrics-backend-destination"]; ok {
oc.RequestMetricsBackend = mb
}
if prof, ok := configMap.Data["profiling.enable"]; ok {
oc.EnableProfiling = strings.EqualFold(prof, "true")
}
return oc, nil
}
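// Example with hypothetical values, exercising the keys parsed above:
//
//	oc, err := NewObservabilityConfigFromConfigMap(&corev1.ConfigMap{
//		Data: map[string]string{
//			"logging.enable-var-log-collection":           "true",
//			"metrics.request-metrics-backend-destination": "prometheus",
//			"profiling.enable":                            "false",
//		},
//	})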
// ConfigMapName gets the name of the observability ConfigMap
func ConfigMapName() string {
cm := os.Getenv(ConfigMapNameEnv)
if cm == "" {
return "config-observability"
}
return cm
}

View File

@ -36,7 +36,10 @@ const (
// LabelResourceGroup is the name of the resource CRD.
LabelResourceGroup = "resource_group"
// LabelBrokerName is the label for the name of the Trigger's broker.
// LabelTriggerName is the label for the name of the Trigger.
LabelTriggerName = "trigger_name"
// LabelBrokerName is the label for the name of the Broker.
LabelBrokerName = "broker_name"
// LabelEventType is the label for the name of the event type.
@ -47,9 +50,6 @@ const (
// LabelFilterType is the label for the Trigger filter attribute "type".
LabelFilterType = "filter_type"
// LabelFilterSource is the label for the Trigger filter attribute "source".
LabelFilterSource = "filter_source"
)
var (
@ -59,16 +59,16 @@ var (
LabelLocation,
LabelClusterName,
LabelNamespaceName,
LabelName,
LabelBrokerName,
LabelTriggerName,
)
// KnativeTriggerMetrics stores a set of metric types which are supported
// by resource type knative_trigger.
KnativeTriggerMetrics = sets.NewString(
"knative.dev/eventing/trigger/event_count",
"knative.dev/eventing/trigger/event_processing_latencies",
"knative.dev/eventing/trigger/event_dispatch_latencies",
"knative.dev/internal/eventing/trigger/event_count",
"knative.dev/internal/eventing/trigger/event_processing_latencies",
"knative.dev/internal/eventing/trigger/event_dispatch_latencies",
)
// KnativeBrokerLabels stores the set of resource labels for resource type knative_broker.
@ -77,13 +77,13 @@ var (
LabelLocation,
LabelClusterName,
LabelNamespaceName,
LabelName,
LabelBrokerName,
)
// KnativeBrokerMetrics stores a set of metric types which are supported
// by resource type knative_broker.
KnativeBrokerMetrics = sets.NewString(
"knative.dev/eventing/broker/event_count",
"knative.dev/internal/eventing/broker/event_count",
)
// KnativeSourceLabels stores the set of resource labels for resource type knative_source.

View File

@ -143,6 +143,7 @@ func checkExactlyOneRow(t *testing.T, name string) *view.Row {
}
if len(d) != 1 {
t.Errorf("For metric %s: Reporter.Report() len(d)=%v, want 1", name, len(d))
return nil
}
return d[0]

View File

@ -59,8 +59,8 @@ func (kt *KnativeTrigger) MonitoredResource() (resType string, labels map[string
metricskey.LabelLocation: kt.Location,
metricskey.LabelClusterName: kt.ClusterName,
metricskey.LabelNamespaceName: kt.NamespaceName,
metricskey.LabelName: kt.TriggerName,
metricskey.LabelBrokerName: kt.BrokerName,
metricskey.LabelTriggerName: kt.TriggerName,
}
return metricskey.ResourceTypeKnativeTrigger, labels
}
@ -71,7 +71,7 @@ func (kb *KnativeBroker) MonitoredResource() (resType string, labels map[string]
metricskey.LabelLocation: kb.Location,
metricskey.LabelClusterName: kb.ClusterName,
metricskey.LabelNamespaceName: kb.NamespaceName,
metricskey.LabelName: kb.BrokerName,
metricskey.LabelBrokerName: kb.BrokerName,
}
return metricskey.ResourceTypeKnativeBroker, labels
}
@ -98,7 +98,7 @@ func GetKnativeBrokerMonitoredResource(
ClusterName: gm.cluster,
// The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
BrokerName: valueOrUnknown(metricskey.LabelName, tagsMap),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap),
}
var newTags []tag.Tag
@ -122,8 +122,8 @@ func GetKnativeTriggerMonitoredResource(
ClusterName: gm.cluster,
// The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
TriggerName: valueOrUnknown(metricskey.LabelName, tagsMap),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap),
TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tagsMap),
}
var newTags []tag.Tag

View File

@ -18,52 +18,18 @@ package metrics
import (
"context"
"path"
"go.opencensus.io/stats"
"knative.dev/pkg/metrics/metricskey"
)
// TODO should be properly refactored and pieces should move to eventing and serving, as appropriate.
// See https://github.com/knative/pkg/issues/608
// Record decides whether to record one measurement via OpenCensus based on the
// following conditions:
// 1) No package level metrics config. In this case it just proxies to OpenCensus
// based on the assumption that users expect the metrics to be recorded when
// they call this function. Users must ensure metrics config are set before
// using this function to get expected behavior.
// 2) The backend is not Stackdriver.
// 3) The backend is Stackdriver and it is allowed to use custom metrics.
// 4) The backend is Stackdriver and the metric is one of the built-in metrics: "knative_revision", "knative_broker",
// "knative_trigger", "knative_source".
// Record stores the given Measurement from `ms` in the current metrics backend.
func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) {
mc := getCurMetricsConfig()
ros = append(ros, stats.WithMeasurements(ms))
// Condition 1)
if mc == nil {
stats.RecordWithOptions(ctx, ros...)
return
}
// Condition 2) and 3)
if !mc.isStackdriverBackend || mc.allowStackdriverCustomMetrics {
stats.RecordWithOptions(ctx, ros...)
return
}
// Condition 4)
metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name())
isServingBuiltIn := metricskey.KnativeRevisionMetrics.Has(metricType)
isEventingBuiltIn := metricskey.KnativeTriggerMetrics.Has(metricType) ||
metricskey.KnativeBrokerMetrics.Has(metricType) ||
metricskey.KnativeSourceMetrics.Has(metricType)
if isServingBuiltIn || isEventingBuiltIn {
stats.RecordWithOptions(ctx, ros...)
}
mc.Record(ctx, ms, ros...)
}
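// Usage sketch, assuming a caller-defined OpenCensus measure:
//
//	var requestCount = stats.Int64("request_count", "Number of requests handled", stats.UnitDimensionless)
//	...
//	metrics.Record(ctx, requestCount.M(1))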
// Buckets125 generates an array of buckets with approximate powers-of-two

View File

@ -40,22 +40,63 @@ const (
// defaultCustomMetricSubDomain is the default subdomain to use for unsupported metrics by monitored resource types.
// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
defaultCustomMetricSubDomain = "knative.dev"
// secretNamespaceDefault is the namespace to search for a k8s Secret to pass to Stackdriver client to authenticate with Stackdriver.
secretNamespaceDefault = "default"
// StackdriverSecretNamespaceDefault is the default namespace to search for a k8s Secret to pass to Stackdriver client to authenticate with Stackdriver.
StackdriverSecretNamespaceDefault = "default"
// StackdriverSecretNameDefault is the default name of the k8s Secret to pass to Stackdriver client to authenticate with Stackdriver.
StackdriverSecretNameDefault = "stackdriver-service-account-key"
// secretDataFieldKey is the name of the k8s Secret field that contains the Secret's key.
secretDataFieldKey = "key.json"
)
var (
// gcpMetadataFunc is the function used to fetch GCP metadata.
// In production usage, this is always set to the function retrieveGCPMetadata.
// In unit tests this is set to a fake one to avoid calling GCP metadata
// service.
gcpMetadataFunc func() *gcpMetadata
// newStackdriverExporterFunc is the function used to create new stackdriver
// exporter.
// In production usage, this is always set to the function newOpencensusSDExporter.
// In unit tests this is set to a fake one to avoid calling actual Google API
// service.
newStackdriverExporterFunc func(stackdriver.Options) (view.Exporter, error)
// kubeclient is the in-cluster Kubernetes kubeclient, which is lazy-initialized on first use.
kubeclient *kubernetes.Clientset
// initClientOnce is the lazy initializer for kubeclient.
initClientOnce sync.Once
// kubeclientInitErr captures an error during initClientOnce
kubeclientInitErr error
// stackdriverMtx protects setting secretNamespace and secretName and useStackdriverSecretEnabled
stackdriverMtx sync.RWMutex
// secretName is the name of the k8s Secret to pass to Stackdriver client to authenticate with Stackdriver.
secretName = StackdriverSecretNameDefault
// secretNamespace is the namespace to search for a k8s Secret to pass to Stackdriver client to authenticate with Stackdriver.
secretNamespace = StackdriverSecretNamespaceDefault
// useStackdriverSecretEnabled specifies whether or not the exporter can be configured with a Secret.
// Consuming packages must explicitly enable this by calling SetStackdriverSecretLocation.
useStackdriverSecretEnabled = false
)
// SetStackdriverSecretLocation sets the name and namespace of the Secret that can be used to authenticate with Stackdriver.
// The Secret is only used if both:
// 1. This function has been explicitly called to set the name and namespace
// 2. Users set metricsConfig.stackdriverClientConfig.UseSecret to "true"
func SetStackdriverSecretLocation(name string, namespace string) {
stackdriverMtx.Lock()
defer stackdriverMtx.Unlock()
secretName = name
secretNamespace = namespace
useStackdriverSecretEnabled = true
}
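// Usage sketch: a consuming binary opts in before the exporter is created, and
// the observability ConfigMap must also set StackdriverUseSecretKey to "true".
// The Secret name and namespace below are hypothetical.
//
//	metrics.SetStackdriverSecretLocation("stackdriver-service-account-key", "knative-serving")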
func init() {
// Set gcpMetadataFunc to call GCP metadata service.
gcpMetadataFunc = retrieveGCPMetadata
newStackdriverExporterFunc = newOpencensusSDExporter
kubeclientInitErr = nil
}
@ -73,7 +114,7 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err))
}
// Automatically fall back on Google application default credentials
e, err := newOpencensusSDExporter(stackdriver.Options{
e, err := newStackdriverExporterFunc(stackdriver.Options{
ProjectID: gm.project,
Location: gm.location,
MonitoringClientOptions: co,
@ -93,15 +134,19 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
// getStackdriverExporterClientOptions creates client options for the opencensus Stackdriver exporter from the given stackdriverClientConfig.
// On error, an empty array of client options is returned.
func getStackdriverExporterClientOptions(sdconfig *stackdriverClientConfig) ([]option.ClientOption, error) {
func getStackdriverExporterClientOptions(sdconfig *StackdriverClientConfig) ([]option.ClientOption, error) {
var co []option.ClientOption
if sdconfig.GCPSecretName != "" {
if sdconfig.UseSecret && useStackdriverSecretEnabled {
secret, err := getStackdriverSecret(sdconfig)
if err != nil {
return co, err
}
co = append(co, convertSecretToExporterOption(secret))
if opt, err := convertSecretToExporterOption(secret); err == nil {
co = append(co, opt)
} else {
return co, err
}
}
return co, nil
@ -111,7 +156,7 @@ func getStackdriverExporterClientOptions(sdconfig *stackdriverClientConfig) ([]o
// to Stackdriver. Values can come from the GCE metadata server or the config.
// Values explicitly set in the config take the highest precedent.
func getMergedGCPMetadata(config *metricsConfig) *gcpMetadata {
gm := retrieveGCPMetadata()
gm := gcpMetadataFunc()
if config.stackdriverClientConfig.ProjectID != "" {
gm.project = config.stackdriverClientConfig.ProjectID
}
@ -164,28 +209,30 @@ func getMetricTypeFunc(metricTypePrefix, customMetricTypePrefix string) func(vie
}
// getStackdriverSecret returns the Kubernetes Secret specified in the given config.
func getStackdriverSecret(sdconfig *stackdriverClientConfig) (*corev1.Secret, error) {
// TODO(anniefu): Update exporter if Secret changes (https://github.com/knative/pkg/issues/842)
func getStackdriverSecret(sdconfig *StackdriverClientConfig) (*corev1.Secret, error) {
if err := ensureKubeclient(); err != nil {
return nil, err
}
ns := sdconfig.GCPSecretNamespace
if ns == "" {
ns = secretNamespaceDefault
}
stackdriverMtx.RLock()
defer stackdriverMtx.RUnlock()
sec, secErr := kubeclient.CoreV1().Secrets(ns).Get(sdconfig.GCPSecretName, metav1.GetOptions{})
sec, secErr := kubeclient.CoreV1().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{})
if secErr != nil {
return nil, fmt.Errorf("Error getting Secret [%v] in namespace [%v]: %v", sdconfig.GCPSecretName, sdconfig.GCPSecretNamespace, secErr)
return nil, fmt.Errorf("Error getting Secret [%v] in namespace [%v]: %v", secretName, secretNamespace, secErr)
}
return sec, nil
}
// convertSecretToExporterOption converts a Kubernetes Secret to an OpenCensus Stackdriver Exporter Option.
func convertSecretToExporterOption(secret *corev1.Secret) option.ClientOption {
return option.WithCredentialsJSON(secret.Data[secretDataFieldKey])
func convertSecretToExporterOption(secret *corev1.Secret) (option.ClientOption, error) {
if data, ok := secret.Data[secretDataFieldKey]; ok {
return option.WithCredentialsJSON(data), nil
}
return nil, fmt.Errorf("Expected Secret to store key in data field named [%v]", secretDataFieldKey)
}
// ensureKubeclient is the lazy initializer for kubeclient.

View File

@ -0,0 +1,37 @@
// +build !ignore_autogenerated
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package metrics
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ObservabilityConfig) DeepCopyInto(out *ObservabilityConfig) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ObservabilityConfig.
func (in *ObservabilityConfig) DeepCopy() *ObservabilityConfig {
if in == nil {
return nil
}
out := new(ObservabilityConfig)
in.DeepCopyInto(out)
return out
}

View File

@ -76,8 +76,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func readProfilingFlag(configMap *corev1.ConfigMap) (bool, error) {
profiling, ok := configMap.Data[profilingKey]
func ReadProfilingFlag(config map[string]string) (bool, error) {
profiling, ok := config[profilingKey]
if !ok {
return false, nil
}
@ -91,7 +91,7 @@ func readProfilingFlag(configMap *corev1.ConfigMap) (bool, error) {
// UpdateFromConfigMap modifies the Enabled flag in the Handler
// according to the value in the given ConfigMap
func (h *Handler) UpdateFromConfigMap(configMap *corev1.ConfigMap) {
enabled, err := readProfilingFlag(configMap)
enabled, err := ReadProfilingFlag(configMap.Data)
if err != nil {
h.log.Errorw("Failed to update the profiling flag", zap.Error(err))
return

View File

@ -89,5 +89,5 @@ func (o *ObjectSorter) IndexerForObjectType(obj runtime.Object) cache.Indexer {
}
func emptyIndexer() cache.Indexer {
return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
}

View File

@ -20,7 +20,6 @@ import (
"sync"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/tracker"
)
@ -32,7 +31,7 @@ type NullTracker = FakeTracker
// FakeTracker implements Tracker.
type FakeTracker struct {
sync.Mutex
references []corev1.ObjectReference
references []tracker.Reference
}
var _ tracker.Interface = (*FakeTracker)(nil)
@ -40,8 +39,18 @@ var _ tracker.Interface = (*FakeTracker)(nil)
// OnChanged implements OnChanged.
func (*FakeTracker) OnChanged(interface{}) {}
// Track implements Track.
// Track implements tracker.Interface.
func (n *FakeTracker) Track(ref corev1.ObjectReference, obj interface{}) error {
return n.TrackReference(tracker.Reference{
APIVersion: ref.APIVersion,
Kind: ref.Kind,
Namespace: ref.Namespace,
Name: ref.Name,
}, obj)
}
// TrackReference implements tracker.Interface.
func (n *FakeTracker) TrackReference(ref tracker.Reference, obj interface{}) error {
n.Lock()
defer n.Unlock()
@ -50,7 +59,7 @@ func (n *FakeTracker) Track(ref corev1.ObjectReference, obj interface{}) error {
}
// References returns the list of objects being tracked
func (n *FakeTracker) References() []corev1.ObjectReference {
func (n *FakeTracker) References() []tracker.Reference {
n.Lock()
defer n.Unlock()

View File

@ -28,7 +28,6 @@ import (
"knative.dev/pkg/apis"
pkgapisduck "knative.dev/pkg/apis/duck"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
"knative.dev/pkg/controller"
"knative.dev/pkg/network"
"knative.dev/pkg/tracker"
@ -63,7 +62,7 @@ func NewURIResolver(ctx context.Context, callback func(types.NamespacedName)) *U
}
// URIFromDestination resolves a Destination into a URI string.
func (r *URIResolver) URIFromDestination(dest apisv1alpha1.Destination, parent interface{}) (string, error) {
func (r *URIResolver) URIFromDestination(dest duckv1beta1.Destination, parent interface{}) (string, error) {
var deprecatedObjectReference *corev1.ObjectReference
if dest.DeprecatedAPIVersion == "" && dest.DeprecatedKind == "" && dest.DeprecatedName == "" && dest.DeprecatedNamespace == "" {
deprecatedObjectReference = nil

View File

@ -214,8 +214,8 @@ func (fgc *FakeGithubClient) ListPullRequests(org, repo, head, base string) ([]*
}
for _, PR := range PRs {
// Filter with consistent logic of CreatePullRequest function below
if ("" == head || *PR.Head.Label == head) &&
("" == base || *PR.Base.Ref == base) {
if (head == "" || head == *PR.Head.Label) &&
(base == "" || base == *PR.Base.Ref) {
res = append(res, PR)
}
}
@ -304,7 +304,7 @@ func (fgc *FakeGithubClient) CreatePullRequest(org, repo, head, base, title, bod
State: &stateStr,
Number: &PRNumber,
}
if "" != head {
if head != "" {
tokens := strings.Split(head, ":")
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid head, want: 'user:ref', got: '%s'", head)
@ -314,7 +314,7 @@ func (fgc *FakeGithubClient) CreatePullRequest(org, repo, head, base, title, bod
Ref: &tokens[1],
}
}
if "" != base {
if base != "" {
l := fmt.Sprintf("%s:%s", repo, base)
PR.Base = &github.PullRequestBranch{
Label: &l,
@ -349,7 +349,7 @@ func (fgc *FakeGithubClient) AddFileToCommit(org, repo, SHA, filename, patch str
Patch: &patch,
}
if _, ok := fgc.CommitFiles[SHA]; !ok {
fgc.CommitFiles[SHA] = make([]*github.CommitFile, 0, 0)
fgc.CommitFiles[SHA] = make([]*github.CommitFile, 0)
}
fgc.CommitFiles[SHA] = append(fgc.CommitFiles[SHA], f)
return nil
@ -366,7 +366,7 @@ func (fgc *FakeGithubClient) AddCommitToPullRequest(org, repo string, ID int, SH
return fmt.Errorf("Pull Request %d not exist", ID)
}
if _, ok = fgc.PRCommits[ID]; !ok {
fgc.PRCommits[ID] = make([]*github.RepositoryCommit, 0, 0)
fgc.PRCommits[ID] = make([]*github.RepositoryCommit, 0)
}
fgc.PRCommits[ID] = append(fgc.PRCommits[ID], &github.RepositoryCommit{SHA: &SHA})
return nil

View File

@ -20,9 +20,9 @@ import (
"fmt"
container "google.golang.org/api/container/v1beta1"
option "google.golang.org/api/option"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
)
// SDKOperations wraps GKE SDK related functions
@ -42,14 +42,8 @@ type sdkClient struct {
}
// NewSDKClient returns an SDKClient that implements SDKOperations
func NewSDKClient() (SDKOperations, error) {
ctx := context.Background()
c, err := google.DefaultClient(ctx, container.CloudPlatformScope)
if err != nil {
return nil, fmt.Errorf("failed to create Google client: '%v'", err)
}
containerService, err := container.New(c)
func NewSDKClient(opts ...option.ClientOption) (SDKOperations, error) {
containerService, err := container.NewService(context.Background(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to create container service: '%v'", err)
}

View File

@ -43,13 +43,14 @@ func Wait(gsc SDKOperations, project, region, zone, opName string, wait time.Dur
var err error
timeout := time.After(wait)
tick := time.Tick(500 * time.Millisecond)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
// Got a timeout! fail with a timeout error
case <-timeout:
return errors.New("timed out waiting")
case <-tick:
case <-ticker.C:
// Retry 3 times in case of weird network error, or rate limiting
for r, w := 0, 50*time.Microsecond; r < 3; r, w = r+1, w*2 {
op, err = gsc.GetOperation(project, region, zone, opName)

View File

@ -26,7 +26,7 @@ func Run(message string, call func() error, dryrun bool) error {
log.Printf("[dry run] %s", message)
return nil
}
log.Printf(message)
log.Print(message)
return call()
}

View File

@ -177,7 +177,7 @@ func (gih *IssueHandler) CloseIssueForTest(testName string) error {
return nil
}
// If the issue is still active, do not close it.
if time.Now().Sub(issue.GetUpdatedAt()) < daysConsideredActive*24*time.Hour {
if time.Since(issue.GetUpdatedAt()) < daysConsideredActive*24*time.Hour {
return nil
}
@ -233,7 +233,7 @@ func (gih *IssueHandler) findIssue(title string) (*github.Issue, error) {
if *issue.Title == title {
// If the issue has been closed a long time ago, ignore this issue.
if issue.GetState() == string(ghutil.IssueCloseState) &&
time.Now().Sub(*issue.UpdatedAt) > daysConsideredOld*24*time.Hour {
time.Since(*issue.UpdatedAt) > daysConsideredOld*24*time.Hour {
continue
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package resourcetree
// test_util contains types defined and used by types and their corresponding verification methods.
//lint:file-ignore U1000 Ignore all unused code, it's needed
import (
"container/list"

View File

@ -33,7 +33,7 @@ import (
"knative.dev/pkg/test/webhook-apicoverage/coveragecalculator"
"knative.dev/pkg/test/webhook-apicoverage/resourcetree"
"knative.dev/pkg/test/webhook-apicoverage/view"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/resourcesemantics"
)
var (
@ -67,7 +67,7 @@ type resourceChannelMsg struct {
type APICoverageRecorder struct {
Logger *zap.SugaredLogger
ResourceForest resourcetree.ResourceForest
ResourceMap map[schema.GroupVersionKind]webhook.GenericCRD
ResourceMap map[schema.GroupVersionKind]resourcesemantics.GenericCRD
NodeRules resourcetree.NodeRules
FieldRules resourcetree.FieldRules
DisplayRules view.DisplayRules

View File

@ -36,8 +36,8 @@ import (
"k8s.io/client-go/rest"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
"knative.dev/pkg/webhook"
certresources "knative.dev/pkg/webhook/certificates/resources"
"knative.dev/pkg/webhook/resourcesemantics"
)
var (
@ -152,7 +152,7 @@ func (acw *APICoverageWebhook) registerWebhook(rules []admissionregistrationv1be
return nil
}
func (acw *APICoverageWebhook) getValidationRules(resources map[schema.GroupVersionKind]webhook.GenericCRD) []admissionregistrationv1beta1.RuleWithOperations {
func (acw *APICoverageWebhook) getValidationRules(resources map[schema.GroupVersionKind]resourcesemantics.GenericCRD) []admissionregistrationv1beta1.RuleWithOperations {
var rules []admissionregistrationv1beta1.RuleWithOperations
for gvk := range resources {
plural := strings.ToLower(inflect.Pluralize(gvk.Kind))
@ -173,7 +173,7 @@ func (acw *APICoverageWebhook) getValidationRules(resources map[schema.GroupVers
}
// SetupWebhook sets up the webhook with the provided http.handler, resourcegroup Map, namespace and stop channel.
func (acw *APICoverageWebhook) SetupWebhook(handler http.Handler, resources map[schema.GroupVersionKind]webhook.GenericCRD, namespace string, stop <-chan struct{}) error {
func (acw *APICoverageWebhook) SetupWebhook(handler http.Handler, resources map[schema.GroupVersionKind]resourcesemantics.GenericCRD, namespace string, stop <-chan struct{}) error {
server, err := acw.getWebhookServer(handler)
rules := acw.getValidationRules(resources)
if err != nil {

View File

@ -234,7 +234,7 @@ func (gc *GKECluster) Acquire() error {
if i != len(regions)-1 {
errMsg = fmt.Sprintf("%sRetry another region %q for cluster creation", errMsg, regions[i+1])
}
log.Printf(errMsg)
log.Print(errMsg)
} else {
log.Print("Cluster creation completed")
gc.Cluster = cluster

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
// Package tracker defines a utility to enable Reconcilers to trigger
// reconciliations when objects that are cross-referenced change, so
// that the level-based reconciliation can react to the change. The

View File

@ -24,6 +24,8 @@ import (
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
@ -47,9 +49,12 @@ func New(callback func(types.NamespacedName), lease time.Duration) Interface {
type impl struct {
m sync.Mutex
// mapping maps from an object reference to the set of
// exact maps from an object reference to the set of
// keys for objects watching it.
mapping map[corev1.ObjectReference]set
exact map[Reference]set
// inexact maps from a partial object reference (no name/selector) to
// a map from watcher keys to the compiled selector and expiry.
inexact map[Reference]matchers
// The amount of time that an object may watch another
// before having to renew the lease.
@ -64,15 +69,50 @@ var _ Interface = (*impl)(nil)
// set is a map from keys to expirations
type set map[types.NamespacedName]time.Time
// matchers maps the tracker's key to the matcher.
type matchers map[types.NamespacedName]matcher
// matcher holds the selector and expiry for matching tracked objects.
type matcher struct {
// The selector to complete the match.
selector labels.Selector
// When this lease expires.
expiry time.Time
}
// Track implements Interface.
func (i *impl) Track(ref corev1.ObjectReference, obj interface{}) error {
return i.TrackReference(Reference{
APIVersion: ref.APIVersion,
Kind: ref.Kind,
Namespace: ref.Namespace,
Name: ref.Name,
}, obj)
}
func (i *impl) TrackReference(ref Reference, obj interface{}) error {
invalidFields := map[string][]string{
"APIVersion": validation.IsQualifiedName(ref.APIVersion),
"Kind": validation.IsCIdentifier(ref.Kind),
"Namespace": validation.IsDNS1123Label(ref.Namespace),
"Name": validation.IsDNS1123Subdomain(ref.Name),
}
var selector labels.Selector
fieldErrors := []string{}
switch {
case ref.Selector != nil && ref.Name != "":
fieldErrors = append(fieldErrors, "cannot provide both Name and Selector")
case ref.Name != "":
invalidFields["Name"] = validation.IsDNS1123Subdomain(ref.Name)
case ref.Selector != nil:
ls, err := metav1.LabelSelectorAsSelector(ref.Selector)
if err != nil {
invalidFields["Selector"] = []string{err.Error()}
}
selector = ls
default:
fieldErrors = append(fieldErrors, "must provide either Name or Selector")
}
for k, v := range invalidFields {
for _, msg := range v {
fieldErrors = append(fieldErrors, fmt.Sprintf("%s: %s", k, msg))
@ -80,27 +120,66 @@ func (i *impl) Track(ref corev1.ObjectReference, obj interface{}) error {
}
if len(fieldErrors) > 0 {
sort.Strings(fieldErrors)
return fmt.Errorf("invalid ObjectReference:\n%s", strings.Join(fieldErrors, "\n"))
return fmt.Errorf("invalid Reference:\n%s", strings.Join(fieldErrors, "\n"))
}
// Determine the key of the object tracking this reference.
object, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
return err
}
key := types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}
i.m.Lock()
defer i.m.Unlock()
if i.mapping == nil {
i.mapping = make(map[corev1.ObjectReference]set)
if i.exact == nil {
i.exact = make(map[Reference]set)
}
if i.inexact == nil {
i.inexact = make(map[Reference]matchers)
}
l, ok := i.mapping[ref]
if !ok {
l = set{}
// If the reference uses Name then it is an exact match.
if selector == nil {
l, ok := i.exact[ref]
if !ok {
l = set{}
}
if expiry, ok := l[key]; !ok || isExpired(expiry) {
// When covering an uncovered key, immediately call the
// registered callback to ensure that the following pattern
// doesn't create problems:
// foo, err := lister.Get(key)
// // Later...
// err := tracker.Track(fooRef, parent)
// In this example, "Later" represents a window where "foo" may
// have changed or been created while the Track is not active.
// The simplest way of eliminating such a window is to call the
// callback to "catch up" immediately following new
// registrations.
i.cb(key)
}
// Overwrite the key with a new expiration.
l[key] = time.Now().Add(i.leaseDuration)
i.exact[ref] = l
return nil
}
if expiry, ok := l[key]; !ok || isExpired(expiry) {
// Otherwise, it is an inexact match by selector.
partialRef := Reference{
APIVersion: ref.APIVersion,
Kind: ref.Kind,
Namespace: ref.Namespace,
// Exclude the selector.
}
l, ok := i.inexact[partialRef]
if !ok {
l = matchers{}
}
if m, ok := l[key]; !ok || isExpired(m.expiry) {
// When covering an uncovered key, immediately call the
// registered callback to ensure that the following pattern
// doesn't create problems:
@ -115,9 +194,12 @@ func (i *impl) Track(ref corev1.ObjectReference, obj interface{}) error {
i.cb(key)
}
// Overwrite the key with a new expiration.
l[key] = time.Now().Add(i.leaseDuration)
l[key] = matcher{
selector: selector,
expiry: time.Now().Add(i.leaseDuration),
}
i.mapping[ref] = l
i.inexact[partialRef] = l
return nil
}
@ -129,32 +211,53 @@ func isExpired(expiry time.Time) bool {
func (i *impl) OnChanged(obj interface{}) {
item, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
// TODO(mattmoor): We should consider logging here.
return
}
or := kmeta.ObjectReference(item)
ref := Reference{
APIVersion: or.APIVersion,
Kind: or.Kind,
Namespace: or.Namespace,
Name: or.Name,
}
// TODO(mattmoor): Consider locking the mapping (global) for a
// smaller scope and leveraging a per-set lock to guard its access.
i.m.Lock()
defer i.m.Unlock()
s, ok := i.mapping[or]
if !ok {
// TODO(mattmoor): We should consider logging here.
return
}
for key, expiry := range s {
// If the expiration has lapsed, then delete the key.
if isExpired(expiry) {
delete(s, key)
continue
// Handle exact matches.
s, ok := i.exact[ref]
if ok {
for key, expiry := range s {
// If the expiration has lapsed, then delete the key.
if isExpired(expiry) {
delete(s, key)
continue
}
i.cb(key)
}
if len(s) == 0 {
delete(i.exact, ref)
}
i.cb(key)
}
if len(s) == 0 {
delete(i.mapping, or)
// Handle inexact matches.
ref.Name = ""
ms, ok := i.inexact[ref]
if ok {
ls := labels.Set(item.GetLabels())
for key, m := range ms {
// If the expiration has lapsed, then delete the key.
if isExpired(m.expiry) {
delete(ms, key)
continue
}
if m.selector.Matches(ls) {
i.cb(key)
}
}
if len(ms) == 0 {
delete(i.inexact, ref)
}
}
}

View File

@ -17,17 +17,154 @@ limitations under the License.
package tracker
import (
"context"
"strings"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation"
"knative.dev/pkg/apis"
)
// Reference is modeled after corev1.ObjectReference, but omits fields
// unsupported by the tracker, and permits us to extend things in
// divergent ways.
type Reference struct {
// API version of the referent.
// +optional
APIVersion string `json:"apiVersion,omitempty"`
// Kind of the referent.
// +optional
Kind string `json:"kind,omitempty"`
// Namespace of the referent.
// +optional
Namespace string `json:"namespace,omitempty"`
// Name of the referent.
// Mutually exclusive with Selector.
// +optional
Name string `json:"name,omitempty"`
// Selector of the referents.
// Mutually exclusive with Name.
// +optional
Selector *metav1.LabelSelector `json:"selector,omitempty"`
}
// Interface defines the interface through which an object can register
// that it is tracking another object by reference.
type Interface interface {
// Track tells us that "obj" is tracking changes to the
// referenced object.
// DEPRECATED: use TrackReference
Track(ref corev1.ObjectReference, obj interface{}) error
// TrackReference tells us that "obj" is tracking changes to the
// referenced object.
TrackReference(ref Reference, obj interface{}) error
// OnChanged is a callback to register with the InformerFactory
// so that we are notified for appropriate object changes.
OnChanged(obj interface{})
}
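// Usage sketch: from a reconciler, track all Services matching a label selector.
// The tracker t, the namespace, the labels, and the parent object are hypothetical.
//
//	err := t.TrackReference(tracker.Reference{
//		APIVersion: "v1",
//		Kind:       "Service",
//		Namespace:  "default",
//		Selector:   &metav1.LabelSelector{MatchLabels: map[string]string{"app": "frontend"}},
//	}, parent)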
// GroupVersionKind returns the GroupVersionKind of the object referenced.
func (ref *Reference) GroupVersionKind() schema.GroupVersionKind {
gv, _ := schema.ParseGroupVersion(ref.APIVersion)
return schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: ref.Kind,
}
}
// ObjectReference returns the tracker Reference as an ObjectReference.
func (ref *Reference) ObjectReference() corev1.ObjectReference {
return corev1.ObjectReference{
APIVersion: ref.APIVersion,
Kind: ref.Kind,
Namespace: ref.Namespace,
Name: ref.Name,
}
}
// ValidateObjectReference validates that the Reference uses a subset suitable for
// translation to a corev1.ObjectReference. This helper is intended to simplify
// validating a particular (narrow) use of tracker.Reference.
func (ref *Reference) ValidateObjectReference(ctx context.Context) *apis.FieldError {
var errs *apis.FieldError
// Required fields
if ref.APIVersion == "" {
errs = errs.Also(apis.ErrMissingField("apiVersion"))
} else if verrs := validation.IsQualifiedName(ref.APIVersion); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "apiVersion"))
}
if ref.Kind == "" {
errs = errs.Also(apis.ErrMissingField("kind"))
} else if verrs := validation.IsCIdentifier(ref.Kind); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "kind"))
}
if ref.Name == "" {
errs = errs.Also(apis.ErrMissingField("name"))
} else if verrs := validation.IsDNS1123Label(ref.Name); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "name"))
}
if ref.Namespace == "" {
errs = errs.Also(apis.ErrMissingField("namespace"))
} else if verrs := validation.IsDNS1123Label(ref.Namespace); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "namespace"))
}
// Disallowed fields in ObjectReference-compatible context.
if ref.Selector != nil {
errs = errs.Also(apis.ErrDisallowedFields("selector"))
}
return errs
}
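// Validate checks that the Reference has a valid apiVersion, kind, and
// namespace, and that exactly one of Name or Selector is specified.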
func (ref *Reference) Validate(ctx context.Context) *apis.FieldError {
var errs *apis.FieldError
// Required fields
if ref.APIVersion == "" {
errs = errs.Also(apis.ErrMissingField("apiVersion"))
} else if verrs := validation.IsQualifiedName(ref.APIVersion); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "apiVersion"))
}
if ref.Kind == "" {
errs = errs.Also(apis.ErrMissingField("kind"))
} else if verrs := validation.IsCIdentifier(ref.Kind); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "kind"))
}
if ref.Namespace == "" {
errs = errs.Also(apis.ErrMissingField("namespace"))
} else if verrs := validation.IsDNS1123Label(ref.Namespace); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "namespace"))
}
switch {
case ref.Selector != nil && ref.Name != "":
errs = errs.Also(apis.ErrMultipleOneOf("selector", "name"))
case ref.Selector != nil:
_, err := metav1.LabelSelectorAsSelector(ref.Selector)
if err != nil {
errs = errs.Also(apis.ErrInvalidValue(err.Error(), "selector"))
}
case ref.Name != "":
if verrs := validation.IsDNS1123Label(ref.Name); len(verrs) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(verrs, ", "), "name"))
}
default:
errs = errs.Also(apis.ErrMissingOneOf("selector", "name"))
}
return errs
}

View File

@ -18,35 +18,29 @@ limitations under the License.
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1alpha1
package tracker
import (
v1 "k8s.io/api/core/v1"
apis "knative.dev/pkg/apis"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Destination) DeepCopyInto(out *Destination) {
func (in *Reference) DeepCopyInto(out *Reference) {
*out = *in
if in.Ref != nil {
in, out := &in.Ref, &out.Ref
*out = new(v1.ObjectReference)
**out = **in
}
if in.URI != nil {
in, out := &in.URI, &out.URI
*out = new(apis.URL)
if in.Selector != nil {
in, out := &in.Selector, &out.Selector
*out = new(v1.LabelSelector)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Destination.
func (in *Destination) DeepCopy() *Destination {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Reference.
func (in *Reference) DeepCopy() *Reference {
if in == nil {
return nil
}
out := new(Destination)
out := new(Reference)
in.DeepCopyInto(out)
return out
}

vendor/knative.dev/pkg/webhook/README.md

@ -0,0 +1,101 @@
## Knative Webhooks
Knative provides infrastructure for authoring webhooks under
`knative.dev/pkg/webhook` and has a few built-in helpers for certain
common admission control scenarios. The built-in admission controllers
are:
1. Resource validation and defaulting (builds around `apis.Validatable`
and `apis.Defaultable` under `knative.dev/pkg/apis`).
2. ConfigMap validation, which builds around similar patterns from `knative.dev/pkg/configmap` (in particular the `store` concept).
To illustrate standing up the webhook, let's start with one of these
built-in admission controllers and then talk about how you can write
your own admission controller.
## Standing up a Webhook from an Admission Controller
We provide facilities in `knative.dev/pkg/injection/sharedmain` to help
eliminate much of the boilerplate involved in standing up a webhook. For this
example we will show how to stand up a webhook using the built-in admission
controller for validating and defaulting resources.
The code to stand up such a webhook looks roughly like this:
```go
// Create a function matching this signature to pass into sharedmain.
func NewResourceAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
return resourcesemantics.NewAdmissionController(ctx,
// Name of the resource webhook (created via yaml)
fmt.Sprintf("resources.webhook.%s.knative.dev", system.Namespace()),
// The path on which to serve the webhook.
"/resource-validation",
// The resources to validate and default.
map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
// List the types to validate and default; these are from knative.dev/sample-controller.
v1alpha1.SchemeGroupVersion.WithKind("AddressableService"): &v1alpha1.AddressableService{},
},
// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
// (e.g. attach a store with configmap data, like knative.dev/serving attaches config-defaults)
return ctx
},
// Whether to disallow unknown fields when parsing the resources' JSON.
true,
)
}
func main() {
// Set up a signal context with our webhook options.
ctx := webhook.WithOptions(signals.NewContext(), webhook.Options{
// The name of the Kubernetes service selecting over this deployment's pods.
ServiceName: "webhook",
// The port on which to serve.
Port: 8443,
// The name of the secret containing certificate data.
SecretName: "webhook-certs",
})
sharedmain.MainWithContext(ctx, "webhook",
// The certificate controller will ensure that the named secret (above) has
// the appropriate shape for our webhook's admission controllers.
certificates.NewController,
// This invokes the method defined above to instantiate the resource admission
// controller.
NewResourceAdmissionController,
)
}
```
There is also a config map validation admission controller built in under
`knative.dev/pkg/webhook/configmaps`.
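A sketch of wiring it up mirrors the resource example above; the webhook name,
path, and the configmap/constructor pairing below (here, the logging config)
are illustrative assumptions, not part of this change:
```go
// Create a function matching this signature to pass into sharedmain.
func NewConfigValidationController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
return configmaps.NewAdmissionController(ctx,
// Name of the configmap webhook (created via yaml).
fmt.Sprintf("config.webhook.%s.knative.dev", system.Namespace()),
// The path on which to serve the webhook.
"/config-validation",
// The configmaps to validate, keyed by name, each with a constructor
// that parses the configmap (for example, the logging config).
configmap.Constructors{
logging.ConfigMapName(): logging.NewConfigFromConfigMap,
},
)
}
```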
## Writing new Admission Controllers
To implement your own admission controller akin to the resource defaulting and
validation controller above, you implement a `knative.dev/pkg/controller.Reconciler`
as you would for any other type of controller, but the `Reconciler` that gets
embedded in the `*controller.Impl` should *also* implement:
```go
// AdmissionController provides the interface for different admission controllers
type AdmissionController interface {
// Path returns the path that this particular admission controller serves on.
Path() string
// Admit is the callback which is invoked when an HTTPS request comes in on Path().
Admit(context.Context, *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse
}
```
The `Reconciler` part is responsible for reconciling the mutating or validating webhook configuration itself.
The `AdmissionController` part is responsible for request dispatch (`Path()`) and
for handling admission requests (`Admit()`).
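Putting the two together, a minimal custom admission controller might look
roughly like this sketch; the type name is illustrative and the `Reconcile`
body (which would manage the elided webhook configuration) is omitted:
```go
// myAdmissionController is an illustrative Reconciler + AdmissionController.
type myAdmissionController struct {
path string
}

var _ controller.Reconciler = (*myAdmissionController)(nil)
var _ webhook.AdmissionController = (*myAdmissionController)(nil)

// Reconcile keeps the webhook configuration (elided here) up to date,
// e.g. by patching its CA bundle and service path.
func (ac *myAdmissionController) Reconcile(ctx context.Context, key string) error {
return nil
}

// Path implements webhook.AdmissionController.
func (ac *myAdmissionController) Path() string {
return ac.path
}

// Admit implements webhook.AdmissionController.
func (ac *myAdmissionController) Admit(ctx context.Context, req *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
// This sketch allows everything; a real controller would inspect req.
return &admissionv1beta1.AdmissionResponse{Allowed: true}
}
```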


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
package configmaps
import (
"bytes"
@ -26,49 +26,62 @@ import (
admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
admissionlisters "k8s.io/client-go/listers/admissionregistration/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmp"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"knative.dev/pkg/system"
"knative.dev/pkg/webhook"
certresources "knative.dev/pkg/webhook/certificates/resources"
)
// ConfigValidationController implements the AdmissionController for ConfigMaps
type ConfigValidationController struct {
// name of the ValidatingWebhookConfiguration
name string
// path that the webhook should serve on
// reconciler implements the AdmissionController for ConfigMaps
type reconciler struct {
name string
path string
constructors map[string]reflect.Value
client kubernetes.Interface
vwhlister admissionlisters.ValidatingWebhookConfigurationLister
secretlister corelisters.SecretLister
secretName string
}
// NewConfigValidationController constructs a ConfigValidationController
func NewConfigValidationController(
name, path string,
constructors configmap.Constructors) AdmissionController {
cfgValidations := &ConfigValidationController{
name: name,
path: path,
constructors: make(map[string]reflect.Value),
var _ controller.Reconciler = (*reconciler)(nil)
var _ webhook.AdmissionController = (*reconciler)(nil)
// Reconcile implements controller.Reconciler
func (ac *reconciler) Reconcile(ctx context.Context, key string) error {
logger := logging.FromContext(ctx)
secret, err := ac.secretlister.Secrets(system.Namespace()).Get(ac.secretName)
if err != nil {
logger.Errorf("Error fetching secret: %v", err)
return err
}
for configName, constructor := range constructors {
cfgValidations.registerConfig(configName, constructor)
caCert, ok := secret.Data[certresources.CACert]
if !ok {
return fmt.Errorf("secret %q is missing %q key", ac.secretName, certresources.CACert)
}
return cfgValidations
return ac.reconcileValidatingWebhook(ctx, caCert)
}
// Path implements AdmissionController
func (ac *ConfigValidationController) Path() string {
func (ac *reconciler) Path() string {
return ac.path
}
// Admit implements AdmissionController
func (ac *ConfigValidationController) Admit(ctx context.Context, request *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
func (ac *reconciler) Admit(ctx context.Context, request *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
logger := logging.FromContext(ctx)
switch request.Operation {
case admissionv1beta1.Create, admissionv1beta1.Update:
@ -78,7 +91,7 @@ func (ac *ConfigValidationController) Admit(ctx context.Context, request *admiss
}
if err := ac.validate(ctx, request); err != nil {
return makeErrorStatus("validation failed: %v", err)
return webhook.MakeErrorStatus("validation failed: %v", err)
}
return &admissionv1beta1.AdmissionResponse{
@ -86,9 +99,7 @@ func (ac *ConfigValidationController) Admit(ctx context.Context, request *admiss
}
}
// Register implements AdmissionController
func (ac *ConfigValidationController) Register(ctx context.Context, kubeClient kubernetes.Interface, caCert []byte) error {
client := kubeClient.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations()
func (ac *reconciler) reconcileValidatingWebhook(ctx context.Context, caCert []byte) error {
logger := logging.FromContext(ctx)
ruleScope := admissionregistrationv1beta1.NamespacedScope
@ -105,7 +116,7 @@ func (ac *ConfigValidationController) Register(ctx context.Context, kubeClient k
},
}}
configuredWebhook, err := client.Get(ac.name, metav1.GetOptions{})
configuredWebhook, err := ac.vwhlister.Get(ac.name)
if err != nil {
return fmt.Errorf("error retrieving webhook: %v", err)
}
@ -125,14 +136,15 @@ func (ac *ConfigValidationController) Register(ctx context.Context, kubeClient k
if webhook.Webhooks[i].ClientConfig.Service == nil {
return fmt.Errorf("missing service reference for webhook: %s", wh.Name)
}
webhook.Webhooks[i].ClientConfig.Service.Path = ptr.String(ac.path)
webhook.Webhooks[i].ClientConfig.Service.Path = ptr.String(ac.Path())
}
if ok, err := kmp.SafeEqual(configuredWebhook, webhook); err != nil {
return fmt.Errorf("error diffing webhooks: %v", err)
} else if !ok {
logger.Info("Updating webhook")
if _, err := client.Update(webhook); err != nil {
vwhclient := ac.client.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations()
if _, err := vwhclient.Update(webhook); err != nil {
return fmt.Errorf("failed to update webhook: %v", err)
}
} else {
@ -142,7 +154,7 @@ func (ac *ConfigValidationController) Register(ctx context.Context, kubeClient k
return nil
}
func (ac *ConfigValidationController) validate(ctx context.Context, req *admissionv1beta1.AdmissionRequest) error {
func (ac *reconciler) validate(ctx context.Context, req *admissionv1beta1.AdmissionRequest) error {
logger := logging.FromContext(ctx)
kind := req.Kind
newBytes := req.Object.Raw
@ -186,7 +198,7 @@ func (ac *ConfigValidationController) validate(ctx context.Context, req *admissi
return err
}
func (ac *ConfigValidationController) registerConfig(name string, constructor interface{}) {
func (ac *reconciler) registerConfig(name string, constructor interface{}) {
if err := configmap.ValidateConstructor(constructor); err != nil {
panic(err)
}


@ -0,0 +1,84 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmaps
import (
"context"
"reflect"
// Injection stuff
kubeclient "knative.dev/pkg/client/injection/kube/client"
vwhinformer "knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1beta1/validatingwebhookconfiguration"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"
"knative.dev/pkg/webhook"
)
// NewAdmissionController constructs a reconciler
func NewAdmissionController(
ctx context.Context,
name, path string,
constructors configmap.Constructors,
) *controller.Impl {
client := kubeclient.Get(ctx)
vwhInformer := vwhinformer.Get(ctx)
secretInformer := secretinformer.Get(ctx)
options := webhook.GetOptions(ctx)
wh := &reconciler{
name: name,
path: path,
constructors: make(map[string]reflect.Value),
secretName: options.SecretName,
client: client,
vwhlister: vwhInformer.Lister(),
secretlister: secretInformer.Lister(),
}
for configName, constructor := range constructors {
wh.registerConfig(configName, constructor)
}
logger := logging.FromContext(ctx)
c := controller.NewImpl(wh, logger, "ConfigMapWebhook")
// Reconcile when the named ValidatingWebhookConfiguration changes.
vwhInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(name),
// It doesn't matter what we enqueue because we will always Reconcile
// the named VWH resource.
Handler: controller.HandleAll(c.Enqueue),
})
// Reconcile when the cert bundle changes.
secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(system.Namespace(), wh.secretName),
// It doesn't matter what we enqueue because we will always Reconcile
// the named VWH resource.
Handler: controller.HandleAll(c.Enqueue),
})
return c
}


@ -0,0 +1,83 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcesemantics
import (
"context"
// Injection stuff
kubeclient "knative.dev/pkg/client/injection/kube/client"
mwhinformer "knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1beta1/mutatingwebhookconfiguration"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"
"knative.dev/pkg/webhook"
)
// NewAdmissionController constructs a reconciler
func NewAdmissionController(
ctx context.Context,
name, path string,
handlers map[schema.GroupVersionKind]GenericCRD,
wc func(context.Context) context.Context,
disallowUnknownFields bool,
) *controller.Impl {
client := kubeclient.Get(ctx)
mwhInformer := mwhinformer.Get(ctx)
secretInformer := secretinformer.Get(ctx)
options := webhook.GetOptions(ctx)
wh := &reconciler{
name: name,
path: path,
handlers: handlers,
withContext: wc,
disallowUnknownFields: disallowUnknownFields,
secretName: options.SecretName,
client: client,
mwhlister: mwhInformer.Lister(),
secretlister: secretInformer.Lister(),
}
logger := logging.FromContext(ctx)
c := controller.NewImpl(wh, logger, "ConfigMapWebhook")
// Reconcile when the named MutatingWebhookConfiguration changes.
mwhInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(name),
// It doesn't matter what we enqueue because we will always Reconcile
// the named MWH resource.
Handler: controller.HandleAll(c.Enqueue),
})
// Reconcile when the cert bundle changes.
secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(system.Namespace(), wh.secretName),
// It doesn't matter what we enqueue because we will always Reconcile
// the named MWH resource.
Handler: controller.HandleAll(c.Enqueue),
})
return c
}


@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
package resourcesemantics
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"strings"
@ -29,17 +30,20 @@ import (
"go.uber.org/zap"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
admissionlisters "k8s.io/client-go/listers/admissionregistration/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmp"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"knative.dev/pkg/system"
"knative.dev/pkg/webhook"
certresources "knative.dev/pkg/webhook/certificates/resources"
)
// GenericCRD is the interface definition that allows us to perform the generic
@ -50,49 +54,58 @@ type GenericCRD interface {
runtime.Object
}
// ResourceAdmissionController implements the AdmissionController for resources
type ResourceAdmissionController struct {
// name of the MutatingWebhookConfiguration
name string
// path that the webhook should serve on
var errMissingNewObject = errors.New("the new object may not be nil")
// reconciler implements the AdmissionController for resources
type reconciler struct {
name string
path string
handlers map[schema.GroupVersionKind]GenericCRD
disallowUnknownFields bool
withContext func(context.Context) context.Context
// WithContext is public for testing.
WithContext func(context.Context) context.Context
client kubernetes.Interface
mwhlister admissionlisters.MutatingWebhookConfigurationLister
secretlister corelisters.SecretLister
disallowUnknownFields bool
secretName string
}
// NewResourceAdmissionController constructs a ResourceAdmissionController
func NewResourceAdmissionController(
name, path string,
handlers map[schema.GroupVersionKind]GenericCRD,
disallowUnknownFields bool,
withContext func(context.Context) context.Context,
) AdmissionController {
return &ResourceAdmissionController{
name: name,
path: path,
handlers: handlers,
disallowUnknownFields: disallowUnknownFields,
WithContext: withContext,
var _ controller.Reconciler = (*reconciler)(nil)
var _ webhook.AdmissionController = (*reconciler)(nil)
// Reconcile implements controller.Reconciler
func (ac *reconciler) Reconcile(ctx context.Context, key string) error {
logger := logging.FromContext(ctx)
// Look up the webhook secret, and fetch the CA cert bundle.
secret, err := ac.secretlister.Secrets(system.Namespace()).Get(ac.secretName)
if err != nil {
logger.Errorf("Error fetching secret: %v", err)
return err
}
caCert, ok := secret.Data[certresources.CACert]
if !ok {
return fmt.Errorf("secret %q is missing %q key", ac.secretName, certresources.CACert)
}
// Reconcile the webhook configuration.
return ac.reconcileMutatingWebhook(ctx, caCert)
}
// Path implements AdmissionController
func (ac *ResourceAdmissionController) Path() string {
func (ac *reconciler) Path() string {
return ac.path
}
// Admit implements AdmissionController
func (ac *ResourceAdmissionController) Admit(ctx context.Context, request *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
logger := logging.FromContext(ctx)
if ac.WithContext != nil {
ctx = ac.WithContext(ctx)
func (ac *reconciler) Admit(ctx context.Context, request *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
if ac.withContext != nil {
ctx = ac.withContext(ctx)
}
logger := logging.FromContext(ctx)
switch request.Operation {
case admissionv1beta1.Create, admissionv1beta1.Update:
default:
@ -102,7 +115,7 @@ func (ac *ResourceAdmissionController) Admit(ctx context.Context, request *admis
patchBytes, err := ac.mutate(ctx, request)
if err != nil {
return makeErrorStatus("mutation failed: %v", err)
return webhook.MakeErrorStatus("mutation failed: %v", err)
}
logger.Infof("Kind: %q PatchBytes: %v", request.Kind, string(patchBytes))
@ -116,9 +129,7 @@ func (ac *ResourceAdmissionController) Admit(ctx context.Context, request *admis
}
}
// Register implements AdmissionController
func (ac *ResourceAdmissionController) Register(ctx context.Context, kubeClient kubernetes.Interface, caCert []byte) error {
client := kubeClient.AdmissionregistrationV1beta1().MutatingWebhookConfigurations()
func (ac *reconciler) reconcileMutatingWebhook(ctx context.Context, caCert []byte) error {
logger := logging.FromContext(ctx)
var rules []admissionregistrationv1beta1.RuleWithOperations
@ -150,7 +161,7 @@ func (ac *ResourceAdmissionController) Register(ctx context.Context, kubeClient
return lhs.Resources[0] < rhs.Resources[0]
})
configuredWebhook, err := client.Get(ac.name, metav1.GetOptions{})
configuredWebhook, err := ac.mwhlister.Get(ac.name)
if err != nil {
return fmt.Errorf("error retrieving webhook: %v", err)
}
@ -170,14 +181,15 @@ func (ac *ResourceAdmissionController) Register(ctx context.Context, kubeClient
if webhook.Webhooks[i].ClientConfig.Service == nil {
return fmt.Errorf("missing service reference for webhook: %s", wh.Name)
}
webhook.Webhooks[i].ClientConfig.Service.Path = ptr.String(ac.path)
webhook.Webhooks[i].ClientConfig.Service.Path = ptr.String(ac.Path())
}
if ok, err := kmp.SafeEqual(configuredWebhook, webhook); err != nil {
return fmt.Errorf("error diffing webhooks: %v", err)
} else if !ok {
logger.Info("Updating webhook")
if _, err := client.Update(webhook); err != nil {
mwhclient := ac.client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations()
if _, err := mwhclient.Update(webhook); err != nil {
return fmt.Errorf("failed to update webhook: %v", err)
}
} else {
@ -186,7 +198,7 @@ func (ac *ResourceAdmissionController) Register(ctx context.Context, kubeClient
return nil
}
func (ac *ResourceAdmissionController) mutate(ctx context.Context, req *admissionv1beta1.AdmissionRequest) ([]byte, error) {
func (ac *reconciler) mutate(ctx context.Context, req *admissionv1beta1.AdmissionRequest) ([]byte, error) {
kind := req.Kind
newBytes := req.Object.Raw
oldBytes := req.OldObject.Raw
@ -291,7 +303,7 @@ func (ac *ResourceAdmissionController) mutate(ctx context.Context, req *admissio
return json.Marshal(patches)
}
func (ac *ResourceAdmissionController) setUserInfoAnnotations(ctx context.Context, patches duck.JSONPatch, new GenericCRD, groupName string) (duck.JSONPatch, error) {
func (ac *reconciler) setUserInfoAnnotations(ctx context.Context, patches duck.JSONPatch, new GenericCRD, groupName string) (duck.JSONPatch, error) {
if new == nil {
return patches, nil
}


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
package resourcesemantics
import (
"context"


@ -17,11 +17,13 @@ limitations under the License.
package testing
import (
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
admissionlisters "k8s.io/client-go/listers/admissionregistration/v1beta1"
appsv1listers "k8s.io/client-go/listers/apps/v1"
autoscalingv2beta1listers "k8s.io/client-go/listers/autoscaling/v2beta1"
corev1listers "k8s.io/client-go/listers/core/v1"
@ -130,3 +132,13 @@ func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister {
func (l *Listers) GetNamespaceLister() corev1listers.NamespaceLister {
return corev1listers.NewNamespaceLister(l.IndexerFor(&corev1.Namespace{}))
}
// GetMutatingWebhookConfigurationLister gets lister for K8s MutatingWebhookConfiguration resource.
func (l *Listers) GetMutatingWebhookConfigurationLister() admissionlisters.MutatingWebhookConfigurationLister {
return admissionlisters.NewMutatingWebhookConfigurationLister(l.IndexerFor(&admissionregistrationv1beta1.MutatingWebhookConfiguration{}))
}
// GetValidatingWebhookConfigurationLister gets lister for K8s ValidatingWebhookConfiguration resource.
func (l *Listers) GetValidatingWebhookConfigurationLister() admissionlisters.ValidatingWebhookConfigurationLister {
return admissionlisters.NewValidatingWebhookConfigurationLister(l.IndexerFor(&admissionregistrationv1beta1.ValidatingWebhookConfiguration{}))
}


@ -28,21 +28,20 @@ import (
admissionv1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/system"
pkgtest "knative.dev/pkg/testing"
// Makes system.Namespace work in tests.
_ "knative.dev/pkg/system/testing"
. "knative.dev/pkg/testing"
)
// CreateResource creates a testing.Resource with the given name in the system namespace.
func CreateResource(name string) *Resource {
return &Resource{
func CreateResource(name string) *pkgtest.Resource {
return &pkgtest.Resource{
ObjectMeta: metav1.ObjectMeta{
Namespace: system.Namespace(),
Name: name,
},
Spec: ResourceSpec{
Spec: pkgtest.ResourceSpec{
FieldWithValidation: "magic value",
},
}


@ -27,24 +27,18 @@ import (
// Injection stuff
kubeclient "knative.dev/pkg/client/injection/kube/client"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
kubeinformerfactory "knative.dev/pkg/client/injection/kube/informers/factory"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/system"
certresources "knative.dev/pkg/webhook/certificates/resources"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
)
var (
errMissingNewObject = errors.New("the new object may not be nil")
)
// Options contains the configuration for the webhook
@ -64,12 +58,6 @@ type Options struct {
// only a single port for the service.
Port int
// RegistrationDelay controls how long admission registration
// occurs after the webhook is started. This is used to avoid
// potential races where registration completes and k8s apiserver
// invokes the webhook before the HTTP server is started.
RegistrationDelay time.Duration
// StatsReporter reports metrics about the webhook.
// This will be automatically initialized by the constructor if left uninitialized.
StatsReporter StatsReporter
@ -81,11 +69,9 @@ type AdmissionController interface {
Path() string
// Admit is the callback which is invoked when an HTTPS request comes in on Path().
// TODO(mattmoor): This will need to be different for Conversion webhooks, which is something
// to start thinking about.
Admit(context.Context, *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse
// Register is called at startup to give the AdmissionController a chance to
// register with the API Server.
Register(context.Context, kubernetes.Interface, []byte) error
}
// Webhook implements the external webhook for validation of
@ -105,7 +91,14 @@ func New(
) (*Webhook, error) {
client := kubeclient.Get(ctx)
secretInformer := secretinformer.Get(ctx)
// Injection is too aggressive for this case because by simply linking this
// library we force consumers to have secret access. If we require that one
// of the admission controllers' informers *also* require the secret
// informer, then we can fetch the shared informer factory here and produce
// a new secret informer from it.
secretInformer := kubeinformerfactory.Get(ctx).Core().V1().Secrets()
opts := GetOptions(ctx)
if opts == nil {
return nil, errors.New("context must have Options specified")
@ -120,10 +113,11 @@ func New(
opts.StatsReporter = reporter
}
acs := make(map[string]AdmissionController, len(admissionControllers))
// Build up a map of paths to admission controllers for routing handlers.
acs := map[string]AdmissionController{}
for _, ac := range admissionControllers {
if _, ok := acs[ac.Path()]; ok {
return nil, fmt.Errorf("admission controller with conflicting path: %q", ac.Path())
return nil, fmt.Errorf("duplicate admission controller path %q", ac.Path())
}
acs[ac.Path()] = ac
}
@ -140,14 +134,7 @@ func New(
// Run implements the admission controller run loop.
func (ac *Webhook) Run(stop <-chan struct{}) error {
logger := ac.Logger
ctx := logging.WithLogger(context.TODO(), logger)
// TODO(mattmoor): Separate out the certificate creation process and use listers
// to fetch this from the secret below.
_, _, caCert, err := getOrGenerateKeyCertsFromSecret(ctx, ac.Client, &ac.Options)
if err != nil {
return err
}
ctx := logging.WithLogger(context.Background(), logger)
server := &http.Server{
Handler: ac,
@ -177,34 +164,8 @@ func (ac *Webhook) Run(stop <-chan struct{}) error {
}
logger.Info("Found certificates for webhook...")
if ac.Options.RegistrationDelay != 0 {
logger.Infof("Delaying admission webhook registration for %v", ac.Options.RegistrationDelay)
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
select {
case <-time.After(ac.Options.RegistrationDelay):
// Wait an initial delay before registering
case <-stop:
return nil
}
// Register the webhook, and then periodically check that it is up to date.
for {
for _, c := range ac.admissionControllers {
if err := c.Register(ctx, ac.Client, caCert); err != nil {
logger.Errorw("failed to register webhook", zap.Error(err))
return err
}
}
logger.Info("Successfully registered webhook")
select {
case <-time.After(10 * time.Minute):
case <-stop:
return nil
}
}
})
eg.Go(func() error {
if err := server.ListenAndServeTLS("", ""); err != nil {
logger.Errorw("ListenAndServeTLS for admission webhook returned error", zap.Error(err))
@ -251,13 +212,15 @@ func (ac *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
zap.String(logkey.UserInfo, fmt.Sprint(review.Request.UserInfo)))
ctx := logging.WithLogger(r.Context(), logger)
if _, ok := ac.admissionControllers[r.URL.Path]; !ok {
c, ok := ac.admissionControllers[r.URL.Path]
if !ok {
http.Error(w, fmt.Sprintf("no admission controller registered for: %s", r.URL.Path), http.StatusBadRequest)
return
}
c := ac.admissionControllers[r.URL.Path]
// Where the magic happens.
reviewResponse := c.Admit(ctx, review.Request)
var response admissionv1beta1.AdmissionReview
if reviewResponse != nil {
response.Response = reviewResponse
@ -278,47 +241,7 @@ func (ac *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func getOrGenerateKeyCertsFromSecret(ctx context.Context, client kubernetes.Interface,
options *Options) (serverKey, serverCert, caCert []byte, err error) {
logger := logging.FromContext(ctx)
secret, err := client.CoreV1().Secrets(system.Namespace()).Get(options.SecretName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return nil, nil, nil, err
}
logger.Info("Did not find existing secret, creating one")
newSecret, err := certresources.MakeSecret(
ctx, options.SecretName, system.Namespace(), options.ServiceName)
if err != nil {
return nil, nil, nil, err
}
secret, err = client.CoreV1().Secrets(newSecret.Namespace).Create(newSecret)
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return nil, nil, nil, err
}
// OK, so something else might have created, try fetching it instead.
secret, err = client.CoreV1().Secrets(system.Namespace()).Get(options.SecretName, metav1.GetOptions{})
if err != nil {
return nil, nil, nil, err
}
}
}
var ok bool
if serverKey, ok = secret.Data[certresources.ServerKey]; !ok {
return nil, nil, nil, errors.New("server key missing")
}
if serverCert, ok = secret.Data[certresources.ServerCert]; !ok {
return nil, nil, nil, errors.New("server cert missing")
}
if caCert, ok = secret.Data[certresources.CACert]; !ok {
return nil, nil, nil, errors.New("ca cert missing")
}
return serverKey, serverCert, caCert, nil
}
func makeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.AdmissionResponse {
func MakeErrorStatus(reason string, args ...interface{}) *admissionv1beta1.AdmissionResponse {
result := apierrors.NewBadRequest(fmt.Sprintf(reason, args...)).Status()
return &admissionv1beta1.AdmissionResponse{
Result: &result,


@ -310,7 +310,7 @@ function create_test_cluster_with_retries() {
# - latest GKE not available in this region/zone yet (https://github.com/knative/test-infra/issues/694)
[[ -z "$(grep -Fo 'does not have enough resources available to fulfill' ${cluster_creation_log})" \
&& -z "$(grep -Fo 'ResponseError: code=400, message=No valid versions with the prefix' ${cluster_creation_log})" \
&& -z "$(grep -Po 'ResponseError: code=400, message=Master version "[0-9a-z\-\.]+" is unsupported' ${cluster_creation_log})" ]] \
&& -z "$(grep -Po 'ResponseError: code=400, message=Master version "[0-9a-z\-\.]+" is unsupported' ${cluster_creation_log})" \
&& -z "$(grep -Po 'only \d+ nodes out of \d+ have registered; this is likely due to Nodes failing to start correctly' ${cluster_creation_log})" ]] \
&& return 1
done
@ -325,6 +325,9 @@ function setup_test_cluster() {
set -o errexit
set -o pipefail
header "Test cluster setup"
kubectl get nodes
header "Setting up test cluster"
# Set the actual project the test cluster resides in


@ -132,30 +132,46 @@ function wait_until_object_does_not_exist() {
# Parameters: $1 - namespace.
function wait_until_pods_running() {
echo -n "Waiting until all pods in namespace $1 are up"
local failed_pod=""
for i in {1..150}; do # timeout after 5 minutes
local pods="$(kubectl get pods --no-headers -n $1 2>/dev/null)"
# All pods must be running
local not_running=$(echo "${pods}" | grep -v Running | grep -v Completed | wc -l)
if [[ -n "${pods}" && ${not_running} -eq 0 ]]; then
local not_running_pods=$(echo "${pods}" | grep -v Running | grep -v Completed)
if [[ -n "${pods}" ]] && [[ -z "${not_running_pods}" ]]; then
# All Pods are running or completed. Verify the containers on each Pod.
local all_ready=1
while read pod ; do
local status=(`echo -n ${pod} | cut -f2 -d' ' | tr '/' ' '`)
# Set this Pod as the failed_pod. If nothing is wrong with it, then after the checks, set
# failed_pod to the empty string.
failed_pod=$(echo -n "${pod}" | cut -f1 -d' ')
# All containers must be ready
[[ -z ${status[0]} ]] && all_ready=0 && break
[[ -z ${status[1]} ]] && all_ready=0 && break
[[ ${status[0]} -lt 1 ]] && all_ready=0 && break
[[ ${status[1]} -lt 1 ]] && all_ready=0 && break
[[ ${status[0]} -ne ${status[1]} ]] && all_ready=0 && break
# All the tests passed, this is not a failed pod.
failed_pod=""
done <<< "$(echo "${pods}" | grep -v Completed)"
if (( all_ready )); then
echo -e "\nAll pods are up:\n${pods}"
return 0
fi
elif [[ -n "${not_running_pods}" ]]; then
# At least one Pod is not running, just save the first one's name as the failed_pod.
failed_pod="$(echo "${not_running_pods}" | head -n 1 | cut -f1 -d' ')"
fi
echo -n "."
sleep 2
done
echo -e "\n\nERROR: timeout waiting for pods to come up\n${pods}"
if [[ -n "${failed_pod}" ]]; then
echo -e "\n\nFailed Pod (data in YAML format) - ${failed_pod}\n"
kubectl -n $1 get pods "${failed_pod}" -oyaml
echo -e "\n\nPod Logs\n"
kubectl -n $1 logs "${failed_pod}" --all-containers
fi
return 1
}
@ -337,7 +353,7 @@ function create_junit_xml() {
# Also escape `<` and `>` as here: https://github.com/golang/go/blob/50bd1c4d4eb4fac8ddeb5f063c099daccfb71b26/src/encoding/json/encode.go#L48,
# this is temporary solution for fixing https://github.com/knative/test-infra/issues/1204,
# which should be obsolete once Test-infra 2.0 is in place
local msg="$(echo -n "$3" | sed 's/$/\&#xA;/g' | sed 's/</\\u003c/' | sed 's/>/\\u003e/' | tr -d '\n')"
local msg="$(echo -n "$3" | sed 's/$/\&#xA;/g' | sed 's/</\\u003c/' | sed 's/>/\\u003e/' | sed 's/&/\\u0026/' | tr -d '\n')"
failure="<failure message=\"Failed\" type=\"\">${msg}</failure>"
fi
cat << EOF > "${xml}"
@ -401,6 +417,23 @@ function start_knative_serving() {
wait_until_pods_running knative-serving || return 1
}
# Install Knative Monitoring in the current cluster.
# Parameters: $1 - Knative Monitoring manifest.
function start_knative_monitoring() {
header "Starting Knative Monitoring"
subheader "Installing Knative Monitoring"
# The istio-system namespace needs to be created first, as noted in the comment at
# https://github.com/knative/serving/blob/4202efc0dc12052edc0630515b101cbf8068a609/config/monitoring/tracing/zipkin/100-zipkin.yaml#L21
kubectl create namespace istio-system 2>/dev/null
echo "Installing Monitoring CRDs from $1"
kubectl apply --selector knative.dev/crd-install=true -f "$1" || return 1
echo "Installing the rest of monitoring components from $1"
kubectl apply -f "$1" || return 1
wait_until_pods_running knative-monitoring || return 1
wait_until_pods_running istio-system || return 1
}
# Install the stable release Knative/serving in the current cluster.
# Parameters: $1 - Knative Serving version number, e.g. 0.6.0.
function start_release_knative_serving() {


@ -106,12 +106,18 @@ function update_clusters() {
header "Done updating all clusters"
}
# Run the perf-tests tool
# Parameters: $1..$n - parameters passed to the tool
function run_perf_cluster_tool() {
go run ${REPO_ROOT_DIR}/vendor/knative.dev/pkg/testutils/clustermanager/perf-tests $@
}
# Delete the old clusters belonged to the current repo, and recreate them with the same configuration.
function recreate_clusters() {
header "Recreating clusters for ${REPO_NAME}"
go run ${REPO_ROOT_DIR}/vendor/knative.dev/pkg/testutils/clustermanager/perf-tests \
--recreate \
--gcp-project=${PROJECT_NAME} --repository=${REPO_NAME} --benchmark-root=${BENCHMARK_ROOT_PATH}
run_perf_cluster_tool --recreate \
--gcp-project=${PROJECT_NAME} --repository=${REPO_NAME} --benchmark-root=${BENCHMARK_ROOT_PATH} \
|| abort "failed recreating clusters for ${REPO_NAME}"
header "Done recreating clusters"
# Update all clusters after they are recreated
update_clusters
@ -121,9 +127,9 @@ function recreate_clusters() {
# This function will be run as postsubmit jobs.
function reconcile_benchmark_clusters() {
header "Reconciling clusters for ${REPO_NAME}"
go run ${REPO_ROOT_DIR}/vendor/knative.dev/pkg/testutils/clustermanager/perf-tests \
--reconcile \
--gcp-project=${PROJECT_NAME} --repository=${REPO_NAME} --benchmark-root=${BENCHMARK_ROOT_PATH}
run_perf_cluster_tool --reconcile \
--gcp-project=${PROJECT_NAME} --repository=${REPO_NAME} --benchmark-root=${BENCHMARK_ROOT_PATH} \
|| abort "failed reconciling clusters for ${REPO_NAME}"
header "Done reconciling clusters"
# For now, do nothing after reconciling the clusters, and the next update_clusters job will automatically
# update them. So there will be a period that the newly created clusters are being idle, and the duration


@ -212,7 +212,7 @@ function prepare_dot_release() {
echo "Dot release will be generated for ${version_filter}"
releases="$(echo "${releases}" | grep ^${version_filter})"
fi
local last_version="$(echo "${releases}" | grep '^v[0-9]\+\.[0-9]\+\.[0-9]\+$' | sort -r | head -1)"
local last_version="$(echo "${releases}" | grep '^v[0-9]\+\.[0-9]\+\.[0-9]\+$' | sort -r -V | head -1)"
[[ -n "${last_version}" ]] || abort "no previous release exist"
local major_minor_version=""
if [[ -z "${RELEASE_BRANCH}" ]]; then