spark-operator/sparkctl/cmd/create.go

/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/url"
	"os"
	"path/filepath"
	"reflect"
	"unicode/utf8"

	"github.com/google/go-cloud/blob"
	"github.com/spf13/cobra"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/yaml"
	clientset "k8s.io/client-go/kubernetes"

	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
	crdclientset "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned"
)
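
// bufferSize is the buffer size passed to the YAML/JSON decoder when reading
// application specs; rootPath is the root prefix under which local
// dependencies are uploaded in the remote bucket.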
const bufferSize = 1024
const rootPath = "spark-app-dependencies"
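
// Command-line flag values for the create command, bound to flags in init below.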
var UploadToPath string
var UploadToEndpoint string
var UploadToRegion string
var Public bool
var Override bool
var From string
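
// createCmd is the sparkctl "create" sub-command. It creates a SparkApplication
// either from a YAML file (sparkctl create <yaml file>) or, when --from is set,
// as a one-off run derived from an existing ScheduledSparkApplication.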
var createCmd = &cobra.Command{
Use: "create <yaml file>",
Short: "Create a SparkApplication object",
Long: `Create a SparkApplication from a given YAML file storing the application specification.`,
Run: func(cmd *cobra.Command, args []string) {
if From != "" && len(args) != 1 {
fmt.Fprintln(os.Stderr, "must specify the name of a ScheduledSparkApplication")
return
}
if len(args) != 1 {
fmt.Fprintln(os.Stderr, "must specify a YAML file of a SparkApplication")
return
}
kubeClient, err := getKubeClient()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get Kubernetes client: %v\n", err)
return
}
crdClient, err := getSparkApplicationClient()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err)
return
}
if From != "" {
if err := createFromScheduledSparkApplication(args[0], kubeClient, crdClient); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
} else {
if err := createFromYaml(args[0], kubeClient, crdClient); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}
},
}
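
// init registers the command-line flags of the create command.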
func init() {
createCmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "",
"a URL of the remote location where local application dependencies are to be uploaded to")
createCmd.Flags().StringVarP(&UploadToRegion, "upload-to-region", "r", "",
"the GCS or S3 storage region for the bucket")
createCmd.Flags().StringVarP(&UploadToEndpoint, "upload-to-endpoint", "e",
"https://storage.googleapis.com", "the GCS or S3 storage api endpoint url")
createCmd.Flags().BoolVarP(&Public, "public", "c", false,
"whether to make uploaded files publicly available")
createCmd.Flags().BoolVarP(&Override, "override", "o", false,
"whether to override remote files with the same names")
createCmd.Flags().StringVarP(&From, "from", "f", "",
"the name of ScheduledSparkApplication from which a forced SparkApplication run is created")
}
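
// createFromYaml loads a SparkApplication from the given YAML file and creates
// it in the cluster.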
func createFromYaml(yamlFile string, kubeClient clientset.Interface, crdClient crdclientset.Interface) error {
app, err := loadFromYAML(yamlFile)
if err != nil {
return fmt.Errorf("failed to read a SparkApplication from %s: %v", yamlFile, err)
}
if err := createSparkApplication(app, kubeClient, crdClient); err != nil {
return fmt.Errorf("failed to create SparkApplication %s: %v", app.Name, err)
}
return nil
}
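
// createFromScheduledSparkApplication creates a SparkApplication with the given
// name from the template of the ScheduledSparkApplication named by --from, and
// marks that ScheduledSparkApplication as the owner of the new object.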
func createFromScheduledSparkApplication(name string, kubeClient clientset.Interface, crdClient crdclientset.Interface) error {
sapp, err := crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(Namespace).Get(From, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get ScheduledSparkApplication %s: %v", From, err)
}
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Namespace: Namespace,
Name: name,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: v1beta2.SchemeGroupVersion.String(),
Kind: reflect.TypeOf(v1beta2.ScheduledSparkApplication{}).Name(),
Name: sapp.Name,
UID: sapp.UID,
},
},
},
Spec: *sapp.Spec.Template.DeepCopy(),
}
if err := createSparkApplication(app, kubeClient, crdClient); err != nil {
return fmt.Errorf("failed to create SparkApplication %s: %v", app.Name, err)
}
return nil
}
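
// createSparkApplication applies defaults, validates the spec, uploads any local
// dependencies, optionally turns HADOOP_CONF_DIR into a ConfigMap, and then
// creates the SparkApplication in the cluster.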
func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.Interface, crdClient crdclientset.Interface) error {
v1beta2.SetSparkApplicationDefaults(app)
if err := validateSpec(app.Spec); err != nil {
return err
}
if err := handleLocalDependencies(app); err != nil {
return err
}
if hadoopConfDir := os.Getenv("HADOOP_CONF_DIR"); hadoopConfDir != "" {
fmt.Println("creating a ConfigMap for Hadoop configuration files in HADOOP_CONF_DIR")
if err := handleHadoopConfiguration(app, hadoopConfDir, kubeClient); err != nil {
return err
}
}
if _, err := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Create(app); err != nil {
return err
}
fmt.Printf("SparkApplication \"%s\" created\n", app.Name)
return nil
}
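
// loadFromYAML decodes a SparkApplication from the given YAML (or JSON) file.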
func loadFromYAML(yamlFile string) (*v1beta2.SparkApplication, error) {
file, err := os.Open(yamlFile)
if err != nil {
return nil, err
}
defer file.Close()
decoder := yaml.NewYAMLOrJSONDecoder(file, bufferSize)
app := &v1beta2.SparkApplication{}
err = decoder.Decode(app)
if err != nil {
return nil, err
}
return app, nil
}
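
// validateSpec checks that an image is specified either at the application level
// or for both the driver and the executor.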
func validateSpec(spec v1beta2.SparkApplicationSpec) error {
if spec.Image == nil && (spec.Driver.Image == nil || spec.Executor.Image == nil) {
return fmt.Errorf("'spec.driver.image' and 'spec.executor.image' cannot be empty when 'spec.image' " +
"is not set")
}
return nil
}
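
// handleLocalDependencies uploads the main application file and any local jars,
// files, and pyfiles listed in the spec, and rewrites the corresponding fields
// to point at the uploaded copies.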
func handleLocalDependencies(app *v1beta2.SparkApplication) error {
if app.Spec.MainApplicationFile != nil {
isMainAppFileLocal, err := isLocalFile(*app.Spec.MainApplicationFile)
if err != nil {
return err
}
if isMainAppFileLocal {
uploadedMainFile, err := uploadLocalDependencies(app, []string{*app.Spec.MainApplicationFile})
if err != nil {
return fmt.Errorf("failed to upload local main application file: %v", err)
}
app.Spec.MainApplicationFile = &uploadedMainFile[0]
}
}
localJars, err := filterLocalFiles(app.Spec.Deps.Jars)
if err != nil {
return fmt.Errorf("failed to filter local jars: %v", err)
}
if len(localJars) > 0 {
uploadedJars, err := uploadLocalDependencies(app, localJars)
if err != nil {
return fmt.Errorf("failed to upload local jars: %v", err)
}
app.Spec.Deps.Jars = uploadedJars
}
localFiles, err := filterLocalFiles(app.Spec.Deps.Files)
if err != nil {
return fmt.Errorf("failed to filter local files: %v", err)
}
if len(localFiles) > 0 {
uploadedFiles, err := uploadLocalDependencies(app, localFiles)
if err != nil {
return fmt.Errorf("failed to upload local files: %v", err)
}
app.Spec.Deps.Files = uploadedFiles
}
localPyFiles, err := filterLocalFiles(app.Spec.Deps.PyFiles)
if err != nil {
return fmt.Errorf("failed to filter local pyfiles: %v", err)
}
if len(localPyFiles) > 0 {
uploadedPyFiles, err := uploadLocalDependencies(app, localPyFiles)
if err != nil {
return fmt.Errorf("failed to upload local pyfiles: %v", err)
}
app.Spec.Deps.PyFiles = uploadedPyFiles
}
return nil
}
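
// filterLocalFiles returns the subset of the given files that refer to the
// local filesystem.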
func filterLocalFiles(files []string) ([]string, error) {
var localFiles []string
for _, file := range files {
if isLocal, err := isLocalFile(file); err != nil {
return nil, err
} else if isLocal {
localFiles = append(localFiles, file)
}
}
return localFiles, nil
}
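
// isLocalFile reports whether the given file URI uses the file:// scheme or has
// no scheme at all.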
func isLocalFile(file string) (bool, error) {
fileUrl, err := url.Parse(file)
if err != nil {
return false, err
}
if fileUrl.Scheme == "file" || fileUrl.Scheme == "" {
return true, nil
}
return false, nil
}
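
// blobHandler abstracts setting a public-read ACL on an uploaded object, which
// the go-cloud blob API used here does not expose directly.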
type blobHandler interface {
	// TODO: remove this interface and its implementations once go-cloud supports setting ACLs
setPublicACL(ctx context.Context, bucket string, filePath string) error
}
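
// uploadHandler bundles the context, bucket handle, and endpoint metadata needed
// to upload local dependencies to a GCS or S3 bucket.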
type uploadHandler struct {
blob blobHandler
blobUploadBucket string
blobEndpoint string
hdpScheme string
ctx context.Context
b *blob.Bucket
}
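
// uploadToBucket uploads a single local file to uploadPath in the bucket, skips
// the upload if the object already exists and --override is not set, and returns
// the remote URI to be used in the SparkApplication spec.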
func (uh uploadHandler) uploadToBucket(uploadPath, localFilePath string) (string, error) {
fileName := filepath.Base(localFilePath)
uploadFilePath := filepath.Join(uploadPath, fileName)
	// Check whether the object already exists remotely by opening a zero-length range reader
reader, err := uh.b.NewRangeReader(uh.ctx, uploadFilePath, 0, 0)
if err == nil {
reader.Close()
}
	if blob.IsNotExist(err) || (err == nil && Override) {
fmt.Printf("uploading local file: %s\n", fileName)
// Prepare the file for upload.
data, err := ioutil.ReadFile(localFilePath)
if err != nil {
return "", fmt.Errorf("failed to read file: %s", err)
}
		// Obtain a writer for the destination object in the bucket
w, err := uh.b.NewWriter(uh.ctx, uploadFilePath, nil)
if err != nil {
return "", fmt.Errorf("failed to obtain bucket writer: %s", err)
}
// Write data to bucket and close bucket writer
_, writeErr := w.Write(data)
if err := w.Close(); err != nil {
return "", fmt.Errorf("failed to close bucket writer: %s", err)
}
// Check if write has been successful
if writeErr != nil {
return "", fmt.Errorf("failed to write to bucket: %s", err)
}
// Set public ACL if needed
if Public {
err := uh.blob.setPublicACL(uh.ctx, uh.blobUploadBucket, uploadFilePath)
if err != nil {
return "", err
}
endpointURL, err := url.Parse(uh.blobEndpoint)
if err != nil {
return "", err
}
// Public needs full bucket endpoint
return fmt.Sprintf("%s://%s/%s/%s",
endpointURL.Scheme,
endpointURL.Host,
uh.blobUploadBucket,
uploadFilePath), nil
}
} else if err == nil {
fmt.Printf("not uploading file %s as it already exists remotely\n", fileName)
} else {
return "", err
}
// Return path to file with proper hadoop-connector scheme
return fmt.Sprintf("%s://%s/%s", uh.hdpScheme, uh.blobUploadBucket, uploadFilePath), nil
}
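
// uploadLocalDependencies uploads the given local files to the bucket named in
// --upload-to, using the GCS or S3 backend selected by the URL scheme, and
// returns the resulting remote URIs.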
func uploadLocalDependencies(app *v1beta2.SparkApplication, files []string) ([]string, error) {
if UploadToPath == "" {
return nil, fmt.Errorf(
"unable to upload local dependencies: no upload location specified via --upload-to")
}
uploadLocationUrl, err := url.Parse(UploadToPath)
if err != nil {
return nil, err
}
uploadBucket := uploadLocationUrl.Host
var uh *uploadHandler
ctx := context.Background()
switch uploadLocationUrl.Scheme {
case "gs":
uh, err = newGCSBlob(ctx, uploadBucket, UploadToEndpoint, UploadToRegion)
case "s3":
uh, err = newS3Blob(ctx, uploadBucket, UploadToEndpoint, UploadToRegion)
default:
return nil, fmt.Errorf("unsupported upload location URL scheme: %s", uploadLocationUrl.Scheme)
}
	// Check whether the bucket handle was set up successfully
if err != nil {
return nil, err
}
var uploadedFilePaths []string
uploadPath := filepath.Join(rootPath, app.Namespace, app.Name)
for _, localFilePath := range files {
uploadFilePath, err := uh.uploadToBucket(uploadPath, localFilePath)
if err != nil {
return nil, err
}
uploadedFilePaths = append(uploadedFilePaths, uploadFilePath)
}
return uploadedFilePaths, nil
}
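
// handleHadoopConfiguration builds a ConfigMap from the files in hadoopConfDir,
// replaces any existing ConfigMap with the same name, and records the ConfigMap
// name in the application spec.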
func handleHadoopConfiguration(
app *v1beta2.SparkApplication,
hadoopConfDir string,
kubeClientset clientset.Interface) error {
configMap, err := buildHadoopConfigMap(app.Name, hadoopConfDir)
if err != nil {
return fmt.Errorf("failed to create a ConfigMap for Hadoop configuration files in %s: %v",
hadoopConfDir, err)
}
err = kubeClientset.CoreV1().ConfigMaps(Namespace).Delete(configMap.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete existing ConfigMap %s: %v", configMap.Name, err)
}
if configMap, err = kubeClientset.CoreV1().ConfigMaps(Namespace).Create(configMap); err != nil {
return fmt.Errorf("failed to create ConfigMap %s: %v", configMap.Name, err)
}
app.Spec.HadoopConfigMap = &configMap.Name
return nil
}
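
// buildHadoopConfigMap reads the files in hadoopConfDir and packs them into a
// ConfigMap, storing UTF-8 text files as Data and other files as BinaryData.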
func buildHadoopConfigMap(appName string, hadoopConfDir string) (*apiv1.ConfigMap, error) {
info, err := os.Stat(hadoopConfDir)
if err != nil {
return nil, err
}
if !info.IsDir() {
return nil, fmt.Errorf("%s is not a directory", hadoopConfDir)
}
files, err := ioutil.ReadDir(hadoopConfDir)
if err != nil {
return nil, err
}
if len(files) == 0 {
return nil, fmt.Errorf("no Hadoop configuration file found in %s", hadoopConfDir)
}
hadoopStringConfigFiles := make(map[string]string)
hadoopBinaryConfigFiles := make(map[string][]byte)
for _, file := range files {
if file.IsDir() {
continue
}
content, err := ioutil.ReadFile(filepath.Join(hadoopConfDir, file.Name()))
if err != nil {
return nil, err
}
if utf8.Valid(content) {
hadoopStringConfigFiles[file.Name()] = string(content)
} else {
hadoopBinaryConfigFiles[file.Name()] = content
}
}
configMap := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: appName + "-hadoop-config",
Namespace: Namespace,
},
Data: hadoopStringConfigFiles,
BinaryData: hadoopBinaryConfigFiles,
}
return configMap, nil
}