Merge pull request #8 from fabxc/node-metrics

Node metrics
This commit is contained in:
Fabian Reinartz 2016-09-07 10:52:22 +02:00 committed by GitHub
commit d44025415f
6 changed files with 277 additions and 236 deletions

View File

@ -65,7 +65,7 @@ func (dc *deploymentCollector) Collect(ch chan<- prometheus.Metric) {
}
}
func (dc *deploymentCollector) collectDeployment(d extensions.Deployment) (res []prometheus.Metric) {
func (dc *deploymentCollector) collectDeployment(d extensions.Deployment) []prometheus.Metric {
return []prometheus.Metric{
prometheus.MustNewConstMetric(
descDeploymentReplicas, prometheus.GaugeValue, float64(d.Status.Replicas),

View File

@ -125,18 +125,24 @@ func gatherAndCompare(c prometheus.Collector, expected string) error {
return fmt.Errorf("parsing expected metrics failed: %s", err)
}
// Compare the sorted gathering result with the parsed expected result.
// Apply the same normalization to the expected output as the client library
// does to the gathering output.
if !reflect.DeepEqual(metrics, normalizeMetricFamilies(expectedMetrics)) {
// Encode the gathered output to the readbale text format for comparison.
var buf bytes.Buffer
enc := expfmt.NewEncoder(&buf, expfmt.FmtText)
var buf1 bytes.Buffer
enc := expfmt.NewEncoder(&buf1, expfmt.FmtText)
for _, mf := range metrics {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding result failed: %s", err)
}
}
// Encode normalized expected metrics again to generate them in the same ordering
// the registry does to spot differences more easily.
var buf2 bytes.Buffer
enc = expfmt.NewEncoder(&buf2, expfmt.FmtText)
for _, mf := range normalizeMetricFamilies(expectedMetrics) {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding result failed: %s", err)
}
}
return fmt.Errorf(`
metric output does not match expectation; want:
@ -145,8 +151,8 @@ metric output does not match expectation; want:
got:
%s
`, expected, buf.String())
%s
`, buf2.String(), buf1.String())
}
return nil
}
@ -187,9 +193,13 @@ func (s metricSorter) Swap(i, j int) {
}
func (s metricSorter) Less(i, j int) bool {
sort.Sort(prometheus.LabelPairSorter(s[i].Label))
sort.Sort(prometheus.LabelPairSorter(s[j].Label))
if len(s[i].Label) != len(s[j].Label) {
return len(s[i].Label) < len(s[j].Label)
}
for n, lp := range s[i].Label {
vi := lp.GetValue()
vj := s[j].Label[n].GetValue()
@ -218,9 +228,6 @@ func normalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily)
for name, mf := range metricFamiliesByName {
if len(mf.Metric) > 0 {
names = append(names, name)
for _, m := range mf.Metric {
sort.Sort(prometheus.LabelPairSorter(m.Label))
}
}
}
sort.Strings(names)

128
main.go
View File

@ -18,7 +18,6 @@ package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
@ -68,9 +67,6 @@ var (
apiserver = flags.String("apiserver", "", `The URL of the apiserver to use as a master`)
port = flags.Int("port", 80, `Port to expose metrics on.`)
dryRun = flags.Bool("dry-run", false, `if set, a single dry run of configuration
parsing is executed. Results written to stdout.`)
)
func main() {
@ -90,35 +86,8 @@ func main() {
glog.Fatalf("Failed to create client: %v", err)
}
initializeMetrics()
// Run metrics server.
go metricsServer()
r := &metricsRegistryImpl{}
mc := newMetricsController(kubeClient)
if *dryRun {
// Wait for the initial informer sync.
time.Sleep(100 * time.Millisecond)
mc.updateMetrics(r)
err := dumpMetrics()
if err != nil {
glog.Fatalf("%v", err)
}
} else {
// Update metrics every 10 seconds.
wait.Until(func() {
err := mc.updateMetrics(r)
if err != nil {
if err == errDeferredSync {
glog.Infof("%v", err)
} else {
glog.Fatalf("%v", err)
}
}
}, 10*time.Second, wait.NeverStop)
}
runMetricsController(kubeClient)
metricsServer()
}
func createKubeClient(clientConfig clientcmd.ClientConfig) (kubeClient clientset.Interface, err error) {
@ -165,35 +134,6 @@ func createKubeClient(clientConfig clientcmd.ClientConfig) (kubeClient clientset
return kubeClient, nil
}
func initializeMetrics() {
metrics.nodes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "nodes",
Help: "Number of nodes",
},
[]string{
// Whether they are reporting ready status
"ready",
},
)
prometheus.MustRegister(metrics.nodes)
}
// Dumps a call to /metrics to stdout. For development/testing.
func dumpMetrics() error {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", *port))
if err != nil {
glog.Fatalf("%v", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
glog.Fatalf("%v", err)
}
glog.Infof("%s", body)
return nil
}
func metricsServer() {
// Address to listen on for web interface and telemetry
listenAddress := fmt.Sprintf(":%d", *port)
@ -222,23 +162,6 @@ func metricsServer() {
log.Fatal(http.ListenAndServe(listenAddress, nil))
}
// All this machinery is for mocking out the prometheus interface for testing
// because promtheus won't let us fetch a metric value after we set it.
type metricsRegistry interface {
setReadyNodes(float64)
setUnreadyNodes(float64)
}
type metricsRegistryImpl struct{}
func (mr *metricsRegistryImpl) setReadyNodes(count float64) {
metrics.nodes.With(prometheus.Labels{"ready": "true"}).Set(count)
}
func (mr *metricsRegistryImpl) setUnreadyNodes(count float64) {
metrics.nodes.With(prometheus.Labels{"ready": "false"}).Set(count)
}
// metricsController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type metricsController struct {
@ -251,43 +174,8 @@ type metricsController struct {
nodeStore cache.StoreToNodeLister
}
// sync all services with the loadbalancer.
func (mc *metricsController) updateMetrics(r metricsRegistry) error {
if !mc.nodeController.HasSynced() {
time.Sleep(100 * time.Millisecond)
return errDeferredSync
}
nodes, err := mc.nodeStore.List()
if err != nil {
return err
}
registerNodeMetrics(r, nodes.Items)
return nil
}
func registerNodeMetrics(r metricsRegistry, nodes []api.Node) {
var readyNodes float64
var unreadyNodes float64
for _, n := range nodes {
for _, c := range n.Status.Conditions {
if c.Type == api.NodeReady {
if c.Status == api.ConditionTrue {
readyNodes += 1
} else {
// Even if status is unknown, call it unready.
unreadyNodes += 1
}
}
}
}
r.setReadyNodes(readyNodes)
r.setUnreadyNodes(unreadyNodes)
}
// newMetricsController creates a new controller from the given config.
func newMetricsController(kubeClient clientset.Interface) *metricsController {
// runMetricsController creates a new controller from the given config.
func runMetricsController(kubeClient clientset.Interface) *metricsController {
mc := &metricsController{
client: kubeClient,
}
@ -343,6 +231,14 @@ func newMetricsController(kubeClient clientset.Interface) *metricsController {
store: &mc.podStore,
})
}()
go func() {
for !mc.nodeController.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
prometheus.MustRegister(&nodeCollector{
store: &mc.nodeStore,
})
}()
return mc
}

View File

@ -1,109 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
)
type metricsRegistryMock struct {
readyNodes float64
unreadyNodes float64
containerRestarts map[string]float64
}
func (mr *metricsRegistryMock) setReadyNodes(count float64) {
mr.readyNodes = count
}
func (mr *metricsRegistryMock) setUnreadyNodes(count float64) {
mr.unreadyNodes = count
}
func (mr *metricsRegistryMock) setContainerRestarts(name, namespace, podName string, count float64) {
if mr.containerRestarts == nil {
mr.containerRestarts = map[string]float64{}
}
mr.containerRestarts[name+"-"+podName+"-"+namespace] = count
}
func getNode(condition api.ConditionStatus) api.Node {
return api.Node{
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: condition,
},
},
},
}
}
// This pod will have two containers - you can confiure the restart count on both.
func getPod(name, namespace string, containerStatuses []api.ContainerStatus) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: namespace,
},
Status: api.PodStatus{
ContainerStatuses: containerStatuses,
},
}
}
func getContainerStatus(name string, restartCount int) api.ContainerStatus {
return api.ContainerStatus{
Name: name,
RestartCount: int32(restartCount),
}
}
func TestRegisterNodeMetrics(t *testing.T) {
cases := []struct {
desc string
nodes []api.Node
registry *metricsRegistryMock
}{
{
desc: "three ready nodes, one unready node, one unknown node",
nodes: []api.Node{
getNode(api.ConditionTrue),
getNode(api.ConditionTrue),
getNode(api.ConditionTrue),
getNode(api.ConditionFalse),
getNode(api.ConditionUnknown),
},
registry: &metricsRegistryMock{
readyNodes: 3,
unreadyNodes: 2,
},
},
}
for _, c := range cases {
r := &metricsRegistryMock{}
registerNodeMetrics(r, c.nodes)
if !reflect.DeepEqual(r, c.registry) {
t.Errorf("error in case \"%s\": actual %v does not equal expected %v", c.desc, r, c.registry)
}
}
}

117
node.go Normal file
View File

@ -0,0 +1,117 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/api"
)
var (
descNodeInfo = prometheus.NewDesc(
"node_info",
"Information about a cluster node.",
[]string{
"node",
"kernel_version",
"os_image",
"container_runtime_version",
"kubelet_version",
"kubeproxy_version",
}, nil,
)
descNodeStatusReady = prometheus.NewDesc(
"node_status_ready",
"The ready status of a cluster node.",
[]string{"node", "condition"}, nil,
)
)
type nodeStore interface {
List() (api.NodeList, error)
}
// nodeCollector collects metrics about all nodes in the cluster.
type nodeCollector struct {
store nodeStore
}
// Describe implements the prometheus.Collector interface.
func (nc *nodeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- descNodeInfo
ch <- descNodeStatusReady
}
// Collect implements the prometheus.Collector interface.
func (nc *nodeCollector) Collect(ch chan<- prometheus.Metric) {
nodes, err := nc.store.List()
if err != nil {
glog.Errorf("listing nodes failed: %s", err)
return
}
for _, n := range nodes.Items {
nc.collectNode(ch, n)
}
}
func (nc *nodeCollector) collectNode(ch chan<- prometheus.Metric, n api.Node) {
// Collect node conditions and while default to false.
// TODO(fabxc): add remaining conditions: NodeOutOfDisk, NodeMemoryPressure, NodeDiskPressure, NodeNetworkUnavailable
for _, c := range n.Status.Conditions {
switch c.Type {
case api.NodeReady:
nodeStatusMetrics(ch, descNodeStatusReady, n.Name, c.Status)
}
}
// NOTE: the instrumentation API requires providing label values in order of declaration
// in the metric descriptor. Be careful when making modifications.
ch <- prometheus.MustNewConstMetric(
descNodeInfo, prometheus.GaugeValue, 1,
n.Name,
n.Status.NodeInfo.KernelVersion,
n.Status.NodeInfo.OSImage,
n.Status.NodeInfo.ContainerRuntimeVersion,
n.Status.NodeInfo.KubeletVersion,
n.Status.NodeInfo.KubeProxyVersion,
)
}
// nodeStatusMetrics generates one metric for each possible node condition status.
func nodeStatusMetrics(ch chan<- prometheus.Metric, desc *prometheus.Desc, name string, cs api.ConditionStatus) {
ch <- prometheus.MustNewConstMetric(
desc, prometheus.GaugeValue, boolFloat64(cs == api.ConditionTrue),
name, "true",
)
ch <- prometheus.MustNewConstMetric(
desc, prometheus.GaugeValue, boolFloat64(cs == api.ConditionFalse),
name, "false",
)
ch <- prometheus.MustNewConstMetric(
desc, prometheus.GaugeValue, boolFloat64(cs == api.ConditionUnknown),
name, "unknown",
)
}
func boolFloat64(b bool) float64 {
if b {
return 1
}
return 0
}

130
node_test.go Normal file
View File

@ -0,0 +1,130 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"testing"
"k8s.io/kubernetes/pkg/api"
)
type mockNodeStore struct {
list func() (api.NodeList, error)
}
func (ns mockNodeStore) List() (api.NodeList, error) {
return ns.list()
}
func TestNodeCollector(t *testing.T) {
// Fixed metadata on type and help text. We prepend this to every expected
// output so we only have to modify a single place when doing adjustments.
const metadata = `
# HELP node_info Information about a cluster node.
# TYPE node_info gauge
# HELP node_status_ready The ready status of a cluster node.
# TYPE node_status_ready gauge
`
cases := []struct {
nodes []api.Node
want string
}{
// Verify populating of node_info metric.
{
nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "127.0.0.1",
},
Status: api.NodeStatus{
NodeInfo: api.NodeSystemInfo{
KernelVersion: "kernel",
KubeletVersion: "kubelet",
KubeProxyVersion: "kubeproxy",
OSImage: "osimage",
ContainerRuntimeVersion: "rkt",
},
},
},
},
want: metadata + `
node_info{container_runtime_version="rkt",kernel_version="kernel",kubelet_version="kubelet",kubeproxy_version="kubeproxy",node="127.0.0.1",os_image="osimage"} 1
`,
},
// Verify condition mappings to 1, 0, and NaN.
{
nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "127.0.0.1",
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionTrue},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "127.0.0.2",
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionUnknown},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "127.0.0.3",
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionFalse},
},
},
},
},
want: metadata + `
node_status_ready{node="127.0.0.1",condition="true"} 1
node_status_ready{node="127.0.0.1",condition="false"} 0
node_status_ready{node="127.0.0.1",condition="unknown"} 0
node_status_ready{node="127.0.0.2",condition="true"} 0
node_status_ready{node="127.0.0.2",condition="false"} 0
node_status_ready{node="127.0.0.2",condition="unknown"} 1
node_status_ready{node="127.0.0.3",condition="true"} 0
node_status_ready{node="127.0.0.3",condition="false"} 1
node_status_ready{node="127.0.0.3",condition="unknown"} 0
node_info{container_runtime_version="",kernel_version="",kubelet_version="",kubeproxy_version="",node="127.0.0.1",os_image=""} 1
node_info{container_runtime_version="",kernel_version="",kubelet_version="",kubeproxy_version="",node="127.0.0.2",os_image=""} 1
node_info{container_runtime_version="",kernel_version="",kubelet_version="",kubeproxy_version="",node="127.0.0.3",os_image=""} 1
`,
},
}
for _, c := range cases {
dc := &nodeCollector{
store: &mockNodeStore{
list: func() (api.NodeList, error) {
return api.NodeList{Items: c.nodes}, nil
},
},
}
if err := gatherAndCompare(dc, c.want); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
}
}