linkerd2/web/srv/api_handlers.go

525 lines
15 KiB
Go

package srv
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/julienschmidt/httprouter"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizUtil "github.com/linkerd/linkerd2/viz/metrics-api/util"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
tappkg "github.com/linkerd/linkerd2/viz/tap/pkg"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)
// Control Frame payload size can be no bigger than 125 bytes. 2 bytes are
// reserved for the status code when formatting the message.
const maxControlFrameMsgSize = 123
type (
jsonError struct {
Error string `json:"error"`
}
)
var (
defaultResourceType = k8s.Deployment
pbMarshaler = protojson.MarshalOptions{EmitUnpopulated: true}
maxMessageSize = 2048
websocketUpgrader = websocket.Upgrader{
ReadBufferSize: maxMessageSize,
WriteBufferSize: maxMessageSize,
// Only allows requests from the same host, to prevent CSRF attacks.
// This is the default behavior in gorilla/websocket even if the
// CheckOrigin field is not set, but we make it explicit here as a good
// practice, avoiding relying on default behavior.
CheckOrigin: checkSameOrigin,
}
// Checks whose description matches the following regexp won't be included
// in the handleApiCheck output. In the context of the dashboard, some
// checks like cli or kubectl versions ones may not be relevant.
//
// TODO(tegioz): use more reliable way to identify the checks that should
// not be displayed in the dashboard (hint anchor is not unique).
excludedChecksRE = regexp.MustCompile(`(?i)cli|(?i)kubectl`)
)
func renderJSONError(w http.ResponseWriter, err error, status int) {
w.Header().Set("Content-Type", "application/json")
log.Error(err.Error())
rsp, _ := json.Marshal(jsonError{Error: err.Error()})
w.WriteHeader(status)
w.Write(rsp)
}
func renderJSON(w http.ResponseWriter, resp interface{}) {
w.Header().Set("Content-Type", "application/json")
jsonResp, err := json.Marshal(resp)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
w.Write(jsonResp)
}
func renderJSONPb(w http.ResponseWriter, msg proto.Message) {
w.Header().Set("Content-Type", "application/json")
json, err := pbMarshaler.Marshal(msg)
if err != nil {
renderJSONError(w, err, http.StatusBadRequest)
}
w.Write(json)
}
func renderJSONBytes(w http.ResponseWriter, b []byte) {
w.Header().Set("Content-Type", "application/json")
w.Write(b)
}
func (h *handler) handleAPIVersion(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
resp := map[string]interface{}{
"version": h.version,
}
renderJSON(w, resp)
}
func (h *handler) handleAPIPods(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
pods, err := h.apiClient.ListPods(req.Context(), &metricsPb.ListPodsRequest{
Selector: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: req.FormValue("namespace"),
},
},
})
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
renderJSONPb(w, pods)
}
func (h *handler) handleAPIServices(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
services, err := h.apiClient.ListServices(req.Context(), &metricsPb.ListServicesRequest{
Namespace: req.FormValue("namespace"),
})
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
renderJSONPb(w, services)
}
func (h *handler) handleAPIStat(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
// Try to get stat summary from cache using the query as key
cachedResultJSON, ok := h.statCache.Get(req.URL.RawQuery)
if ok {
// Cache hit, render cached json result
renderJSONBytes(w, cachedResultJSON.([]byte))
return
}
trueStr := fmt.Sprintf("%t", true)
requestParams := vizUtil.StatsSummaryRequestParams{
StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
TimeWindow: req.FormValue("window"),
ResourceName: req.FormValue("resource_name"),
ResourceType: req.FormValue("resource_type"),
Namespace: req.FormValue("namespace"),
AllNamespaces: req.FormValue("all_namespaces") == trueStr,
},
ToName: req.FormValue("to_name"),
ToType: req.FormValue("to_type"),
ToNamespace: req.FormValue("to_namespace"),
FromName: req.FormValue("from_name"),
FromType: req.FormValue("from_type"),
FromNamespace: req.FormValue("from_namespace"),
SkipStats: req.FormValue("skip_stats") == trueStr,
TCPStats: req.FormValue("tcp_stats") == trueStr,
}
// default to returning deployment stats
if requestParams.ResourceType == "" {
requestParams.ResourceType = defaultResourceType
}
statRequest, err := vizUtil.BuildStatSummaryRequest(requestParams)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
result, err := h.apiClient.StatSummary(req.Context(), statRequest)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
// Marshal result into json and cache it
json, err := pbMarshaler.Marshal(result)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
var resultJSON bytes.Buffer
resultJSON.Write(json)
h.statCache.SetDefault(req.URL.RawQuery, resultJSON.Bytes())
renderJSONBytes(w, resultJSON.Bytes())
}
func (h *handler) handleAPITopRoutes(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
requestParams := vizUtil.TopRoutesRequestParams{
StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
TimeWindow: req.FormValue("window"),
ResourceName: req.FormValue("resource_name"),
ResourceType: req.FormValue("resource_type"),
Namespace: req.FormValue("namespace"),
},
ToName: req.FormValue("to_name"),
ToType: req.FormValue("to_type"),
ToNamespace: req.FormValue("to_namespace"),
}
topReq, err := vizUtil.BuildTopRoutesRequest(requestParams)
if err != nil {
renderJSONError(w, err, http.StatusBadRequest)
return
}
result, err := h.apiClient.TopRoutes(req.Context(), topReq)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
renderJSONPb(w, result)
}
// Control frame payload size must be no longer than `maxControlFrameMsgSize`
// bytes. In the case of an unexpected HTTP status code or unexpected error,
// truncate the message after `maxControlFrameMsgSize` bytes so that the web
// socket message is properly written.
func validateControlFrameMsg(err error) string {
log.Debugf("tap error: %s", err.Error())
msg := err.Error()
if len(msg) > maxControlFrameMsgSize {
return msg[:maxControlFrameMsgSize]
}
return msg
}
func websocketError(ws *websocket.Conn, wsError int, err error) {
msg := validateControlFrameMsg(err)
err = ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(wsError, msg),
time.Time{})
if err != nil {
log.Errorf("Unexpected websocket error: %s", err)
}
}
func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
ws, err := websocketUpgrader.Upgrade(w, req, nil)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
defer ws.Close()
messageType, message, err := ws.ReadMessage()
if err != nil {
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
if messageType != websocket.TextMessage {
websocketError(ws, websocket.CloseUnsupportedData, errors.New("messageType not supported"))
return
}
var requestParams tappkg.TapRequestParams
err = json.Unmarshal(message, &requestParams)
if err != nil {
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
tapReq, err := tappkg.BuildTapByResourceRequest(requestParams)
if err != nil {
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
go func() {
reader, body, err := tappkg.Reader(req.Context(), h.k8sAPI, tapReq)
if err != nil {
// If there was a [403] error when initiating a tap, close the
// socket with `ClosePolicyViolation` status code so that the error
// renders without the error prefix in the banner
var he protohttp.HTTPError
if errors.Is(err, &he) && he.Code == http.StatusForbidden {
err := fmt.Errorf("missing authorization, visit %s to remedy", tappkg.TapRbacURL)
websocketError(ws, websocket.ClosePolicyViolation, err)
return
}
// All other errors from initiating a tap should close with
// `CloseInternalServerErr` status code
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
defer body.Close()
for {
event := tapPb.TapEvent{}
err := protohttp.FromByteStreamToProtocolBuffers(reader, &event)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
websocketError(ws, websocket.CloseInternalServerErr, err)
break
}
json, err := pbMarshaler.Marshal(&event)
if err != nil {
websocketError(ws, websocket.CloseUnsupportedData, err)
break
}
buf := new(bytes.Buffer)
buf.Write(json)
if err := ws.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Error(err)
}
break
}
}
}()
for {
_, _, err := ws.ReadMessage()
if err != nil {
log.Debugf("Received close frame: %v", err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("Unexpected close error: %s", err)
}
return
}
}
}
func (h *handler) handleAPIEdges(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
requestParams := vizUtil.EdgesRequestParams{
Namespace: req.FormValue("namespace"),
ResourceType: req.FormValue("resource_type"),
}
edgesRequest, err := vizUtil.BuildEdgesRequest(requestParams)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
result, err := h.apiClient.Edges(req.Context(), edgesRequest)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
renderJSONPb(w, result)
}
func (h *handler) handleAPICheck(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
type CheckResult struct {
*healthcheck.CheckResult
ErrMsg string `json:",omitempty"`
HintURL string `json:",omitempty"`
}
success := true
results := make(map[healthcheck.CategoryID][]*CheckResult)
collectResults := func(result *healthcheck.CheckResult) {
if result.Retry || excludedChecksRE.MatchString(result.Description) {
return
}
var errMsg, hintURL string
if result.Err != nil {
if !result.Warning {
success = false
}
errMsg = result.Err.Error()
hintURL = result.HintURL
}
results[result.Category] = append(results[result.Category], &CheckResult{
CheckResult: result,
ErrMsg: errMsg,
HintURL: hintURL,
})
}
// TODO (tegioz): ignore runchecks results until we stop filtering checks
// in this method (see #3670 for more details)
_, _ = h.hc.RunChecks(collectResults)
renderJSON(w, map[string]interface{}{
"success": success,
"results": results,
})
}
func (h *handler) handleAPIResourceDefinition(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var missingParams []string
requiredParams := []string{"namespace", "resource_type", "resource_name"}
for _, param := range requiredParams {
if req.FormValue(param) == "" {
missingParams = append(missingParams, param)
}
}
if len(missingParams) != 0 {
renderJSONError(w, fmt.Errorf("required params not provided: %s", strings.Join(missingParams, ", ")), http.StatusBadRequest)
return
}
namespace := req.FormValue("namespace")
resourceType := req.FormValue("resource_type")
resourceName := req.FormValue("resource_name")
var resource interface{}
var err error
options := metav1.GetOptions{}
switch resourceType {
case k8s.CronJob:
resource, err = h.k8sAPI.BatchV1beta1().CronJobs(namespace).Get(req.Context(), resourceName, options)
case k8s.DaemonSet:
resource, err = h.k8sAPI.AppsV1().DaemonSets(namespace).Get(req.Context(), resourceName, options)
case k8s.Deployment:
resource, err = h.k8sAPI.AppsV1().Deployments(namespace).Get(req.Context(), resourceName, options)
case k8s.Service:
resource, err = h.k8sAPI.CoreV1().Services(namespace).Get(req.Context(), resourceName, options)
case k8s.Job:
resource, err = h.k8sAPI.BatchV1().Jobs(namespace).Get(req.Context(), resourceName, options)
case k8s.Pod:
resource, err = h.k8sAPI.CoreV1().Pods(namespace).Get(req.Context(), resourceName, options)
case k8s.ReplicationController:
resource, err = h.k8sAPI.CoreV1().ReplicationControllers(namespace).Get(req.Context(), resourceName, options)
case k8s.ReplicaSet:
resource, err = h.k8sAPI.AppsV1().ReplicaSets(namespace).Get(req.Context(), resourceName, options)
default:
renderJSONError(w, errors.New("Invalid resource type: "+resourceType), http.StatusBadRequest)
return
}
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
resourceDefinition, err := yaml.Marshal(resource)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/yaml")
w.Write(resourceDefinition)
}
func (h *handler) handleGetExtensions(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
ctx := req.Context()
extensionName := req.FormValue("extension_name")
type Extension struct {
Name string `json:"name"`
UID string `json:"uid"`
Namespace string `json:"namespace"`
}
resp := map[string]interface{}{}
if extensionName != "" {
ns, err := h.k8sAPI.GetNamespaceWithExtensionLabel(ctx, extensionName)
if err != nil && kerrors.IsNotFound(err) {
renderJSON(w, resp)
return
} else if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
resp["data"] = Extension{
UID: string(ns.UID),
Name: ns.GetLabels()[k8s.LinkerdExtensionLabel],
Namespace: ns.Name,
}
renderJSON(w, resp)
return
}
installedExtensions, err := h.k8sAPI.GetAllNamespacesWithExtensionLabel(ctx)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
extensionList := make([]Extension, len(installedExtensions))
for i, installedExtension := range installedExtensions {
extensionList[i] = Extension{
UID: string(installedExtension.GetObjectMeta().GetUID()),
Name: installedExtension.GetLabels()[k8s.LinkerdExtensionLabel],
Namespace: installedExtension.GetName(),
}
}
resp["extensions"] = extensionList
renderJSON(w, resp)
}
func (h *handler) handleAPIGateways(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
window := req.FormValue("window")
if window == "" {
window = "1m"
}
_, err := time.ParseDuration(window)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
gatewayRequest := &metricsPb.GatewaysRequest{
TimeWindow: window,
GatewayNamespace: req.FormValue("gatewayNamespace"),
RemoteClusterName: req.FormValue("remoteClusterName"),
}
result, err := h.apiClient.Gateways(req.Context(), gatewayRequest)
if err != nil {
renderJSONError(w, err, http.StatusInternalServerError)
return
}
renderJSONPb(w, result)
}