427 lines
14 KiB
Go
427 lines
14 KiB
Go
/*
Copyright © 2022 - 2025 SUSE LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"gopkg.in/yaml.v3"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
elementalv1 "github.com/rancher/elemental-operator/api/v1beta1"
|
|
"github.com/rancher/elemental-operator/pkg/hostinfo"
|
|
"github.com/rancher/elemental-operator/pkg/log"
|
|
"github.com/rancher/elemental-operator/pkg/network"
|
|
"github.com/rancher/elemental-operator/pkg/register"
|
|
"github.com/rancher/elemental-operator/pkg/templater"
|
|
)
|
|
|
|
type LegacyConfig struct {
|
|
Elemental elementalv1.Elemental `yaml:"elemental"`
|
|
CloudConfig map[string]interface{} `yaml:"cloud-config,omitempty"`
|
|
}
|
|
|
|
var errInventoryNotFound = errors.New("MachineInventory not found")
|
|
|
|
func (i *InventoryServer) apiRegistration(resp http.ResponseWriter, req *http.Request) error {
|
|
var err error
|
|
var registration *elementalv1.MachineRegistration
|
|
|
|
// get the machine registration relevant to this request
|
|
if registration, err = i.getMachineRegistration(path.Base(req.URL.Path)); err != nil {
|
|
http.Error(resp, err.Error(), http.StatusNotFound)
|
|
return err
|
|
}
|
|
|
|
if !websocket.IsWebSocketUpgrade(req) {
|
|
log.Debugf("got a plain HTTP request: send unauthenticated registration")
|
|
if err = i.unauthenticatedResponse(registration, resp); err != nil {
|
|
log.Errorf("error sending unauthenticated response: %s", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// upgrade to websocket
|
|
conn, err := upgrade(resp, req)
|
|
if err != nil {
|
|
log.Errorf("failed to upgrade connection to websocket: %s", err)
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
log.Debugf("connection upgraded to websocket")
|
|
|
|
// attempt to authenticate the machine, if err, authentication has failed
|
|
inventory, err := i.authMachine(conn, req, registration.Namespace)
|
|
if err != nil {
|
|
log.Errorf("authentication failed: %s", err)
|
|
http.Error(resp, "authentication failure", http.StatusUnauthorized)
|
|
return err
|
|
}
|
|
// no error and no inventory: Auth header is missing or unrecognized
|
|
if inventory == nil {
|
|
if authHeader := req.Header.Get("Authorization"); authHeader != "" {
|
|
errMsg := "unrecognized authentication method"
|
|
log.Errorf("websocket connection: %s", errMsg)
|
|
http.Error(resp, errMsg, http.StatusUnauthorized)
|
|
return err
|
|
}
|
|
|
|
log.Warning("unauthenticated websocket connection")
|
|
if err = i.unauthenticatedResponse(registration, resp); err != nil {
|
|
log.Errorf("error sending unauthenticated response: %s", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
log.Debugf("authentication completed")
|
|
|
|
if err = register.WriteMessage(conn, register.MsgReady, []byte{}); err != nil {
|
|
log.Errorf("cannot finalize the authentication process: %s", err)
|
|
return err
|
|
}
|
|
|
|
if isNewInventory(inventory) {
|
|
initInventory(inventory, registration)
|
|
}
|
|
|
|
if err = i.serveLoop(conn, inventory, registration); err != nil {
|
|
log.Errorf("Error during serve-loop: %s", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *InventoryServer) unauthenticatedResponse(registration *elementalv1.MachineRegistration, writer io.Writer) error {
|
|
config, err := registration.GetClientRegistrationConfig(i.getRancherCACert())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return yaml.NewEncoder(writer).
|
|
Encode(config)
|
|
}
|
|
|
|
func (i *InventoryServer) writeMachineInventoryCloudConfig(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration, netConf elementalv1.NetworkConfig) error {
|
|
sa := &corev1.ServiceAccount{}
|
|
|
|
if err := i.Get(i, types.NamespacedName{
|
|
Name: registration.Status.ServiceAccountRef.Name,
|
|
Namespace: registration.Status.ServiceAccountRef.Namespace,
|
|
}, sa); err != nil {
|
|
return fmt.Errorf("failed to get service account: %w", err)
|
|
}
|
|
|
|
secret := &corev1.Secret{}
|
|
err := i.Get(i, types.NamespacedName{
|
|
Name: sa.Name + elementalv1.SASecretSuffix,
|
|
Namespace: sa.Namespace,
|
|
}, secret)
|
|
|
|
if err != nil || secret.Type != corev1.SecretTypeServiceAccountToken {
|
|
return fmt.Errorf("failed to get secret: %w", err)
|
|
}
|
|
|
|
serverURL, err := i.getValue("server-url")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get server-url: %w", err)
|
|
}
|
|
if serverURL == "" {
|
|
return fmt.Errorf("server-url is not set")
|
|
}
|
|
|
|
config, err := registration.GetClientRegistrationConfig(i.getRancherCACert())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
config.Elemental.SystemAgent = elementalv1.SystemAgent{
|
|
StrictTLSMode: i.isAgentTLSModeStrict(),
|
|
URL: fmt.Sprintf("%s/k8s/clusters/local", serverURL),
|
|
Token: string(secret.Data["token"]),
|
|
SecretName: inventory.Name,
|
|
SecretNamespace: inventory.Namespace,
|
|
}
|
|
config.Elemental.Install = registration.Spec.Config.Elemental.Install
|
|
config.Elemental.Reset = registration.Spec.Config.Elemental.Reset
|
|
config.CloudConfig = registration.Spec.Config.CloudConfig
|
|
|
|
// If client does not support MsgConfig we send back the config as a
|
|
// byte-stream without a message-type to keep backwards-compatibility.
|
|
if protoVersion < register.MsgConfig {
|
|
log.Debugf("Client does not support MsgConfig, sending back raw config.")
|
|
|
|
writer, err := conn.NextWriter(websocket.BinaryMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer writer.Close()
|
|
|
|
return yaml.NewEncoder(writer).Encode(config)
|
|
}
|
|
|
|
log.Debugf("Client supports MsgConfig, sending back config.")
|
|
|
|
// If the client supports the MsgConfig-message we use that.
|
|
data, err := yaml.Marshal(config)
|
|
if err != nil {
|
|
log.Errorf("error marshalling config: %v", err)
|
|
return err
|
|
}
|
|
|
|
if netConf.Configurator != network.ConfiguratorNone && netConf.Configurator != "" {
|
|
netData, err := yaml.Marshal(netConf)
|
|
if err != nil {
|
|
log.Errorf("error marshalling network config: %v", err)
|
|
return err
|
|
}
|
|
data = append(data, netData...)
|
|
}
|
|
|
|
return register.WriteMessage(conn, register.MsgConfig, data)
|
|
}
|
|
|
|
func (i *InventoryServer) getRancherCACert() string {
|
|
cacert, err := i.getValue("cacerts")
|
|
if err != nil {
|
|
log.Errorf("Error getting cacerts: %s", err.Error())
|
|
}
|
|
|
|
if cacert == "" {
|
|
if cacert, err = i.getValue("internal-cacerts"); err != nil {
|
|
log.Errorf("Error getting internal-cacerts: %s", err.Error())
|
|
return ""
|
|
}
|
|
}
|
|
return cacert
|
|
}
|
|
|
|
// Support for agent-tls-mode
|
|
func (i *InventoryServer) isAgentTLSModeStrict() bool {
|
|
agentTLSMode, err := i.getValue("agent-tls-mode")
|
|
if err != nil {
|
|
log.Errorf("Error getting agent-tls-mode: %s", err.Error())
|
|
}
|
|
switch agentTLSMode {
|
|
case "strict":
|
|
return true
|
|
case "system-store":
|
|
return false
|
|
default:
|
|
// Historically the default has been strict TLS verification
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (i *InventoryServer) serveLoop(conn *websocket.Conn, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration) error { //nolint: gocyclo
|
|
protoVersion := register.MsgUndefined
|
|
tmpl := templater.NewTemplater()
|
|
|
|
for {
|
|
var data []byte
|
|
|
|
msgType, data, err := register.ReadMessage(conn)
|
|
if err != nil {
|
|
return fmt.Errorf("websocket communication interrupted: %w", err)
|
|
}
|
|
replyMsgType := register.MsgReady
|
|
replyData := []byte{}
|
|
|
|
switch msgType {
|
|
case register.MsgVersion:
|
|
protoVersion, err = decodeProtocolVersion(data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to negotiate protocol version: %w", err)
|
|
}
|
|
log.Infof("Negotiated protocol version: %d", protoVersion)
|
|
replyMsgType = register.MsgVersion
|
|
replyData = []byte{byte(protoVersion)}
|
|
case register.MsgSmbios:
|
|
smbiosData := map[string]interface{}{}
|
|
if err := json.Unmarshal(data, &smbiosData); err != nil {
|
|
return fmt.Errorf("failed to extract labels from SMBIOS data: %w", err)
|
|
}
|
|
tmpl.Fill(smbiosData)
|
|
case register.MsgLabels:
|
|
if err := mergeInventoryLabels(inventory, data); err != nil {
|
|
return err
|
|
}
|
|
case register.MsgAnnotations:
|
|
err = mergeInventoryAnnotations(data, inventory)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decode dynamic data: %w", err)
|
|
}
|
|
case register.MsgGet:
|
|
// Final call: here we commit the MachineInventory, send the Elemental config data
|
|
// and close the connection.
|
|
if err := updateInventoryWithTemplates(tmpl, inventory, registration); err != nil {
|
|
return err
|
|
}
|
|
return i.handleGet(conn, protoVersion, inventory, registration)
|
|
case register.MsgUpdate:
|
|
err = i.handleUpdate(conn, protoVersion, inventory)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to negotiate registration update: %w", err)
|
|
}
|
|
case register.MsgSystemData:
|
|
systemData, err := hostinfo.FillData(data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse system data: %w", err)
|
|
}
|
|
tmpl.Fill(systemData)
|
|
case register.MsgSystemDataV2:
|
|
systemData := map[string]interface{}{}
|
|
if err := json.Unmarshal(data, &systemData); err != nil {
|
|
return fmt.Errorf("failed to parse system data: %w", err)
|
|
}
|
|
tmpl.Fill(systemData)
|
|
default:
|
|
return fmt.Errorf("got unexpected message: %s", msgType)
|
|
}
|
|
if err := register.WriteMessage(conn, replyMsgType, replyData); err != nil {
|
|
return fmt.Errorf("cannot complete %s exchange", msgType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (i *InventoryServer) handleUpdate(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory) error {
|
|
if isNewInventory(inventory) {
|
|
log.Errorf("MachineInventory '%+v' was not found, but the machine is still running. Reprovisioning is needed.\n", inventory)
|
|
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(errInventoryNotFound)); writeErr != nil {
|
|
log.Errorf("Error reporting back error to client: %v\n", writeErr)
|
|
}
|
|
return errInventoryNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i *InventoryServer) handleGetNetworkConfig(inventory *elementalv1.MachineInventory) (elementalv1.NetworkConfig, error) {
|
|
ctx, cancel := context.WithTimeout(context.TODO(), register.RegistrationDeadlineSeconds*time.Second)
|
|
defer cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return elementalv1.NetworkConfig{}, fmt.Errorf("NewtworkConfig not ready")
|
|
default:
|
|
time.Sleep(time.Second)
|
|
}
|
|
if err := i.Get(ctx, client.ObjectKeyFromObject(inventory), inventory); err != nil {
|
|
return elementalv1.NetworkConfig{}, fmt.Errorf("getting machine inventory: %w", err)
|
|
}
|
|
conditionFound := false
|
|
for _, condition := range inventory.Status.Conditions {
|
|
if condition.Type == elementalv1.NetworkConfigReady {
|
|
conditionFound = true
|
|
if condition.Status == metav1.ConditionFalse {
|
|
log.Debug("NetworkConfigReady is false")
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
if !conditionFound {
|
|
log.Debug("NetworkConfigReady condition not found")
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
return inventory.Spec.Network, nil
|
|
}
|
|
|
|
func (i *InventoryServer) handleGet(conn *websocket.Conn, protoVersion register.MessageType, inventory *elementalv1.MachineInventory, registration *elementalv1.MachineRegistration) error {
|
|
var err error
|
|
var netConf elementalv1.NetworkConfig
|
|
|
|
inventory, err = i.commitMachineInventory(inventory)
|
|
if err != nil {
|
|
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
|
|
log.Errorf("Error reporting back error to client: %v", writeErr)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
if protoVersion >= register.MsgNetworkConfig {
|
|
netConf, err = i.handleGetNetworkConfig(inventory)
|
|
if err != nil {
|
|
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
|
|
log.Errorf("Error reporting back error to client: %v", writeErr)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := i.writeMachineInventoryCloudConfig(conn, protoVersion, inventory, registration, netConf); err != nil {
|
|
if writeErr := writeError(conn, protoVersion, register.NewErrorMessage(err)); writeErr != nil {
|
|
log.Errorf("Error reporting back error to client: %v", writeErr)
|
|
}
|
|
|
|
return fmt.Errorf("failed sending elemental cloud config: %w", err)
|
|
}
|
|
|
|
log.Debugf("Elemental cloud config sent")
|
|
|
|
return nil
|
|
}
|
|
|
|
// writeError reports back an error to the client if the negotiated protocol
|
|
// version supports it.
|
|
func writeError(conn *websocket.Conn, protoVersion register.MessageType, errorMessage register.ErrorMessage) error {
|
|
if protoVersion < register.MsgError {
|
|
log.Debugf("client does not support reporting errors, skipping")
|
|
return nil
|
|
}
|
|
|
|
data, err := yaml.Marshal(errorMessage)
|
|
if err != nil {
|
|
return fmt.Errorf("error marshalling error-message: %w", err)
|
|
}
|
|
|
|
return register.WriteMessage(conn, register.MsgError, data)
|
|
}
|
|
|
|
func decodeProtocolVersion(data []byte) (register.MessageType, error) {
|
|
protoVersion := register.MsgUndefined
|
|
|
|
if len(data) != 1 {
|
|
return protoVersion, fmt.Errorf("unknown format: %v (%s)", data, data)
|
|
}
|
|
clientVersion := register.MessageType(data[0])
|
|
protoVersion = clientVersion
|
|
if clientVersion != register.MsgLast {
|
|
log.Debugf("elemental-register (%d) and elemental-operator (%d) protocol versions differ", clientVersion, register.MsgLast)
|
|
if clientVersion <= register.MsgGet {
|
|
return protoVersion, fmt.Errorf("elemental-register protocol version is deprecated (elemental-register client too old, protocol version = %d)", clientVersion)
|
|
}
|
|
if clientVersion > register.MsgLast {
|
|
protoVersion = register.MsgLast
|
|
}
|
|
}
|
|
|
|
return protoVersion, nil
|
|
}
|