caching/vendor/knative.dev/pkg/metrics/resource_view.go

318 lines
8.4 KiB
Go

/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
type storedViews struct {
views []*view.View
lock sync.Mutex
}
type meterExporter struct {
m view.Meter // NOTE: DO NOT RETURN THIS DIRECTLY; the view.Meter will not work for the empty Resource
o stats.Options // Cache the option to reduce allocations
e view.Exporter
}
// ResourceExporterFactory provides a hook for producing separate view.Exporters
// for each observed Resource. This is needed because OpenCensus support for
// Resources is a bit tacked-on rather than being a first-class component like
// Tags are.
type ResourceExporterFactory func(*resource.Resource) (view.Exporter, error)
type meters struct {
meters map[string]*meterExporter
factory ResourceExporterFactory
lock sync.Mutex
}
// Lock regime: lock allMeters before resourceViews. The critical path is in
// optionForResource, which must lock allMeters, but only needs to lock
// resourceViews if a new meter needs to be created.
var resourceViews = storedViews{}
var allMeters = meters{
meters: map[string]*meterExporter{"": &defaultMeter},
}
// RegisterResourceView is similar to view.Register(), except that it will
// register the view across all Resources tracked by the system, rather than
// simply the default view.
func RegisterResourceView(views ...*view.View) error {
var err error
allMeters.lock.Lock()
defer allMeters.lock.Unlock()
resourceViews.lock.Lock()
defer resourceViews.lock.Unlock()
for _, meter := range allMeters.meters {
// make a copy of views to avoid data races
viewCopy := make([]*view.View, 0, len(views))
for _, v := range views {
c := *v
viewCopy = append(viewCopy, &c)
}
if e := meter.m.Register(viewCopy...); e != nil {
err = e
}
}
if err != nil {
return err
}
resourceViews.views = append(resourceViews.views, views...)
return nil
}
// UnregisterResourceView is similar to view.Unregiste(), except that it will
// unregister the view across all Resources tracked byt he system, rather than
// simply the default view.
func UnregisterResourceView(views ...*view.View) {
allMeters.lock.Lock()
defer allMeters.lock.Unlock()
resourceViews.lock.Lock()
defer resourceViews.lock.Unlock()
for _, meter := range allMeters.meters {
// Since we make a defensive copy of all views in RegisterResourceView,
// the caller might not have the same view pointer that was registered.
// Use Meter.Find() to find the view with a matching name.
for _, v := range views {
name := v.Name
if v.Name == "" {
name = v.Measure.Name()
}
if v := meter.m.Find(name); v != nil {
meter.m.Unregister(v)
}
}
}
j := 0
for _, view := range resourceViews.views {
toRemove := false
for _, viewToRemove := range views {
if view == viewToRemove {
toRemove = true
break
}
}
if !toRemove {
resourceViews.views[j] = view
j++
}
}
resourceViews.views = resourceViews.views[:j]
}
func setFactory(f ResourceExporterFactory) error {
if f == nil {
return errors.New("do not setFactory(nil)")
}
allMeters.lock.Lock()
defer allMeters.lock.Unlock()
allMeters.factory = f
var retErr error
for r, meter := range allMeters.meters {
e, err := f(resourceFromKey(r))
if err != nil {
retErr = err
continue // Keep trying to clean up remaining Meters.
}
meter.m.UnregisterExporter(meter.e)
meter.m.RegisterExporter(e)
meter.e = e
}
return retErr
}
func flushResourceExporters() {
allMeters.lock.Lock()
defer allMeters.lock.Unlock()
for _, meter := range allMeters.meters {
flushGivenExporter(meter.e)
}
}
func meterExporterForResource(r *resource.Resource) *meterExporter {
key := resourceToKey(r)
mE := allMeters.meters[key]
if mE == nil {
mE = &meterExporter{}
allMeters.meters[key] = mE
}
if mE.o != nil {
return mE
}
mE.m = view.NewMeter()
mE.m.SetResource(r)
mE.m.Start()
resourceViews.lock.Lock()
defer resourceViews.lock.Unlock()
// make a copy of views to avoid data races
viewsCopy := make([]*view.View, 0, len(resourceViews.views))
for _, v := range resourceViews.views {
c := *v
viewsCopy = append(viewsCopy, &c)
}
mE.m.Register(viewsCopy...)
mE.o = stats.WithRecorder(mE.m)
allMeters.meters[key] = mE
return mE
}
// optionForResource finds or creates a stats exporter for the resource, and
// returns a stats.Option indicating which meter to record to.
func optionForResource(r *resource.Resource) (stats.Options, error) {
allMeters.lock.Lock()
defer allMeters.lock.Unlock()
mE := meterExporterForResource(r)
if mE == nil {
return nil, fmt.Errorf("failed lookup for resource %v", r)
}
if mE.e != nil {
// Assume the exporter is already started.
return mE.o, nil
}
if allMeters.factory == nil {
if mE.o != nil {
// If we can't create exporters but we have a Meter, return that.
return mE.o, nil
}
return nil, fmt.Errorf("whoops, allMeters.factory is nil")
}
exporter, err := allMeters.factory(r)
if err != nil {
return nil, err
}
mE.m.RegisterExporter(exporter)
mE.e = exporter
return mE.o, nil
}
func resourceToKey(r *resource.Resource) string {
if r == nil {
return ""
}
var s strings.Builder
l := len(r.Type)
kvs := make([]string, 0, len(r.Labels))
for k, v := range r.Labels {
l += len(k) + len(v) + 2
// We use byte values 1 and 2 to avoid colliding with valid resource labels
// and to make unpacking easy
kvs = append(kvs, fmt.Sprintf("\x01%s\x02%s", k, v))
}
s.Grow(l)
s.WriteString(r.Type)
sort.Strings(kvs) // Go maps are unsorted, so sort by key to produce stable output.
for _, kv := range kvs {
s.WriteString(kv)
}
return s.String()
}
func resourceFromKey(s string) *resource.Resource {
if s == "" {
return nil
}
r := &resource.Resource{Labels: map[string]string{}}
parts := strings.Split(s, "\x01")
r.Type = parts[0]
for _, label := range parts[1:] {
keyValue := strings.SplitN(label, "\x02", 2)
r.Labels[keyValue[0]] = keyValue[1]
}
return r
}
// defaultMeterImpl is a pass-through to the default worker in OpenCensus.
type defaultMeterImpl struct{}
var _ view.Meter = (*defaultMeterImpl)(nil)
// defaultMeter is a custom meterExporter which passes through to the default
// worker in OpenCensus. This allows legacy code that uses OpenCensus and does
// not store a Resource in the context to continue to interoperate.
var defaultMeter = meterExporter{
m: &defaultMeterImpl{},
o: stats.WithRecorder(nil),
}
func (*defaultMeterImpl) Record(*tag.Map, interface{}, map[string]interface{}) {
// using an empty option prevents this from being called
}
// Find calls view.Find
func (*defaultMeterImpl) Find(name string) *view.View {
return view.Find(name)
}
// Register calls view.Register
func (*defaultMeterImpl) Register(views ...*view.View) error {
return view.Register(views...)
}
func (*defaultMeterImpl) Unregister(views ...*view.View) {
view.Unregister(views...)
}
func (*defaultMeterImpl) SetReportingPeriod(t time.Duration) {
view.SetReportingPeriod(t)
}
func (*defaultMeterImpl) RegisterExporter(e view.Exporter) {
view.RegisterExporter(e)
}
func (*defaultMeterImpl) UnregisterExporter(e view.Exporter) {
view.UnregisterExporter(e)
}
func (*defaultMeterImpl) Start() {}
func (*defaultMeterImpl) Stop() {}
func (*defaultMeterImpl) RetrieveData(viewName string) ([]*view.Row, error) {
return view.RetrieveData(viewName)
}
func (*defaultMeterImpl) SetResource(*resource.Resource) {
}
// Read is implemented to support casting defaultMeterImpl to a metricproducer.Producer,
// but returns no values because the prometheus exporter (which is the only consumer)
// already has a built in path which collects these metrics via metricexport, which calls
// concat(x.Read() for x in metricproducer.GlobalManager.GetAll()).
func (*defaultMeterImpl) Read() []*metricdata.Metric {
return []*metricdata.Metric{}
}