Implement top streaming for containers and pods

* Implement API query parameter stream and delay for containers and
  pods top endpoints
* Update swagger with breaking changes
* Add python API tests for endpoints

Fixes #12115

Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
Jhon Honce 2021-11-01 14:17:30 -07:00
parent 3147ff829b
commit 449cc7a5c2
6 changed files with 235 additions and 63 deletions

View File

@ -1,8 +1,11 @@
package compat package compat
import ( import (
"encoding/json"
"fmt"
"net/http" "net/http"
"strings" "strings"
"time"
"github.com/containers/podman/v3/libpod" "github.com/containers/podman/v3/libpod"
"github.com/containers/podman/v3/pkg/api/handlers" "github.com/containers/podman/v3/pkg/api/handlers"
@ -10,20 +13,24 @@ import (
api "github.com/containers/podman/v3/pkg/api/types" api "github.com/containers/podman/v3/pkg/api/types"
"github.com/gorilla/schema" "github.com/gorilla/schema"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
func TopContainer(w http.ResponseWriter, r *http.Request) { func TopContainer(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime) runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder) decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)
defaultValue := "-ef" psArgs := "-ef"
if utils.IsLibpodRequest(r) { if utils.IsLibpodRequest(r) {
defaultValue = "" psArgs = ""
} }
query := struct { query := struct {
Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"` PsArgs string `schema:"ps_args"`
Stream bool `schema:"stream"`
}{ }{
PsArgs: defaultValue, Delay: 5,
PsArgs: psArgs,
} }
if err := decoder.Decode(&query, r.URL.Query()); err != nil { if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
@ -31,6 +38,12 @@ func TopContainer(w http.ResponseWriter, r *http.Request) {
return return
} }
if query.Delay < 1 {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
return
}
name := utils.GetName(r) name := utils.GetName(r)
c, err := runtime.LookupContainer(name) c, err := runtime.LookupContainer(name)
if err != nil { if err != nil {
@ -38,26 +51,56 @@ func TopContainer(w http.ResponseWriter, r *http.Request) {
return return
} }
output, err := c.Top([]string{query.PsArgs}) // We are committed now - all errors logged but not reported to client, ship has sailed
if err != nil { w.WriteHeader(http.StatusOK)
utils.InternalServerError(w, err) w.Header().Set("Content-Type", "application/json")
return if f, ok := w.(http.Flusher); ok {
f.Flush()
} }
var body = handlers.ContainerTopOKBody{} encoder := json.NewEncoder(w)
if len(output) > 0 {
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}
for _, line := range output[1:] { loop: // break out of for/select infinite` loop
process := strings.Split(line, "\t") for {
for i := range process { select {
process[i] = strings.TrimSpace(process[i]) case <-r.Context().Done():
break loop
default:
output, err := c.Top([]string{query.PsArgs})
if err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if len(output) > 0 {
body := handlers.ContainerTopOKBody{}
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}
for _, line := range output[1:] {
process := strings.Split(line, "\t")
for i := range process {
process[i] = strings.TrimSpace(process[i])
}
body.Processes = append(body.Processes, process)
}
if err := encoder.Encode(body); err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
if query.Stream {
time.Sleep(time.Duration(query.Delay) * time.Second)
} else {
break loop
} }
body.Processes = append(body.Processes, process)
} }
} }
utils.WriteJSON(w, http.StatusOK, body)
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
"time"
"github.com/containers/common/pkg/config" "github.com/containers/common/pkg/config"
"github.com/containers/podman/v3/libpod" "github.com/containers/podman/v3/libpod"
@ -363,10 +364,17 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime) runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder) decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)
psArgs := "-ef"
if utils.IsLibpodRequest(r) {
psArgs = ""
}
query := struct { query := struct {
Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"` PsArgs string `schema:"ps_args"`
Stream bool `schema:"stream"`
}{ }{
PsArgs: "", Delay: 5,
PsArgs: psArgs,
} }
if err := decoder.Decode(&query, r.URL.Query()); err != nil { if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
@ -374,6 +382,12 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
return return
} }
if query.Delay < 1 {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
return
}
name := utils.GetName(r) name := utils.GetName(r)
pod, err := runtime.LookupPod(name) pod, err := runtime.LookupPod(name)
if err != nil { if err != nil {
@ -381,24 +395,58 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
return return
} }
args := []string{} // We are committed now - all errors logged but not reported to client, ship has sailed
if query.PsArgs != "" { w.WriteHeader(http.StatusOK)
args = append(args, query.PsArgs) w.Header().Set("Content-Type", "application/json")
} if f, ok := w.(http.Flusher); ok {
output, err := pod.GetPodPidInformation(args) f.Flush()
if err != nil {
utils.InternalServerError(w, err)
return
} }
var body = handlers.PodTopOKBody{} encoder := json.NewEncoder(w)
if len(output) > 0 {
body.Titles = strings.Split(output[0], "\t") loop: // break out of for/select infinite` loop
for _, line := range output[1:] { for {
body.Processes = append(body.Processes, strings.Split(line, "\t")) select {
case <-r.Context().Done():
break loop
default:
output, err := pod.GetPodPidInformation([]string{query.PsArgs})
if err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if len(output) > 0 {
var body = handlers.PodTopOKBody{}
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}
for _, line := range output[1:] {
process := strings.Split(line, "\t")
for i := range process {
process[i] = strings.TrimSpace(process[i])
}
body.Processes = append(body.Processes, process)
}
if err := encoder.Encode(body); err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
if query.Stream {
time.Sleep(time.Duration(query.Delay) * time.Second)
} else {
break loop
}
} }
} }
utils.WriteJSON(w, http.StatusOK, body)
} }
func PodKill(w http.ResponseWriter, r *http.Request) { func PodKill(w http.ResponseWriter, r *http.Request) {

View File

@ -442,6 +442,7 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// - in: query // - in: query
// name: ps_args // name: ps_args
// type: string // type: string
// default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces: // produces:
// - application/json // - application/json
@ -1142,19 +1143,23 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// name: name // name: name
// type: string // type: string
// required: true // required: true
// description: | // description: Name of container to query for processes (As of version 1.xx)
// Name of container to query for processes
// (As of version 1.xx)
// - in: query // - in: query
// name: stream // name: stream
// type: boolean // type: boolean
// default: true // description: when true, repeatedly stream the latest output (As of version 4.0)
// description: Stream the output // - in: query
// name: delay
// type: integer
// description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
// default: 5
// - in: query // - in: query
// name: ps_args // name: ps_args
// type: string // type: string
// default: -ef // default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // description: |
// arguments to pass to ps such as aux.
// Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces: // produces:
// - application/json // - application/json
// responses: // responses:

View File

@ -296,18 +296,23 @@ func (s *APIServer) registerPodsHandlers(r *mux.Router) error {
// name: name // name: name
// type: string // type: string
// required: true // required: true
// description: | // description: Name of pod to query for processes
// Name of pod to query for processes
// - in: query // - in: query
// name: stream // name: stream
// type: boolean // type: boolean
// default: true // description: when true, repeatedly stream the latest output (As of version 4.0)
// description: Stream the output // - in: query
// name: delay
// type: integer
// description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
// default: 5
// - in: query // - in: query
// name: ps_args // name: ps_args
// type: string // type: string
// default: -ef // default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // description: |
// arguments to pass to ps such as aux.
// Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// responses: // responses:
// 200: // 200:
// $ref: "#/responses/DocsPodTopResponse" // $ref: "#/responses/DocsPodTopResponse"

View File

@ -110,11 +110,11 @@ t GET libpod/pods/fakename/top 404 \
.cause="no such pod" .cause="no such pod"
t GET libpod/pods/foo/top 200 \ t GET libpod/pods/foo/top 200 \
.Processes[0][-1]="/pause " \ .Processes[0][-1]="/pause" \
.Titles[-1]="COMMAND" .Titles[-1]="COMMAND"
t GET libpod/pods/foo/top?ps_args=args,pid 200 \ t GET libpod/pods/foo/top?ps_args=args,pid 200 \
.Processes[0][0]="/pause " \ .Processes[0][0]="/pause" \
.Processes[0][1]="1" \ .Processes[0][1]="1" \
.Titles[0]="COMMAND" \ .Titles[0]="COMMAND" \
.Titles[1]="PID" \ .Titles[1]="PID" \

View File

@ -1,8 +1,11 @@
import multiprocessing
import queue
import random import random
import threading
import unittest import unittest
import json
import requests import requests
import time
from dateutil.parser import parse from dateutil.parser import parse
from .fixtures import APITestCase from .fixtures import APITestCase
@ -16,7 +19,10 @@ class ContainerTestCase(APITestCase):
self.assertEqual(len(obj), 1) self.assertEqual(len(obj), 1)
def test_list_filters(self): def test_list_filters(self):
r = requests.get(self.podman_url + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D") r = requests.get(
self.podman_url
+ "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D"
)
self.assertEqual(r.status_code, 200, r.text) self.assertEqual(r.status_code, 200, r.text)
payload = r.json() payload = r.json()
containerAmnt = len(payload) containerAmnt = len(payload)
@ -33,18 +39,18 @@ class ContainerTestCase(APITestCase):
self.assertId(r.content) self.assertId(r.content)
_ = parse(r.json()["Created"]) _ = parse(r.json()["Created"])
r = requests.post( r = requests.post(
self.podman_url + "/v1.40/containers/create?name=topcontainer", self.podman_url + "/v1.40/containers/create?name=topcontainer",
json={"Cmd": ["top"], json={
"Image": "alpine:latest", "Cmd": ["top"],
"Healthcheck": { "Image": "alpine:latest",
"Test": ["CMD", "pidof", "top"], "Healthcheck": {
"Interval": 5000000000, "Test": ["CMD", "pidof", "top"],
"Timeout": 2000000000, "Interval": 5000000000,
"Retries": 3, "Timeout": 2000000000,
"StartPeriod": 5000000000 "Retries": 3,
} "StartPeriod": 5000000000,
},
}, },
) )
self.assertEqual(r.status_code, 201, r.text) self.assertEqual(r.status_code, 201, r.text)
@ -67,7 +73,7 @@ class ContainerTestCase(APITestCase):
self.assertEqual(r.status_code, 200, r.text) self.assertEqual(r.status_code, 200, r.text)
self.assertId(r.content) self.assertId(r.content)
out = r.json() out = r.json()
hc = out["Config"]["Healthcheck"]["Test"] hc = out["Config"]["Healthcheck"]["Test"]
self.assertListEqual(["CMD", "pidof", "top"], hc) self.assertListEqual(["CMD", "pidof", "top"], hc)
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start") r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start")
@ -84,7 +90,9 @@ class ContainerTestCase(APITestCase):
self.assertIn(r.status_code, (200, 409), r.text) self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200: if r.status_code == 200:
self.assertId(r.content) self.assertId(r.content)
r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true"))) r = requests.get(
self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true"))
)
self.assertIn(r.status_code, (200, 409), r.text) self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200: if r.status_code == 200:
self.assertId(r.content) self.assertId(r.content)
@ -136,9 +144,15 @@ class ContainerTestCase(APITestCase):
payload = r.json() payload = r.json()
container_id = payload["Id"] container_id = payload["Id"]
self.assertIsNotNone(container_id) self.assertIsNotNone(container_id)
r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0") r = requests.get(
self.podman_url
+ f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0"
)
self.assertEqual(r.status_code, 200, r.text) self.assertEqual(r.status_code, 200, r.text)
r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1") r = requests.get(
self.podman_url
+ f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1"
)
self.assertEqual(r.status_code, 200, r.text) self.assertEqual(r.status_code, 200, r.text)
def test_commit(self): def test_commit(self):
@ -257,6 +271,63 @@ class ContainerTestCase(APITestCase):
r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}") r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}")
self.assertEqual(r.status_code, 204, r.text) self.assertEqual(r.status_code, 204, r.text)
def test_top_no_stream(self):
uri = self.uri(self.resolve_container("/containers/{}/top"))
q = queue.Queue()
def _impl(fifo):
fifo.put(requests.get(uri, params={"stream": False}, timeout=2))
top = threading.Thread(target=_impl, args=(q,))
top.start()
time.sleep(2)
self.assertFalse(top.is_alive(), f"GET {uri} failed to return in 2s")
qr = q.get(False)
self.assertEqual(qr.status_code, 200, qr.text)
qr.close()
top.join()
def test_top_stream(self):
uri = self.uri(self.resolve_container("/containers/{}/top"))
q = queue.Queue()
stop_thread = False
def _impl(fifo, stop):
try:
with requests.get(uri, params={"stream": True, "delay": 1}, stream=True) as r:
r.raise_for_status()
fifo.put(r)
for buf in r.iter_lines(chunk_size=None):
if stop():
break
fifo.put(buf)
except Exception:
pass
top = threading.Thread(target=_impl, args=(q, (lambda: stop_thread)))
top.start()
time.sleep(4)
self.assertTrue(top.is_alive(), f"GET {uri} exited too soon")
stop_thread = True
for _ in range(10):
try:
qr = q.get_nowait()
if qr is not None:
self.assertEqual(qr.status_code, 200)
qr.close()
break
except queue.Empty:
pass
finally:
time.sleep(1)
else:
self.fail("Server failed to respond in 10s")
top.join()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()