elemental-operator/pkg/server/api_registration.go

427 lines
14 KiB
Go

/*
Copyright © 2022 - 2025 SUSE LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"path"
"time"
"github.com/gorilla/websocket"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
elementalv1 "github.com/rancher/elemental-operator/api/v1beta1"
"github.com/rancher/elemental-operator/pkg/hostinfo"
"github.com/rancher/elemental-operator/pkg/log"
"github.com/rancher/elemental-operator/pkg/network"
"github.com/rancher/elemental-operator/pkg/register"
"github.com/rancher/elemental-operator/pkg/templater"
)
type LegacyConfig struct {
Elemental elementalv1.Elemental `yaml:"elemental"`
CloudConfig map[string]interface{} `yaml:"cloud-config,omitempty"`
}
var errInventoryNotFound = errors.New("MachineInventory not found")
func (i *InventoryServer) apiRegistration(resp http.ResponseWriter, req *http.Request) error {
var err error
var registration *elementalv1.MachineRegistration
// get the machine registration relevant to this request
if registration, err = i.getMachineRegistration(path.Base(req.URL.Path)); err != nil {
http.Error(resp, err.Error(), http.StatusNotFound)
return err
}
if !websocket.IsWebSocketUpgrade(req) {
log.Debugf("got a plain HTTP request: send unauthenticated registration")
if err = i.unauthenticatedResponse(registration, resp); err != nil {
log.Errorf("error sending unauthenticated response: %s", err)
}
return err
}
// upgrade to websocket
conn, err := upgrade(resp, req)
if err != nil {
log.Errorf("failed to upgrade connection to websocket: %s", err)
return err
}
defer conn.Close()
log.Debugf("connection upgraded to websocket")
// attempt to authenticate the machine, if err, authentication has failed
inventory, err := i.authMachine(conn, req, registration.Namespace)
if err != nil {
log.Errorf("authentication failed: %s", err)
http.Error(resp, "authentication failure", http.StatusUnauthorized)
return err
}
// no error and no inventory: Auth header is missing or unrecognized
if inventory == nil {
if authHeader := req.Header.Get("Authorization"); authHeader != "" {
errMsg := "unrecognized authentication method"
log.Errorf("websocket connection: %s", errMsg)
http.Error(resp, errMsg, http.StatusUnauthorized)
return err
}
log.Warning("unauthenticated websocket connection")
if err = i.unauthenticatedResponse(registration, resp); err != nil {
log.Errorf("error sending unauthenticated response: %s", err)
return err
}
return nil
}
log.Debugf("authentication completed")
if err = register.WriteMessage(conn, register.MsgReady, []byte{}); err != nil {
log.Errorf("cannot finalize the authentication process: %s", err)
return err
}
if isNewInventory(inventory) {
initInventory(inventory, registration)
}
if err = i.serveLoop(conn, inventory, registration); err != nil {
log.Errorf("Error during serve-loop: %s", err)
return err
}
return nil
}
func (i *InventoryServer) unauthenticatedResponse(registration *elementalv1.MachineRegistration, writer io.Writer) error {
config, err := registration.GetClientRegistrationConfig(i.getRancherCACert())
if err != nil {
return err
}
return yaml.NewEncoder(writer).
Encode(config)
}
func (i *InventoryServer) writeMachineInventoryCloudConfig(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration, netConf elementalv1.NetworkConfig) error {
sa := &corev1.ServiceAccount{}
if err := i.Get(i, types.NamespacedName{
Name: registration.Status.ServiceAccountRef.Name,
Namespace: registration.Status.ServiceAccountRef.Namespace,
}, sa); err != nil {
return fmt.Errorf("failed to get service account: %w", err)
}
secret := &corev1.Secret{}
err := i.Get(i, types.NamespacedName{
Name: sa.Name + elementalv1.SASecretSuffix,
Namespace: sa.Namespace,
}, secret)
if err != nil || secret.Type != corev1.SecretTypeServiceAccountToken {
return fmt.Errorf("failed to get secret: %w", err)
}
serverURL, err := i.getValue("server-url")
if err != nil {
return fmt.Errorf("failed to get server-url: %w", err)
}
if serverURL == "" {
return fmt.Errorf("server-url is not set")
}
config, err := registration.GetClientRegistrationConfig(i.getRancherCACert())
if err != nil {
return err
}
config.Elemental.SystemAgent = elementalv1.SystemAgent{
StrictTLSMode: i.isAgentTLSModeStrict(),
URL: fmt.Sprintf("%s/k8s/clusters/local", serverURL),
Token: string(secret.Data["token"]),
SecretName: inventory.Name,
SecretNamespace: inventory.Namespace,
}
config.Elemental.Install = registration.Spec.Config.Elemental.Install
config.Elemental.Reset = registration.Spec.Config.Elemental.Reset
config.CloudConfig = registration.Spec.Config.CloudConfig
// If client does not support MsgConfig we send back the config as a
// byte-stream without a message-type to keep backwards-compatibility.
if protoVersion < register.MsgConfig {
log.Debugf("Client does not support MsgConfig, sending back raw config.")
writer, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
return err
}
defer writer.Close()
return yaml.NewEncoder(writer).Encode(config)
}
log.Debugf("Client supports MsgConfig, sending back config.")
// If the client supports the MsgConfig-message we use that.
data, err := yaml.Marshal(config)
if err != nil {
log.Errorf("error marshalling config: %v", err)
return err
}
if netConf.Configurator != network.ConfiguratorNone && netConf.Configurator != "" {
netData, err := yaml.Marshal(netConf)
if err != nil {
log.Errorf("error marshalling network config: %v", err)
return err
}
data = append(data, netData...)
}
return register.WriteMessage(conn, register.MsgConfig, data)
}
func (i *InventoryServer) getRancherCACert() string {
cacert, err := i.getValue("cacerts")
if err != nil {
log.Errorf("Error getting cacerts: %s", err.Error())
}
if cacert == "" {
if cacert, err = i.getValue("internal-cacerts"); err != nil {
log.Errorf("Error getting internal-cacerts: %s", err.Error())
return ""
}
}
return cacert
}
// Support for agent-tls-mode
func (i *InventoryServer) isAgentTLSModeStrict() bool {
agentTLSMode, err := i.getValue("agent-tls-mode")
if err != nil {
log.Errorf("Error getting agent-tls-mode: %s", err.Error())
}
switch agentTLSMode {
case "strict":
return true
case "system-store":
return false
default:
// Historically the default has been strict TLS verification
return true
}
}
func (i *InventoryServer) serveLoop(conn *websocket.Conn, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration) error { //nolint: gocyclo
protoVersion := register.MsgUndefined
tmpl := templater.NewTemplater()
for {
var data []byte
msgType, data, err := register.ReadMessage(conn)
if err != nil {
return fmt.Errorf("websocket communication interrupted: %w", err)
}
replyMsgType := register.MsgReady
replyData := []byte{}
switch msgType {
case register.MsgVersion:
protoVersion, err = decodeProtocolVersion(data)
if err != nil {
return fmt.Errorf("failed to negotiate protocol version: %w", err)
}
log.Infof("Negotiated protocol version: %d", protoVersion)
replyMsgType = register.MsgVersion
replyData = []byte{byte(protoVersion)}
case register.MsgSmbios:
smbiosData := map[string]interface{}{}
if err := json.Unmarshal(data, &smbiosData); err != nil {
return fmt.Errorf("failed to extract labels from SMBIOS data: %w", err)
}
tmpl.Fill(smbiosData)
case register.MsgLabels:
if err := mergeInventoryLabels(inventory, data); err != nil {
return err
}
case register.MsgAnnotations:
err = mergeInventoryAnnotations(data, inventory)
if err != nil {
return fmt.Errorf("failed to decode dynamic data: %w", err)
}
case register.MsgGet:
// Final call: here we commit the MachineInventory, send the Elemental config data
// and close the connection.
if err := updateInventoryWithTemplates(tmpl, inventory, registration); err != nil {
return err
}
return i.handleGet(conn, protoVersion, inventory, registration)
case register.MsgUpdate:
err = i.handleUpdate(conn, protoVersion, inventory)
if err != nil {
return fmt.Errorf("failed to negotiate registration update: %w", err)
}
case register.MsgSystemData:
systemData, err := hostinfo.FillData(data)
if err != nil {
return fmt.Errorf("failed to parse system data: %w", err)
}
tmpl.Fill(systemData)
case register.MsgSystemDataV2:
systemData := map[string]interface{}{}
if err := json.Unmarshal(data, &systemData); err != nil {
return fmt.Errorf("failed to parse system data: %w", err)
}
tmpl.Fill(systemData)
default:
return fmt.Errorf("got unexpected message: %s", msgType)
}
if err := register.WriteMessage(conn, replyMsgType, replyData); err != nil {
return fmt.Errorf("cannot complete %s exchange", msgType)
}
}
}
func (i *InventoryServer) handleUpdate(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory) error {
if isNewInventory(inventory) {
log.Errorf("MachineInventory '%+v' was not found, but the machine is still running. Reprovisioning is needed.\n", inventory)
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(errInventoryNotFound)); writeErr != nil {
log.Errorf("Error reporting back error to client: %v\n", writeErr)
}
return errInventoryNotFound
}
return nil
}
func (i *InventoryServer) handleGetNetworkConfig(inventory *elementalv1.MachineInventory) (elementalv1.NetworkConfig, error) {
ctx, cancel := context.WithTimeout(context.TODO(), register.RegistrationDeadlineSeconds*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return elementalv1.NetworkConfig{}, fmt.Errorf("NewtworkConfig not ready")
default:
time.Sleep(time.Second)
}
if err := i.Get(ctx, client.ObjectKeyFromObject(inventory), inventory); err != nil {
return elementalv1.NetworkConfig{}, fmt.Errorf("getting machine inventory: %w", err)
}
conditionFound := false
for _, condition := range inventory.Status.Conditions {
if condition.Type == elementalv1.NetworkConfigReady {
conditionFound = true
if condition.Status == metav1.ConditionFalse {
log.Debug("NetworkConfigReady is false")
continue
}
}
}
if !conditionFound {
log.Debug("NetworkConfigReady condition not found")
continue
}
break
}
return inventory.Spec.Network, nil
}
func (i *InventoryServer) handleGet(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration) error {
var err error
var netConf elementalv1.NetworkConfig
inventory, err = i.commitMachineInventory(inventory)
if err != nil {
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
log.Errorf("Error reporting back error to client: %v", writeErr)
}
return err
}
if protoVersion >= register.MsgNetworkConfig {
netConf, err = i.handleGetNetworkConfig(inventory)
if err != nil {
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
log.Errorf("Error reporting back error to client: %v", writeErr)
}
return err
}
}
if err := i.writeMachineInventoryCloudConfig(conn, protoVersion, inventory, registration, netConf); err != nil {
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
log.Errorf("Error reporting back error to client: %v", writeErr)
}
return fmt.Errorf("failed sending elemental cloud config: %w", err)
}
log.Debugf("Elemental cloud config sent")
return nil
}
// writeError reports back an error to the client if the negotiated protocol
// version supports it.
func writeError(conn *websocket.Conn, protoVersion register.MessageType, errorMessage register.ErrorMessage) error {
if protoVersion < register.MsgError {
log.Debugf("client does not support reporting errors, skipping")
return nil
}
data, err := yaml.Marshal(errorMessage)
if err != nil {
return fmt.Errorf("error marshalling error-message: %w", err)
}
return register.WriteMessage(conn, register.MsgError, data)
}
func decodeProtocolVersion(data []byte) (register.MessageType, error) {
protoVersion := register.MsgUndefined
if len(data) != 1 {
return protoVersion, fmt.Errorf("unknown format: %v (%s)", data, data)
}
clientVersion := register.MessageType(data[0])
protoVersion = clientVersion
if clientVersion != register.MsgLast {
log.Debugf("elemental-register (%d) and elemental-operator (%d) protocol versions differ", clientVersion, register.MsgLast)
if clientVersion <= register.MsgGet {
return protoVersion, fmt.Errorf("elemental-register protocol version is deprecated (elemental-register client too old, protocol version = %d)", clientVersion)
}
if clientVersion > register.MsgLast {
protoVersion = register.MsgLast
}
}
return protoVersion, nil
}