Initial implementation of multi app run for Kubernetes Dev (#1333)

* initial commit for multi-app run k8s impl

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix import

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* move runfileconfig

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add protobuf conflict warn env

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* Add pubsub component. Check before component creation.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix e2e

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix e2e

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix e2e

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

---------

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
Mukundan Sundararajan 2023-09-03 20:12:58 -07:00 committed by GitHub
parent 6738eefe2b
commit a15a3eb856
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1402 additions and 167 deletions

View File

@ -42,6 +42,7 @@ jobs:
DAPR_DASHBOARD_PINNED_VERSION: 0.13.0
DAPR_RUNTIME_LATEST_STABLE_VERSION:
DAPR_DASHBOARD_LATEST_STABLE_VERSION:
GOLANG_PROTOBUF_REGISTRATION_CONFLICT: warn
PODMAN_VERSION: 4.4.4
strategy:
# TODO: Remove this when our E2E tests are stable for podman on MacOS.

2
.gitignore vendored
View File

@ -24,6 +24,8 @@ cli
# CLI's auto-generated components directory
**/components
# Auto generated deploy dir inside .dapr directory
**/.dapr/deploy
# Auto generated logs dir inside .dapr directory
**/.dapr/logs

View File

@ -33,6 +33,7 @@ var (
wait bool
timeout uint
slimMode bool
devMode bool
runtimeVersion string
dashboardVersion string
allNamespaces bool
@ -68,6 +69,9 @@ dapr init --image-registry <registry-url>
# Initialize Dapr in Kubernetes
dapr init -k
# Initialize Dapr in Kubernetes in dev mode
dapr init -k --dev
# Initialize Dapr in Kubernetes and wait for the installation to complete (default timeout is 300s/5m)
dapr init -k --wait --timeout 600
@ -127,6 +131,7 @@ dapr init --runtime-path <path-to-install-directory>
DashboardVersion: dashboardVersion,
EnableMTLS: enableMTLS,
EnableHA: enableHA,
EnableDev: devMode,
Args: values,
Wait: wait,
Timeout: timeout,
@ -202,6 +207,7 @@ func init() {
defaultContainerRuntime := string(utils.DOCKER)
InitCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Deploy Dapr to a Kubernetes cluster")
InitCmd.Flags().BoolVarP(&devMode, "dev", "", false, "Use Dev mode. Deploy Redis, Zipkin also in the Kubernetes cluster")
InitCmd.Flags().BoolVarP(&wait, "wait", "", false, "Wait for Kubernetes initialization to complete")
InitCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The wait timeout for the Kubernetes installation")
InitCmd.Flags().BoolVarP(&slimMode, "slim", "s", false, "Exclude placement service, Redis and Zipkin containers from self-hosted installation")

View File

@ -27,11 +27,12 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/dapr/cli/pkg/kubernetes"
"github.com/dapr/cli/pkg/metadata"
"github.com/dapr/cli/pkg/print"
runExec "github.com/dapr/cli/pkg/runexec"
"github.com/dapr/cli/pkg/runfileconfig"
"github.com/dapr/cli/pkg/standalone"
"github.com/dapr/cli/pkg/standalone/runfileconfig"
daprsyscall "github.com/dapr/cli/pkg/syscall"
"github.com/dapr/cli/utils"
)
@ -64,6 +65,7 @@ var (
apiListenAddresses string
runFilePath string
appChannelAddress string
enableRunK8s bool
)
const (
@ -105,6 +107,12 @@ dapr run --run-file dapr.yaml
# Run multiple apps by providing a directory path containing the run config file(dapr.yaml)
dapr run --run-file /path/to/directory
# Run multiple apps in Kubernetes by proficing path of a run config file
dapr run --run-file dapr.yaml -k
# Run multiple apps in Kubernetes by providing a directory path containing the run config file(dapr.yaml)
dapr run --run-file /path/to/directory -k
`,
Args: cobra.MinimumNArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
@ -117,7 +125,7 @@ dapr run --run-file /path/to/directory
print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err)
os.Exit(1)
}
executeRunWithAppsConfigFile(runConfigFilePath)
executeRunWithAppsConfigFile(runConfigFilePath, enableRunK8s)
return
}
if len(args) == 0 {
@ -457,6 +465,7 @@ func init() {
RunCmd.Flags().IntVar(&appHealthTimeout, "app-health-probe-timeout", 0, "Timeout for app health probes in milliseconds")
RunCmd.Flags().IntVar(&appHealthThreshold, "app-health-threshold", 0, "Number of consecutive failures for the app to be considered unhealthy")
RunCmd.Flags().BoolVar(&enableAPILogging, "enable-api-logging", false, "Log API calls at INFO verbosity. Valid values are: true or false")
RunCmd.Flags().BoolVarP(&enableRunK8s, "kubernetes", "k", false, "Run the multi-app run template against Kubernetes environment.")
RunCmd.Flags().StringVar(&apiListenAddresses, "dapr-listen-addresses", "", "Comma separated list of IP addresses that sidecar will listen to")
RunCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to run")
RunCmd.Flags().StringVarP(&appChannelAddress, "app-channel-address", "", utils.DefaultAppChannelAddress, "The network address the application listens on")
@ -507,11 +516,11 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
// A custom writer used for trimming ASCII color codes from logs when writing to files.
var customAppLogWriter io.Writer
daprdLogWriterCloser := getLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination)
daprdLogWriterCloser := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination)
if len(runConfig.Command) == 0 {
print.StatusEvent(os.Stdout, print.LogWarning, "No application command found for app %q present in %s", runConfig.AppID, runFilePath)
appDaprdWriter = getAppDaprdWriter(app, true)
appDaprdWriter = runExec.GetAppDaprdWriter(app, true)
appLogWriter = app.DaprdLogWriteCloser
} else {
err = app.CreateAppLogFile()
@ -520,8 +529,8 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
exitWithError = true
break
}
appDaprdWriter = getAppDaprdWriter(app, false)
appLogWriter = getLogWriter(app.AppLogWriteCloser, app.AppLogDestination)
appDaprdWriter = runExec.GetAppDaprdWriter(app, false)
appLogWriter = runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination)
}
customAppLogWriter = print.CustomLogWriter{W: appLogWriter}
runState, err := startDaprdAndAppProcesses(&runConfig, app.AppDirPath, sigCh,
@ -590,43 +599,6 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
return exitWithError, closeError
}
// getAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout.
func getAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer {
var appDaprdWriter io.Writer
if isAppCommandEmpty {
if app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser)
} else {
appDaprdWriter = os.Stdout
}
} else {
if app.AppLogDestination != standalone.Console && app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout)
} else if app.AppLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, os.Stdout)
} else if app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout)
} else {
appDaprdWriter = os.Stdout
}
}
return appDaprdWriter
}
// getLogWriter returns the log writer based on the log destination.
func getLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer {
var logWriter io.Writer
switch logDestination {
case standalone.Console:
logWriter = os.Stdout
case standalone.File:
logWriter = fileLogWriterCloser
case standalone.FileAndConsole:
logWriter = io.MultiWriter(os.Stdout, fileLogWriterCloser)
}
return logWriter
}
func logInformationalStatusToStdout(app runfileconfig.App) {
print.InfoStatusEvent(os.Stdout, "Started Dapr with app id %q. HTTP Port: %d. gRPC Port: %d",
app.AppID, app.RunConfig.HTTPPort, app.RunConfig.GRPCPort)
@ -652,9 +624,8 @@ func gracefullyShutdownAppsAndCloseResources(runState []*runExec.RunExec, apps [
return err
}
func executeRunWithAppsConfigFile(runFilePath string) {
config := runfileconfig.RunFileConfig{}
apps, err := config.GetApps(runFilePath)
func executeRunWithAppsConfigFile(runFilePath string, k8sEnabled bool) {
config, apps, err := getRunConfigFromRunFile(runFilePath)
if err != nil {
print.StatusEvent(os.Stdout, print.LogFailure, "Error getting apps from config file: %s", err)
os.Exit(1)
@ -663,7 +634,13 @@ func executeRunWithAppsConfigFile(runFilePath string) {
print.StatusEvent(os.Stdout, print.LogFailure, "No apps to run")
os.Exit(1)
}
exitWithError, closeErr := executeRun(config.Name, runFilePath, apps)
var exitWithError bool
var closeErr error
if !k8sEnabled {
exitWithError, closeErr = executeRun(config.Name, runFilePath, apps)
} else {
exitWithError, closeErr = kubernetes.Run(runFilePath, config)
}
if exitWithError {
if closeErr != nil {
print.StatusEvent(os.Stdout, print.LogFailure, "Error closing resources: %s", closeErr)
@ -672,6 +649,12 @@ func executeRunWithAppsConfigFile(runFilePath string) {
}
}
func getRunConfigFromRunFile(runFilePath string) (runfileconfig.RunFileConfig, []runfileconfig.App, error) {
config := runfileconfig.RunFileConfig{}
apps, err := config.GetApps(runFilePath)
return config, apps, err
}
// startDaprdAndAppProcesses is a function to start the App process and the associated Daprd process.
// This should be called as a blocking function call.
func startDaprdAndAppProcesses(runConfig *standalone.RunConfig, commandDir string, sigCh chan os.Signal,

View File

@ -20,11 +20,15 @@ import (
"github.com/spf13/cobra"
"github.com/dapr/cli/pkg/kubernetes"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/standalone"
)
var stopAppID string
var (
stopAppID string
stopK8s bool
)
var StopCmd = &cobra.Command{
Use: "stop",
@ -38,6 +42,12 @@ dapr stop --run-file dapr.yaml
# Stop multiple apps by providing a directory path containing the run config file(dapr.yaml)
dapr stop --run-file /path/to/directory
# Stop and delete Kubernetes deployment of multiple apps by providing a run config file
dapr stop --run-file dapr.yaml -k
# Stop and delete Kubernetes deployment of multiple apps by providing a directory path containing the run config file(dapr.yaml)
dapr stop --run-file /path/to/directory -k
`,
Run: func(cmd *cobra.Command, args []string) {
var err error
@ -47,13 +57,23 @@ dapr stop --run-file /path/to/directory
print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err)
os.Exit(1)
}
err = executeStopWithRunFile(runFilePath)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err)
} else {
print.SuccessStatusEvent(os.Stdout, "Dapr and app processes stopped successfully")
if !stopK8s {
err = executeStopWithRunFile(runFilePath)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err)
} else {
print.SuccessStatusEvent(os.Stdout, "Dapr and app processes stopped successfully")
}
return
}
config, _, cErr := getRunConfigFromRunFile(runFilePath)
if cErr != nil {
print.FailureStatusEvent(os.Stderr, "Failed to parse run template file %q: %s", runFilePath, cErr.Error())
}
err = kubernetes.Stop(runFilePath, config)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error stopping deployments from multi-app run template: %v", err)
}
return
}
if stopAppID != "" {
args = append(args, stopAppID)
@ -78,6 +98,7 @@ dapr stop --run-file /path/to/directory
func init() {
StopCmd.Flags().StringVarP(&stopAppID, "app-id", "a", "", "The application id to be stopped")
StopCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to stop")
StopCmd.Flags().BoolVarP(&stopK8s, "kubernetes", "k", false, "Stop deployments in Kunernetes based on multi-app run file")
StopCmd.Flags().BoolP("help", "h", false, "Print this help message")
RootCmd.AddCommand(StopCmd)
}

View File

@ -30,6 +30,7 @@ import (
var (
uninstallNamespace string
uninstallKubernetes bool
uninstallDev bool
uninstallAll bool
uninstallContainerRuntime string
)
@ -48,6 +49,15 @@ dapr uninstall --all
# Uninstall from Kubernetes
dapr uninstall -k
# Uninstall from Kubernetes and remove CRDs
dapr uninstall -k --all
# Uninstall from Kubernetes remove dev deployments of Redis, Zipkin
dapr uninstall -k --dev
# Uninstall from Kubernetes remove dev deployments of Redis, Zipkin and CRDs
dapr uninstall -k --dev --all
# Uninstall Dapr from non-default install directory
# This will remove the .dapr directory present in the path <path-to-install-directory>
dapr uninstall --runtime-path <path-to-install-directory>
@ -66,7 +76,7 @@ dapr uninstall --runtime-path <path-to-install-directory>
}
print.InfoStatusEvent(os.Stdout, "Removing Dapr from your cluster...")
err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, timeout)
err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, uninstallDev, timeout)
} else {
if !utils.IsValidContainerRuntime(uninstallContainerRuntime) {
print.FailureStatusEvent(os.Stdout, "Invalid container runtime. Supported values are docker and podman.")
@ -87,6 +97,7 @@ dapr uninstall --runtime-path <path-to-install-directory>
func init() {
UninstallCmd.Flags().BoolVarP(&uninstallKubernetes, "kubernetes", "k", false, "Uninstall Dapr from a Kubernetes cluster")
UninstallCmd.Flags().BoolVarP(&uninstallDev, "dev", "", false, "Uninstall Dapr Redis and Zipking installations from Kubernetes cluster")
UninstallCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The timeout for the Kubernetes uninstall")
UninstallCmd.Flags().BoolVar(&uninstallAll, "all", false, "Remove .dapr directory, Redis, Placement and Zipkin containers on local machine, and CRDs on a Kubernetes cluster")
UninstallCmd.Flags().String("network", "", "The Docker network from which to remove the Dapr runtime")

View File

@ -25,6 +25,7 @@ import (
"github.com/dapr/cli/pkg/age"
"github.com/dapr/cli/utils"
v1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/client/clientset/versioned"
)
// ComponentsOutput represent a Dapr component.
@ -46,21 +47,37 @@ func PrintComponents(name, namespace, outputFormat string) error {
return nil, err
}
list, err := client.ComponentsV1alpha1().Components(namespace).List(meta_v1.ListOptions{})
// This means that the Dapr Components CRD is not installed and
// therefore no component items exist.
if apierrors.IsNotFound(err) {
list = &v1alpha1.ComponentList{
Items: []v1alpha1.Component{},
}
} else if err != nil {
return nil, err
}
return list, nil
return listComponents(client, namespace)
}, name, outputFormat)
}
func listComponents(client versioned.Interface, namespace string) (*v1alpha1.ComponentList, error) {
list, err := client.ComponentsV1alpha1().Components(namespace).List(meta_v1.ListOptions{})
// This means that the Dapr Components CRD is not installed and
// therefore no component items exist.
if apierrors.IsNotFound(err) {
list = &v1alpha1.ComponentList{
Items: []v1alpha1.Component{},
}
} else if err != nil {
return nil, err
}
return list, nil
}
func getComponent(client versioned.Interface, namespace string, componentName string) (*v1alpha1.Component, error) {
c, err := client.ComponentsV1alpha1().Components(namespace).Get(componentName, meta_v1.GetOptions{})
// This means that the Dapr Components CRD is not installed and
// therefore no component items exist.
if apierrors.IsNotFound(err) {
return &v1alpha1.Component{}, nil
} else if err != nil {
return nil, err
}
return c, err
}
func writeComponents(writer io.Writer, getConfigFunc func() (*v1alpha1.ComponentList, error), name, outputFormat string) error {
confs, err := getConfigFunc()
if err != nil {

View File

@ -26,6 +26,7 @@ import (
"github.com/dapr/cli/pkg/age"
"github.com/dapr/cli/utils"
v1alpha1 "github.com/dapr/dapr/pkg/apis/configuration/v1alpha1"
"github.com/dapr/dapr/pkg/client/clientset/versioned"
)
type configurationsOutput struct {
@ -66,6 +67,18 @@ func PrintConfigurations(name, namespace, outputFormat string) error {
}, name, outputFormat)
}
func getDaprConfiguration(client versioned.Interface, namespace string, configurationName string) (*v1alpha1.Configuration, error) {
c, err := client.ConfigurationV1alpha1().Configurations(namespace).Get(configurationName, meta_v1.GetOptions{})
// This means that the Dapr Configurations CRD is not installed and
// therefore no configuration items exist.
if apierrors.IsNotFound(err) {
return &v1alpha1.Configuration{}, nil
} else if err != nil {
return nil, err
}
return c, err
}
func writeConfigurations(writer io.Writer, getConfigFunc func() (*v1alpha1.ConfigurationList, error), name, outputFormat string) error {
confs, err := getConfigFunc()
if err != nil {

View File

@ -33,13 +33,27 @@ import (
"github.com/dapr/cli/pkg/print"
cli_ver "github.com/dapr/cli/pkg/version"
"github.com/dapr/cli/utils"
"github.com/dapr/dapr/pkg/client/clientset/versioned"
)
const (
daprReleaseName = "dapr"
dashboardReleaseName = "dapr-dashboard"
daprHelmRepo = "https://dapr.github.io/helm-charts"
latestVersion = "latest"
// dev mode constants.
thirdPartyDevNamespace = "default"
zipkinChartName = "zipkin"
redisChartName = "redis"
zipkinReleaseName = "dapr-dev-zipkin"
redisReleaseName = "dapr-dev-redis"
redisVersion = "6.2"
bitnamiHelmRepo = "https://charts.bitnami.com/bitnami"
daprHelmRepo = "https://dapr.github.io/helm-charts"
zipkinHelmRepo = "https://openzipkin.github.io/zipkin"
stateStoreComponentName = "statestore"
pubsubComponentName = "pubsub"
zipkingConfigurationName = "appconfig"
)
type InitConfiguration struct {
@ -48,6 +62,7 @@ type InitConfiguration struct {
Namespace string
EnableMTLS bool
EnableHA bool
EnableDev bool
Args []string
Wait bool
Timeout uint
@ -60,7 +75,8 @@ type InitConfiguration struct {
// Init deploys the Dapr operator using the supplied runtime version.
func Init(config InitConfiguration) error {
err := installWithConsole(daprReleaseName, config.Version, "Dapr control plane", config)
helmRepoDapr := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
err := installWithConsole(daprReleaseName, config.Version, helmRepoDapr, "Dapr control plane", config)
if err != nil {
return err
}
@ -75,19 +91,53 @@ func Init(config InitConfiguration) error {
}
}
err = installWithConsole(dashboardReleaseName, config.DashboardVersion, "Dapr dashboard", config)
err = installWithConsole(dashboardReleaseName, config.DashboardVersion, helmRepoDapr, "Dapr dashboard", config)
if err != nil {
return err
}
if config.EnableDev {
redisChartVals := []string{
"image.tag=" + redisVersion,
}
err = installThirdPartyWithConsole(redisReleaseName, redisChartName, latestVersion, bitnamiHelmRepo, "Dapr Redis", redisChartVals, config)
if err != nil {
return err
}
err = installThirdPartyWithConsole(zipkinReleaseName, zipkinChartName, latestVersion, zipkinHelmRepo, "Dapr Zipkin", []string{}, config)
if err != nil {
return err
}
err = initDevConfigs()
if err != nil {
return err
}
}
return nil
}
func installWithConsole(releaseName string, releaseVersion string, prettyName string, config InitConfiguration) error {
func installThirdPartyWithConsole(releaseName, chartName, releaseVersion, helmRepo string, prettyName string, chartValues []string, config InitConfiguration) error {
installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...")
defer installSpinning(print.Failure)
err := install(releaseName, releaseVersion, config)
// releaseVersion of chart will always be latest version.
err := installThirdParty(releaseName, chartName, latestVersion, helmRepo, chartValues, config)
if err != nil {
return err
}
installSpinning(print.Success)
return nil
}
func installWithConsole(releaseName, releaseVersion, helmRepo string, prettyName string, config InitConfiguration) error {
installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...")
defer installSpinning(print.Failure)
err := install(releaseName, releaseVersion, helmRepo, config)
if err != nil {
return err
}
@ -156,9 +206,9 @@ func locateChartFile(dirPath string) (string, error) {
return filepath.Join(dirPath, files[0].Name()), nil
}
func daprChart(version string, releaseName string, config *helm.Configuration) (*chart.Chart, error) {
func getHelmChart(version, releaseName, helmRepo string, config *helm.Configuration) (*chart.Chart, error) {
pull := helm.NewPullWithOpts(helm.WithConfig(config))
pull.RepoURL = utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
pull.RepoURL = helmRepo
pull.Username = utils.GetEnv("DAPR_HELM_REPO_USERNAME", "")
pull.Password = utils.GetEnv("DAPR_HELM_REPO_PASSWORD", "")
@ -188,7 +238,7 @@ func daprChart(version string, releaseName string, config *helm.Configuration) (
return loader.Load(chartPath)
}
func chartValues(config InitConfiguration, version string) (map[string]interface{}, error) {
func daprChartValues(config InitConfiguration, version string) (map[string]interface{}, error) {
chartVals := map[string]interface{}{}
err := utils.ValidateImageVariant(config.ImageVariant)
if err != nil {
@ -227,7 +277,7 @@ func chartValues(config InitConfiguration, version string) (map[string]interface
return chartVals, nil
}
func install(releaseName string, releaseVersion string, config InitConfiguration) error {
func install(releaseName, releaseVersion, helmRepo string, config InitConfiguration) error {
err := createNamespace(config.Namespace)
if err != nil {
return err
@ -238,7 +288,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
return err
}
daprChart, err := daprChart(releaseVersion, releaseName, helmConf)
daprChart, err := getHelmChart(releaseVersion, releaseName, helmRepo, helmConf)
if err != nil {
return err
}
@ -261,7 +311,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
installClient.Wait = config.Wait
installClient.Timeout = time.Duration(config.Timeout) * time.Second
values, err := chartValues(config, version)
values, err := daprChartValues(config, version)
if err != nil {
return err
}
@ -273,6 +323,38 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
return nil
}
func installThirdParty(releaseName, chartName, releaseVersion, helmRepo string, chartVals []string, config InitConfiguration) error {
helmConf, err := helmConfig(thirdPartyDevNamespace)
if err != nil {
return err
}
helmChart, err := getHelmChart(releaseVersion, chartName, helmRepo, helmConf)
if err != nil {
return err
}
installClient := helm.NewInstall(helmConf)
installClient.ReleaseName = releaseName
installClient.Namespace = thirdPartyDevNamespace
installClient.Wait = config.Wait
installClient.Timeout = time.Duration(config.Timeout) * time.Second
values := map[string]interface{}{}
for _, val := range chartVals {
if err = strvals.ParseInto(val, values); err != nil {
return err
}
}
if _, err = installClient.Run(helmChart, values); err != nil {
return err
}
return nil
}
func debugLogf(format string, v ...interface{}) {
}
@ -290,3 +372,163 @@ func confirmExist(cfg *helm.Configuration, releaseName string) (bool, error) {
return true, nil
}
func checkAndOverWriteFile(filePath string, b []byte) error {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
// #nosec G306
if err = os.WriteFile(filePath, b, 0o644); err != nil {
return err
}
}
return nil
}
func isComponentPresent(client versioned.Interface, namespace string, componentName string) (bool, error) {
c, err := getComponent(client, namespace, componentName)
if err != nil {
return false, err
}
return c.Name == componentName, err
}
func isConfigurationPresent(client versioned.Interface, namespace string, configurationName string) (bool, error) {
c, err := getDaprConfiguration(client, namespace, configurationName)
if err != nil {
return false, err
}
return c.Name == configurationName, nil
}
func initDevConfigs() error {
redisStatestore := `
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
# These settings will work out of the box if you use helm install
# bitnami/redis. If you have your own setup, replace
# redis-master:6379 with your own Redis master address, and the
# Redis password with your own Secret's name. For more information,
# see https://docs.dapr.io/operations/components/component-secrets .
- name: redisHost
value: dapr-dev-redis-master:6379
- name: redisPassword
secretKeyRef:
name: dapr-dev-redis
key: redis-password
auth:
secretStore: kubernetes
`
redisPubsub := `
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
# These settings will work out of the box if you use helm install
# bitnami/redis. If you have your own setup, replace
# redis-master:6379 with your own Redis master address, and the
# Redis password with your own Secret's name. For more information,
# see https://docs.dapr.io/operations/components/component-secrets .
- name: redisHost
value: dapr-dev-redis-master:6379
- name: redisPassword
secretKeyRef:
name: dapr-dev-redis
key: redis-password
auth:
secretStore: kubernetes
`
zipkinConfig := `
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: appconfig
spec:
tracing:
samplingRate: "1"
zipkin:
endpointAddress: "http://dapr-dev-zipkin.default.svc.cluster.local:9411/api/v2/spans"
`
tempDirPath, err := createTempDir()
defer os.RemoveAll(tempDirPath)
if err != nil {
return err
}
client, err := DaprClient()
if err != nil {
return err
}
present, err := isComponentPresent(client, thirdPartyDevNamespace, stateStoreComponentName)
if present || err != nil {
if err != nil {
print.WarningStatusEvent(os.Stderr, "Error listing components, skipping default dev component creation.")
} else {
print.WarningStatusEvent(os.Stderr, "Component with name %q already present in namespace %q. Skipping component creation.", stateStoreComponentName, thirdPartyDevNamespace)
}
return err
}
redisPath := filepath.Join(tempDirPath, "redis-statestore.yaml")
err = checkAndOverWriteFile(redisPath, []byte(redisStatestore))
if err != nil {
return err
}
print.InfoStatusEvent(os.Stdout, "Applying %q component to Kubernetes %q namespace.", stateStoreComponentName, thirdPartyDevNamespace)
_, err = utils.RunCmdAndWait("kubectl", "apply", "-f", redisPath)
if err != nil {
return err
}
present, err = isComponentPresent(client, thirdPartyDevNamespace, pubsubComponentName)
if present || err != nil {
if err != nil {
print.WarningStatusEvent(os.Stderr, "Error listing components, skipping default dev component creation.")
} else {
print.WarningStatusEvent(os.Stderr, "Component with name %q already present in namespace %q. Skipping component creation.", pubsubComponentName, thirdPartyDevNamespace)
}
return err
}
redisPath = filepath.Join(tempDirPath, "redis-pubsub.yaml")
err = checkAndOverWriteFile(redisPath, []byte(redisPubsub))
if err != nil {
return err
}
print.InfoStatusEvent(os.Stdout, "Applying %q component to Kubernetes %q namespace.", pubsubComponentName, thirdPartyDevNamespace)
_, err = utils.RunCmdAndWait("kubectl", "apply", "-f", redisPath)
if err != nil {
return err
}
present, err = isConfigurationPresent(client, thirdPartyDevNamespace, zipkingConfigurationName)
if present || err != nil {
if err != nil {
print.WarningStatusEvent(os.Stderr, "Error listing configurations, skipping default dev configuration creation.")
} else {
print.WarningStatusEvent(os.Stderr, "Configuration with name %q already present in namespace %q. Skipping configuration creation.", zipkingConfigurationName, thirdPartyDevNamespace)
}
return err
}
zipkinPath := filepath.Join(tempDirPath, "zipkin-config.yaml")
err = checkAndOverWriteFile(zipkinPath, []byte(zipkinConfig))
if err != nil {
return err
}
print.InfoStatusEvent(os.Stdout, "Applying %q zipkin configuration to Kubernetes %q namespace.", zipkingConfigurationName, thirdPartyDevNamespace)
_, err = utils.RunCmdAndWait("kubectl", "apply", "-f", zipkinPath)
if err != nil {
return err
}
return nil
}

View File

@ -14,17 +14,32 @@ limitations under the License.
package kubernetes
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/dapr/cli/pkg/print"
)
const (
daprdContainerName = "daprd"
appIDContainerArgName = "--app-id"
// number of retries when trying to list pods for getting logs.
maxListingRetry = 10
// delay between retries of pod listing.
listingDelay = 200 * time.Microsecond
// delay before retrying for getting logs.
streamingDelay = 100 * time.Millisecond
)
// Logs fetches Dapr sidecar logs from Kubernetes.
@ -84,3 +99,99 @@ func Logs(appID, podName, namespace string) error {
return nil
}
// streamContainerLogsToDisk streams all containers logs for the given selector to a given disk directory.
func streamContainerLogsToDisk(ctx context.Context, appID string, appLogWriter, daprdLogWriter io.Writer, podClient v1.PodInterface) error {
var err error
var podList *corev1.PodList
counter := 0
for {
podList, err = getPods(ctx, appID, podClient)
if err != nil {
return fmt.Errorf("error listing the pod with label %s=%s: %w", daprAppIDKey, appID, err)
}
if len(podList.Items) != 0 {
break
}
counter++
if counter == maxListingRetry {
return fmt.Errorf("error getting logs: error listing the pod with label %s=%s after %d retires", daprAppIDKey, appID, maxListingRetry)
}
// Retry after a delay.
time.Sleep(listingDelay)
}
for _, pod := range podList.Items {
print.InfoStatusEvent(os.Stdout, "Streaming logs for containers in pod %q", pod.GetName())
for _, container := range pod.Spec.Containers {
fileWriter := daprdLogWriter
if container.Name != daprdContainerName {
fileWriter = appLogWriter
}
// create a go routine for each container to stream logs into file/console.
go func(pod, containerName, appID string, fileWriter io.Writer) {
loop:
for {
req := podClient.GetLogs(pod, &corev1.PodLogOptions{
Container: containerName,
Follow: true,
})
stream, err := req.Stream(ctx)
if err != nil {
switch {
case strings.Contains(err.Error(), "Pending"):
// Retry after a delay.
time.Sleep(streamingDelay)
continue loop
case strings.Contains(err.Error(), "ContainerCreating"):
// Retry after a delay.
time.Sleep(streamingDelay)
continue loop
case errors.Is(err, context.Canceled):
return
default:
return
}
}
defer stream.Close()
if containerName != daprdContainerName {
streamScanner := bufio.NewScanner(stream)
for streamScanner.Scan() {
fmt.Fprintln(fileWriter, print.Blue(fmt.Sprintf("== APP - %s == %s", appID, streamScanner.Text())))
}
} else {
_, err = io.Copy(fileWriter, stream)
if err != nil {
switch {
case errors.Is(err, context.Canceled):
return
default:
return
}
}
}
return
}
}(pod.GetName(), container.Name, appID, fileWriter)
}
}
return nil
}
func getPods(ctx context.Context, appID string, podClient v1.PodInterface) (*corev1.PodList, error) {
listCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
labelSelector := fmt.Sprintf("%s=%s", daprAppIDKey, appID)
fmt.Println("Select", labelSelector)
podList, err := podClient.List(listCtx, metav1.ListOptions{
LabelSelector: labelSelector,
})
cancel()
if err != nil {
return nil, err
}
return podList, nil
}

View File

@ -15,24 +15,31 @@ package kubernetes
import (
"context"
"errors"
"fmt"
"strings"
core_v1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
k8s "k8s.io/client-go/kubernetes"
)
func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*core_v1.PodList, error) {
opts := v1.ListOptions{}
const podWatchErrTemplate = "error creating pod watcher"
var errPodUnknown error = errors.New("pod in unknown/failed state")
func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*corev1.PodList, error) {
opts := metav1.ListOptions{}
if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector)
}
return client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts)
return client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
}
func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*core_v1.PodList, error) {
opts := v1.ListOptions{}
func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*corev1.PodList, error) {
opts := metav1.ListOptions{}
if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector)
}
@ -41,8 +48,8 @@ func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]
// CheckPodExists returns a boolean representing the pod's existence and the namespace that the given pod resides in,
// or empty if not present in the given namespace.
func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[string]string, deployName string) (bool, string) {
opts := v1.ListOptions{}
func CheckPodExists(client k8s.Interface, namespace string, labelSelector map[string]string, deployName string) (bool, string) {
opts := metav1.ListOptions{}
if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector)
}
@ -53,7 +60,7 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s
}
for _, pod := range podList.Items {
if pod.Status.Phase == core_v1.PodRunning {
if pod.Status.Phase == corev1.PodRunning {
if strings.HasPrefix(pod.Name, deployName) {
return true, pod.Namespace
}
@ -61,3 +68,61 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s
}
return false, ""
}
func createPodWatcher(ctx context.Context, client k8s.Interface, namespace, appID string) (watch.Interface, error) {
labelSelector := fmt.Sprintf("%s=%s", daprAppIDKey, appID)
opts := metav1.ListOptions{
TypeMeta: metav1.TypeMeta{},
LabelSelector: labelSelector,
}
return client.CoreV1().Pods(namespace).Watch(ctx, opts)
}
func waitPodDeleted(ctx context.Context, client k8s.Interface, namespace, appID string) error {
watcher, err := createPodWatcher(ctx, client, namespace, appID)
if err != nil {
return fmt.Errorf("%s : %w", podWatchErrTemplate, err)
}
defer watcher.Stop()
for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Deleted {
return nil
}
case <-ctx.Done():
return fmt.Errorf("error context cancelled while waiting for pod deletion: %w", context.Canceled)
}
}
}
func waitPodRunning(ctx context.Context, client k8s.Interface, namespace, appID string) error {
watcher, err := createPodWatcher(ctx, client, namespace, appID)
if err != nil {
return fmt.Errorf("%s : %w", podWatchErrTemplate, err)
}
defer watcher.Stop()
for {
select {
case event := <-watcher.ResultChan():
pod := event.Object.(*corev1.Pod)
if pod.Status.Phase == corev1.PodRunning {
return nil
} else if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodUnknown {
return fmt.Errorf("error waiting for pod run: %w", errPodUnknown)
}
case <-ctx.Done():
return fmt.Errorf("error context cancelled while waiting for pod run: %w", context.Canceled)
}
}
}

View File

@ -112,7 +112,8 @@ func renewCertificate(rootCert, issuerCert, issuerKey []byte, timeout uint, imag
return err
}
daprChart, err := daprChart(daprVersion, "dapr", helmConf)
helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
daprChart, err := getHelmChart(daprVersion, "dapr", helmRepo, helmConf)
if err != nil {
return err
}

View File

@ -13,24 +13,416 @@ limitations under the License.
package kubernetes
// RunConfig represents the application configuration parameters.
type RunConfig struct {
AppID string
AppPort int
HTTPPort int
GRPCPort int
CodeDirectory string
Arguments []string
Image string
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
appV1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
k8s "k8s.io/client-go/kubernetes"
podsv1 "k8s.io/client-go/kubernetes/typed/core/v1"
// Specifically use k8s sig yaml to marshal into json, then convert to yaml.
k8sYaml "sigs.k8s.io/yaml"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/runfileconfig"
daprsyscall "github.com/dapr/cli/pkg/syscall"
"github.com/dapr/cli/utils"
)
const (
serviceKind = "Service"
deploymentKind = "Deployment"
serviceAPIVersion = "v1"
deploymentAPIVersion = "apps/v1"
loadBalanceType = "LoadBalancer"
daprEnableAnnotationKey = "dapr.io/enabled"
serviceFileName = "service.yaml"
deploymentFileName = "deployment.yaml"
appLabelKey = "app"
nameKey = "name"
labelsKey = "labels"
tcpProtocol = "TCP"
podCreationDeletionTimeout = 1 * time.Minute
)
type deploymentConfig struct {
Kind string `json:"kind"`
APIVersion string `json:"apiVersion"`
Metadata map[string]any `json:"metadata"`
Spec appV1.DeploymentSpec `json:"spec"`
}
// RunOutput represents the run output.
type RunOutput struct {
Message string
type serviceConfig struct {
Kind string `json:"kind"`
APIVersion string `json:"apiVersion"`
Metadata map[string]any `json:"metadata"`
Spec corev1.ServiceSpec `json:"spec"`
}
// Run executes the application based on the run configuration.
func Run(config *RunConfig) (*RunOutput, error) {
//nolint
return nil, nil
type runState struct {
serviceFilePath string
deploymentFilePath string
app runfileconfig.App
logCancel context.CancelFunc
}
// Run executes the application based on the run file configuration.
// Run creates a temporary `deploy` folder within the app/.dapr directory and then applies that to the context pointed to
// kubectl client.
func Run(runFilePath string, config runfileconfig.RunFileConfig) (bool, error) {
// At this point, we expect the runfile to be parsed and the values within config
// Validations and default setting will only be done after this point.
var exitWithError bool
// get k8s client PodsInterface.
client, cErr := Client()
if cErr != nil {
// exit with error.
return true, fmt.Errorf("error getting k8s client: %w", cErr)
}
namespace := corev1.NamespaceDefault
podsInterface := client.CoreV1().Pods(namespace)
// setup a monitoring context for shutdown call from another cli process.
monitoringContext, monitoringCancel := context.WithCancel(context.Background())
defer monitoringCancel()
// setup shutdown notify channel.
sigCh := make(chan os.Signal, 1)
daprsyscall.SetupShutdownNotify(sigCh)
runStates := []runState{}
for _, app := range config.Apps {
print.StatusEvent(os.Stdout, print.LogInfo, "Validating config and starting app %q", app.RunConfig.AppID)
// Set defaults if zero value provided in config yaml.
app.RunConfig.SetDefaultFromSchema()
// Validate validates the configs for k8s and modifies appId etc.
err := app.RunConfig.ValidateK8s()
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error validating run config for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error())
exitWithError = true
break
}
var svc serviceConfig
// create default service config.
if app.ContainerConfiguration.CreateService {
svc = createServiceConfig(app)
}
// create default deployment config.
dep := createDeploymentConfig(app)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error creating deployment file for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error())
exitWithError = true
break
}
// overwrite <app-id>/.dapr/deploy/service.yaml.
// overwrite <app-id>/.dapr/deploy/deployment.yaml.
err = writeYamlFile(app, svc, dep)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error creating deployment/service yaml files: %s", err.Error())
exitWithError = true
break
}
deployDir := app.GetDeployDir()
print.InfoStatusEvent(os.Stdout, "Deploying app %q to Kubernetes", app.AppID)
serviceFilePath := filepath.Join(deployDir, serviceFileName)
deploymentFilePath := filepath.Join(deployDir, deploymentFileName)
rState := runState{}
if app.CreateService {
print.InfoStatusEvent(os.Stdout, "Deploying service YAML %q to Kubernetes", serviceFilePath)
err = deployYamlToK8s(serviceFilePath)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error deploying service yaml file %q : %s", serviceFilePath, err.Error())
exitWithError = true
break
}
rState.serviceFilePath = serviceFilePath
}
print.InfoStatusEvent(os.Stdout, "Deploying deployment YAML %q to Kubernetes", deploymentFilePath)
err = deployYamlToK8s(deploymentFilePath)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error deploying deployment yaml file %q : %s", deploymentFilePath, err.Error())
exitWithError = true
break
}
// create log files and save state.
err = app.CreateDaprdLogFile()
if err != nil {
print.StatusEvent(os.Stderr, print.LogFailure, "Error getting daprd log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error())
exitWithError = true
break
}
err = app.CreateAppLogFile()
if err != nil {
print.StatusEvent(os.Stderr, print.LogFailure, "Error getting app log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error())
exitWithError = true
break
}
daprdLogWriter := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination)
// appDaprdWriter := runExec.GetAppDaprdWriter(app, false).
appLogWriter := runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination)
customAppLogWriter := print.CustomLogWriter{W: appLogWriter}
ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
err = waitPodRunning(ctx, client, namespace, app.AppID)
cancel()
if err != nil {
print.WarningStatusEvent(os.Stderr, "Error deploying pod to Kubernetes. See logs directly from Kubernetes command line.")
// Close the log files since there is deployment error, and the container might be in crash loop back off state.
app.CloseAppLogFile()
app.CloseDaprdLogFile()
} else {
logContext, cancel := context.WithCancel(context.Background())
rState.logCancel = cancel
err = setupLogs(logContext, app.AppID, daprdLogWriter, customAppLogWriter, podsInterface)
if err != nil {
print.StatusEvent(os.Stderr, print.LogWarning, "Error setting up logs for app %q present in %q . See logs directly from Kubernetes command line.: %s ", app.AppID, runFilePath, err.Error())
}
}
rState.deploymentFilePath = deploymentFilePath
rState.app = app
// append runSate only on successful k8s deploy.
runStates = append(runStates, rState)
print.InfoStatusEvent(os.Stdout, "Writing log files to directory : %s", app.GetLogsDir())
}
// If all apps have been started and there are no errors in starting the apps wait for signal from sigCh.
if !exitWithError {
print.InfoStatusEvent(os.Stdout, "Starting to monitor Kubernetes pods for deletion.")
go monitorK8sPods(monitoringContext, client, namespace, runStates, sigCh)
// After all apps started wait for sigCh.
<-sigCh
monitoringCancel()
print.InfoStatusEvent(os.Stdout, "Stopping Kubernetes pods monitoring.")
// To add a new line in Stdout.
fmt.Println()
print.InfoStatusEvent(os.Stdout, "Received signal to stop. Deleting K8s Dapr app deployments.")
}
closeErr := gracefullyShutdownK8sDeployment(runStates, client, namespace)
return exitWithError, closeErr
}
func createServiceConfig(app runfileconfig.App) serviceConfig {
return serviceConfig{
Kind: serviceKind,
APIVersion: serviceAPIVersion,
Metadata: map[string]any{
nameKey: app.RunConfig.AppID,
labelsKey: map[string]string{
appLabelKey: app.AppID,
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Protocol: tcpProtocol,
Port: 80,
TargetPort: intstr.FromInt(app.AppPort),
},
},
Selector: map[string]string{
appLabelKey: app.AppID,
},
Type: loadBalanceType,
},
}
}
func createDeploymentConfig(app runfileconfig.App) deploymentConfig {
replicas := int32(1)
dep := deploymentConfig{
Kind: deploymentKind,
APIVersion: deploymentAPIVersion,
Metadata: map[string]any{
nameKey: app.AppID,
},
}
dep.Spec = appV1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
appLabelKey: app.AppID,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
appLabelKey: app.AppID,
},
Annotations: app.RunConfig.GetAnnotations(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: app.AppID,
Image: app.ContainerImage,
Env: getEnv(app),
ImagePullPolicy: corev1.PullAlways,
},
},
},
},
}
// Set dapr.io/enable annotation.
dep.Spec.Template.ObjectMeta.Annotations[daprEnableAnnotationKey] = "true"
// set containerPort only if app port is present.
if app.AppPort != 0 {
dep.Spec.Template.Spec.Containers[0].Ports = []corev1.ContainerPort{
{
ContainerPort: int32(app.AppPort),
},
}
}
return dep
}
func getEnv(app runfileconfig.App) []corev1.EnvVar {
envs := app.GetEnv()
envVars := make([]corev1.EnvVar, len(envs))
i := 0
for k, v := range app.GetEnv() {
envVars[i] = corev1.EnvVar{
Name: k,
Value: v,
}
i++
}
return envVars
}
func writeYamlFile(app runfileconfig.App, svc serviceConfig, dep deploymentConfig) error {
var yamlBytes []byte
var err error
var writeFile io.WriteCloser
deployDir := app.GetDeployDir()
if app.CreateService {
yamlBytes, err = k8sYaml.Marshal(svc)
if err != nil {
return fmt.Errorf("error marshalling service yaml: %w", err)
}
serviceFilePath := filepath.Join(deployDir, serviceFileName)
writeFile, err = os.Create(serviceFilePath)
if err != nil {
return fmt.Errorf("error creating file %s : %w", serviceFilePath, err)
}
_, err = writeFile.Write(yamlBytes)
if err != nil {
writeFile.Close()
return fmt.Errorf("error writing to file %s : %w", serviceFilePath, err)
}
writeFile.Close()
}
yamlBytes, err = k8sYaml.Marshal(dep)
if err != nil {
return fmt.Errorf("error marshalling deployment yaml: %w", err)
}
deploymentFilePath := filepath.Join(deployDir, deploymentFileName)
writeFile, err = os.Create(deploymentFilePath)
if err != nil {
return fmt.Errorf("error creating file %s : %w", deploymentFilePath, err)
}
_, err = writeFile.Write(yamlBytes)
if err != nil {
writeFile.Close()
return fmt.Errorf("error writing to file %s : %w", deploymentFilePath, err)
}
writeFile.Close()
return nil
}
func deployYamlToK8s(yamlToDeployPath string) error {
_, err := utils.RunCmdAndWait("kubectl", "apply", "-f", yamlToDeployPath)
if err != nil {
return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeployPath, err)
}
return nil
}
func deleteYamlK8s(yamlToDeletePath string) error {
print.InfoStatusEvent(os.Stdout, "Deleting %q from Kubernetes", yamlToDeletePath)
_, err := utils.RunCmdAndWait("kubectl", "delete", "-f", yamlToDeletePath)
if err != nil {
return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeletePath, err)
}
return nil
}
func setupLogs(ctx context.Context, appID string, daprdLogWriter, appLogWriter io.Writer, podInterface podsv1.PodInterface) error {
return streamContainerLogsToDisk(ctx, appID, appLogWriter, daprdLogWriter, podInterface)
}
func gracefullyShutdownK8sDeployment(runStates []runState, client k8s.Interface, namespace string) error {
errs := make([]error, 0, len(runStates)*4)
for _, r := range runStates {
if len(r.serviceFilePath) != 0 {
errs = append(errs, deleteYamlK8s(r.serviceFilePath))
}
errs = append(errs, deleteYamlK8s(r.deploymentFilePath))
labelSelector := map[string]string{
daprAppIDKey: r.app.AppID,
}
if ok, _ := CheckPodExists(client, namespace, labelSelector, r.app.AppID); ok {
ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
err := waitPodDeleted(ctx, client, namespace, r.app.AppID)
cancel()
if err != nil {
// swallowing err here intentionally.
print.WarningStatusEvent(os.Stderr, "Error waiting for pods to be deleted. Final logs might only be partially available.")
}
}
// shutdown logs.
r.logCancel()
errs = append(errs, r.app.CloseAppLogFile(), r.app.CloseDaprdLogFile())
}
return errors.Join(errs...)
}
func monitorK8sPods(ctx context.Context, client k8s.Interface, namespace string, runStates []runState, sigCh chan os.Signal) {
// for each app wait for pod to be deleted, if all pods are deleted, then send shutdown signal to the cli process.
wg := sync.WaitGroup{}
for _, r := range runStates {
go func(appID string, wg *sync.WaitGroup) {
err := waitPodDeleted(ctx, client, namespace, r.app.AppID)
if err != nil && strings.Contains(err.Error(), podWatchErrTemplate) {
print.WarningStatusEvent(os.Stderr, "Error monitoring Kubernetes pod(s) for app %q.", appID)
}
wg.Done()
}(r.app.AppID, &wg)
wg.Add(1)
}
wg.Wait()
// Send signal to gracefully close log writers and shut down process.
sigCh <- syscall.SIGINT
}

51
pkg/kubernetes/stop.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetes
import (
"context"
"errors"
"fmt"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"github.com/dapr/cli/pkg/runfileconfig"
)
func Stop(runFilePath string, config runfileconfig.RunFileConfig) error {
errs := []error{}
// get k8s client.
client, cErr := Client()
if cErr != nil {
return fmt.Errorf("error getting k8s client for monitoring pod deletion: %w", cErr)
}
namespace := corev1.NamespaceDefault
for _, app := range config.Apps {
deployDir := app.GetDeployDir()
serviceFilePath := filepath.Join(deployDir, serviceFileName)
deploymentFilePath := filepath.Join(deployDir, deploymentFileName)
if app.CreateService {
errs = append(errs, deleteYamlK8s(serviceFilePath))
}
errs = append(errs, deleteYamlK8s(deploymentFilePath))
ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
// Ignoring errors here as it will anyway be printed in the other dapr cli process.
waitPodDeleted(ctx, client, namespace, app.AppID)
cancel()
}
return errors.Join(errs...)
}

View File

@ -0,0 +1,11 @@
version: 1
common:
apps:
- appDirPath: ./nodeapp/
appPort: 3000
containerImage: ghcr.io/dapr/samples/hello-k8s-node:latest
createService: true
env:
APP_PORT: 3000
- appDirPath: ./pythonapp/
containerImage: ghcr.io/dapr/samples/hello-k8s-python:latest

View File

@ -24,7 +24,7 @@ import (
)
// Uninstall removes Dapr from a Kubernetes cluster.
func Uninstall(namespace string, uninstallAll bool, timeout uint) error {
func Uninstall(namespace string, uninstallAll bool, uninstallDev bool, timeout uint) error {
config, err := helmConfig(namespace)
if err != nil {
return err
@ -48,6 +48,11 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error {
// Deleting Dashboard here is for versions >= 1.11.
uninstallClient.Run(dashboardReleaseName)
if uninstallDev {
// uninstall dapr-dev-zipkin and dapr-dev-redis as best effort.
uninstallThirdParty()
}
_, err = uninstallClient.Run(daprReleaseName)
if err != nil {
@ -65,3 +70,14 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error {
return nil
}
func uninstallThirdParty() {
print.InfoStatusEvent(os.Stdout, "Removing dapr-dev-redis and dapr-dev-zipkin from the cluster...")
// Uninstall dapr-dev-redis and dapr-dev-zipkin from k8s as best effort.
config, _ := helmConfig(thirdPartyDevNamespace)
uninstallClient := helm.NewUninstall(config)
uninstallClient.Run(redisReleaseName)
uninstallClient.Run(zipkinReleaseName)
}

View File

@ -57,6 +57,7 @@ type UpgradeConfig struct {
}
func Upgrade(conf UpgradeConfig) error {
helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
status, err := GetDaprResourcesStatus()
if err != nil {
return err
@ -75,7 +76,7 @@ func Upgrade(conf UpgradeConfig) error {
return err
}
controlPlaneChart, err := daprChart(conf.RuntimeVersion, "dapr", helmConf)
controlPlaneChart, err := getHelmChart(conf.RuntimeVersion, "dapr", helmRepo, helmConf)
if err != nil {
return err
}
@ -109,7 +110,7 @@ func Upgrade(conf UpgradeConfig) error {
var dashboardChart *chart.Chart
if conf.DashboardVersion != "" {
dashboardChart, err = daprChart(conf.DashboardVersion, dashboardReleaseName, helmConf)
dashboardChart, err = getHelmChart(conf.DashboardVersion, dashboardReleaseName, helmRepo, helmConf)
if err != nil {
return err
}
@ -176,7 +177,7 @@ func Upgrade(conf UpgradeConfig) error {
}
} else {
// We need to install Dashboard since it does not exist yet.
err = install(dashboardReleaseName, conf.DashboardVersion, InitConfiguration{
err = install(dashboardReleaseName, conf.DashboardVersion, helmRepo, InitConfiguration{
DashboardVersion: conf.DashboardVersion,
Namespace: upgradeClient.Namespace,
Wait: upgradeClient.Wait,

View File

@ -16,8 +16,10 @@ package runexec
import (
"fmt"
"io"
"os"
"os/exec"
"github.com/dapr/cli/pkg/runfileconfig"
"github.com/dapr/cli/pkg/standalone"
)
@ -129,3 +131,26 @@ func NewOutput(config *standalone.RunConfig) (*RunOutput, error) {
DaprGRPCPort: config.GRPCPort,
}, nil
}
// GetAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout.
func GetAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer {
var appDaprdWriter io.Writer
if isAppCommandEmpty {
if app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser)
} else {
appDaprdWriter = os.Stdout
}
} else {
if app.AppLogDestination != standalone.Console && app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout)
} else if app.AppLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, os.Stdout)
} else if app.DaprdLogDestination != standalone.Console {
appDaprdWriter = io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout)
} else {
appDaprdWriter = os.Stdout
}
}
return appDaprdWriter
}

View File

@ -27,6 +27,7 @@ const (
daprdLogFileNamePrefix = "daprd"
logFileExtension = ".log"
logsDir = "logs"
deployDir = "deploy"
)
// RunFileConfig represents the complete configuration options for the run file.
@ -38,14 +39,21 @@ type RunFileConfig struct {
Name string `yaml:"name,omitempty"`
}
// ContainerConfiguration represents the application container configuration parameters.
type ContainerConfiguration struct {
ContainerImage string `yaml:"containerImage"`
CreateService bool `yaml:"createService"`
}
// App represents the configuration options for the apps in the run file.
type App struct {
standalone.RunConfig `yaml:",inline"`
AppDirPath string `yaml:"appDirPath"`
AppLogFileName string
DaprdLogFileName string
AppLogWriteCloser io.WriteCloser
DaprdLogWriteCloser io.WriteCloser
standalone.RunConfig `yaml:",inline"`
ContainerConfiguration `yaml:",inline"`
AppDirPath string `yaml:"appDirPath"`
AppLogFileName string
DaprdLogFileName string
AppLogWriteCloser io.WriteCloser
DaprdLogWriteCloser io.WriteCloser
}
// Common represents the configuration options for the common section in the run file.
@ -59,6 +67,12 @@ func (a *App) GetLogsDir() string {
return logsPath
}
func (a *App) GetDeployDir() string {
logsPath := filepath.Join(a.AppDirPath, standalone.DefaultDaprDirName, deployDir)
os.MkdirAll(logsPath, 0o755)
return logsPath
}
// CreateAppLogFile creates the log file, sets internal file handle
// and returns error if any.
func (a *App) CreateAppLogFile() error {
@ -104,14 +118,32 @@ func (a *App) createLogFile(logType string) (*os.File, error) {
func (a *App) CloseAppLogFile() error {
if a.AppLogWriteCloser != nil {
return a.AppLogWriteCloser.Close()
err := a.AppLogWriteCloser.Close()
a.AppLogWriteCloser = nil
return err
}
return nil
}
func (a *App) CloseDaprdLogFile() error {
if a.DaprdLogWriteCloser != nil {
return a.DaprdLogWriteCloser.Close()
err := a.DaprdLogWriteCloser.Close()
a.DaprdLogWriteCloser = nil
return err
}
return nil
}
// GetLogWriter returns the log writer based on the log destination.
func GetLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer {
var logWriter io.Writer
switch logDestination {
case standalone.Console:
logWriter = os.Stdout
case standalone.File:
logWriter = fileLogWriterCloser
case standalone.FileAndConsole:
logWriter = io.MultiWriter(os.Stdout, fileLogWriterCloser)
}
return logWriter
}

View File

@ -25,13 +25,13 @@ import (
)
var (
validRunFilePath = filepath.Join("..", "testdata", "runfileconfig", "test_run_config.yaml")
invalidRunFilePath1 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_invalid_path.yaml")
invalidRunFilePath2 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_empty_app_dir.yaml")
runFileForPrecedenceRule = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule.yaml")
runFileForPrecedenceRuleDaprDir = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule_dapr_dir.yaml")
runFileForLogDestination = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_log_destination.yaml")
runFileForMultiResourcePaths = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_multiple_resources_paths.yaml")
validRunFilePath = filepath.Join(".", "testdata", "test_run_config.yaml")
invalidRunFilePath1 = filepath.Join(".", "testdata", "test_run_config_invalid_path.yaml")
invalidRunFilePath2 = filepath.Join(".", "testdata", "test_run_config_empty_app_dir.yaml")
runFileForPrecedenceRule = filepath.Join(".", "testdata", "test_run_config_precedence_rule.yaml")
runFileForPrecedenceRuleDaprDir = filepath.Join(".", "testdata", "test_run_config_precedence_rule_dapr_dir.yaml")
runFileForLogDestination = filepath.Join(".", "testdata", "test_run_config_log_destination.yaml")
runFileForMultiResourcePaths = filepath.Join(".", "testdata", "test_run_config_multiple_resources_paths.yaml")
)
func TestRunConfigFile(t *testing.T) {

View File

View File

View File

@ -47,39 +47,46 @@ const (
// RunConfig represents the application configuration parameters.
type RunConfig struct {
SharedRunConfig `yaml:",inline"`
AppID string `env:"APP_ID" arg:"app-id" yaml:"appID"`
AppID string `env:"APP_ID" arg:"app-id" annotation:"dapr.io/app-id" yaml:"appID"`
AppChannelAddress string `env:"APP_CHANNEL_ADDRESS" arg:"app-channel-address" ifneq:"127.0.0.1" yaml:"appChannelAddress"`
AppPort int `env:"APP_PORT" arg:"app-port" yaml:"appPort" default:"-1"`
AppPort int `env:"APP_PORT" arg:"app-port" annotation:"dapr.io/app-port" yaml:"appPort" default:"-1"`
HTTPPort int `env:"DAPR_HTTP_PORT" arg:"dapr-http-port" yaml:"daprHTTPPort" default:"-1"`
GRPCPort int `env:"DAPR_GRPC_PORT" arg:"dapr-grpc-port" yaml:"daprGRPCPort" default:"-1"`
ProfilePort int `arg:"profile-port" yaml:"profilePort" default:"-1"`
Command []string `yaml:"command"`
MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" yaml:"metricsPort" default:"-1"`
UnixDomainSocket string `arg:"unix-domain-socket" yaml:"unixDomainSocket"`
MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" annotation:"dapr.io/metrics-port" yaml:"metricsPort" default:"-1"`
UnixDomainSocket string `arg:"unix-domain-socket" annotation:"dapr.io/unix-domain-socket-path" yaml:"unixDomainSocket"`
InternalGRPCPort int `arg:"dapr-internal-grpc-port" yaml:"daprInternalGRPCPort" default:"-1"`
}
// SharedRunConfig represents the application configuration parameters, which can be shared across many apps.
type SharedRunConfig struct {
ConfigFile string `arg:"config" yaml:"configFilePath"`
AppProtocol string `arg:"app-protocol" yaml:"appProtocol" default:"http"`
APIListenAddresses string `arg:"dapr-listen-addresses" yaml:"apiListenAddresses"`
EnableProfiling bool `arg:"enable-profiling" yaml:"enableProfiling"`
LogLevel string `arg:"log-level" yaml:"logLevel"`
MaxConcurrency int `arg:"app-max-concurrency" yaml:"appMaxConcurrency" default:"-1"`
PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"`
ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead.
ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead.
ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"`
AppSSL bool `arg:"app-ssl" yaml:"appSSL"`
MaxRequestBodySize int `arg:"dapr-http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"`
HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"`
EnableAppHealth bool `arg:"enable-app-health-check" yaml:"enableAppHealthCheck"`
AppHealthPath string `arg:"app-health-check-path" yaml:"appHealthCheckPath"`
AppHealthInterval int `arg:"app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"`
AppHealthTimeout int `arg:"app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"`
AppHealthThreshold int `arg:"app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"`
EnableAPILogging bool `arg:"enable-api-logging" yaml:"enableApiLogging"`
// Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324
ConfigFile string `arg:"config" yaml:"configFilePath"`
AppProtocol string `arg:"app-protocol" annotation:"dapr.io/app-protocol" yaml:"appProtocol" default:"http"`
APIListenAddresses string `arg:"dapr-listen-addresses" annotation:"dapr.io/sidecar-listen-address" yaml:"apiListenAddresses"`
EnableProfiling bool `arg:"enable-profiling" annotation:"dapr.io/enable-profiling" yaml:"enableProfiling"`
LogLevel string `arg:"log-level" annotation:"dapr.io.log-level" yaml:"logLevel"`
MaxConcurrency int `arg:"app-max-concurrency" annotation:"dapr.io/app-max-concurrerncy" yaml:"appMaxConcurrency" default:"-1"`
// Speicifcally omitted from annotations similar to config file path above.
PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"`
// Speicifcally omitted from annotations similar to config file path above.
ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead.
// Speicifcally omitted from annotations similar to config file path above.
ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead.
// Speicifcally omitted from annotations similar to config file path above.
ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"`
// Speicifcally omitted from annotations as appSSL is deprecated.
AppSSL bool `arg:"app-ssl" yaml:"appSSL"`
MaxRequestBodySize int `arg:"dapr-http-max-request-size" annotation:"dapr.io/http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"`
HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" annotation:"dapr.io/http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"`
EnableAppHealth bool `arg:"enable-app-health-check" annotation:"dapr.io/enable-app-health-check" yaml:"enableAppHealthCheck"`
AppHealthPath string `arg:"app-health-check-path" annotation:"dapr.io/app-health-check-path" yaml:"appHealthCheckPath"`
AppHealthInterval int `arg:"app-health-probe-interval" annotation:"dapr.io/app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"`
AppHealthTimeout int `arg:"app-health-probe-timeout" annotation:"dapr.io/app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"`
AppHealthThreshold int `arg:"app-health-threshold" annotation:"dapr.io/app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"`
EnableAPILogging bool `arg:"enable-api-logging" annotation:"dapr.io/enable-api-logging" yaml:"enableApiLogging"`
// Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 .
DaprdInstallPath string `yaml:"runtimePath"`
Env map[string]string `yaml:"env"`
DaprdLogDestination LogDestType `yaml:"daprdLogDestination"`
@ -213,6 +220,36 @@ func (config *RunConfig) Validate() error {
return nil
}
func (config *RunConfig) ValidateK8s() error {
meta, err := newDaprMeta()
if err != nil {
return err
}
if config.AppID == "" {
config.AppID = meta.newAppID()
}
if config.AppPort < 0 {
config.AppPort = 0
}
err = config.validatePort("MetricsPort", &config.MetricsPort, meta)
if err != nil {
return err
}
if config.MaxConcurrency < 1 {
config.MaxConcurrency = -1
}
if config.MaxRequestBodySize < 0 {
config.MaxRequestBodySize = -1
}
if config.HTTPReadBufferSize < 0 {
config.HTTPReadBufferSize = -1
}
return nil
}
type DaprMeta struct {
ExistingIDs map[string]bool
ExistingPorts map[int]bool
@ -409,6 +446,50 @@ func (config *RunConfig) getAppProtocol() string {
}
}
func (config *RunConfig) GetEnv() map[string]string {
env := map[string]string{}
schema := reflect.ValueOf(*config)
for i := 0; i < schema.NumField(); i++ {
valueField := schema.Field(i).Interface()
typeField := schema.Type().Field(i)
key := typeField.Tag.Get("env")
if len(key) == 0 {
continue
}
if value, ok := valueField.(int); ok && value <= 0 {
// ignore unset numeric variables.
continue
}
value := fmt.Sprintf("%v", reflect.ValueOf(valueField))
env[key] = value
}
for k, v := range config.Env {
env[k] = v
}
return env
}
func (config *RunConfig) GetAnnotations() map[string]string {
annotations := map[string]string{}
schema := reflect.ValueOf(*config)
for i := 0; i < schema.NumField(); i++ {
valueField := schema.Field(i).Interface()
typeField := schema.Type().Field(i)
key := typeField.Tag.Get("annotation")
if len(key) == 0 {
continue
}
if value, ok := valueField.(int); ok && value <= 0 {
// ignore unset numeric variables.
continue
}
value := fmt.Sprintf("%v", reflect.ValueOf(valueField))
annotations[key] = value
}
return annotations
}
func GetDaprCommand(config *RunConfig) (*exec.Cmd, error) {
daprCMD, err := lookupBinaryFilePath(config.DaprdInstallPath, "daprd")
if err != nil {

View File

@ -48,6 +48,10 @@ const (
numHAPods = 13
numNonHAPods = 5
thirdPartyDevNamespace = "default"
devRedisReleaseName = "dapr-dev-redis"
devZipkinReleaseName = "dapr-dev-zipkin"
)
type VersionDetails struct {
@ -61,6 +65,7 @@ type VersionDetails struct {
}
type TestOptions struct {
DevEnabled bool
HAEnabled bool
MTLSEnabled bool
ApplyComponentChanges bool
@ -189,12 +194,12 @@ func GetTestsOnInstall(details VersionDetails, opts TestOptions) []TestCase {
func GetTestsOnUninstall(details VersionDetails, opts TestOptions) []TestCase {
return []TestCase{
{"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll)}, // waits for pod deletion.
{"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll, opts.DevEnabled)}, // waits for pod deletion.
{"cluster not exist", kubernetesTestOnUninstall()},
{"crds exist on uninstall " + details.RuntimeVersion, CRDTest(details, opts)},
{"clusterroles not exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)},
{"clusterrolebindings not exist " + details.RuntimeVersion, ClusterRoleBindingsTest(details, opts)},
{"check components exist on uninstall " + details.RuntimeVersion, componentsTestOnUninstall(opts.UninstallAll)},
{"check components exist on uninstall " + details.RuntimeVersion, componentsTestOnUninstall(opts)},
{"check httpendpoints exist on uninstall " + details.RuntimeVersion, httpEndpointsTestOnUninstall(opts)},
{"check mtls error " + details.RuntimeVersion, uninstallMTLSTest()},
{"check status error " + details.RuntimeVersion, statusTestOnUninstall()},
@ -293,13 +298,19 @@ func ComponentsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
output, err = spawn.Command("kubectl", "apply", "-f", "../testdata/statestore.yaml")
t.Log(output)
require.NoError(t, err, "expected no error on kubectl apply")
require.Equal(t, "component.dapr.io/statestore created\ncomponent.dapr.io/statestore created\n", output, "expceted output to match")
// if Dev install, statestore in default namespace will already be created as part of dev install once, so the above command output will be
// changed to statestore configured for the default namespace statestore.
if opts.DevEnabled {
require.Equal(t, "component.dapr.io/statestore configured\ncomponent.dapr.io/statestore created\n", output, "expceted output to match")
} else {
require.Equal(t, "component.dapr.io/statestore created\ncomponent.dapr.io/statestore created\n", output, "expceted output to match")
}
}
t.Log("check applied component exists")
output, err := spawn.Command(daprPath, "components", "-k")
require.NoError(t, err, "expected no error on calling dapr components")
componentOutputCheck(t, output, false)
componentOutputCheck(t, opts, output)
}
}
@ -747,6 +758,10 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
"-n", DaprTestNamespace,
"--log-as-json",
}
if opts.DevEnabled {
t.Log("install dev mode")
args = append(args, "--dev")
}
if !details.UseDaprLatestVersion {
// TODO: Pass dashboard-version also when charts are released.
args = append(args, "--runtime-version", details.RuntimeVersion)
@ -776,10 +791,13 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
require.NoError(t, err, "init failed")
validatePodsOnInstallUpgrade(t, details)
if opts.DevEnabled {
validateThirdpartyPodsOnInit(t)
}
}
}
func uninstallTest(all bool) func(t *testing.T) {
func uninstallTest(all bool, devEnabled bool) func(t *testing.T) {
return func(t *testing.T) {
output, err := EnsureUninstall(all)
t.Log(output)
@ -792,11 +810,23 @@ func uninstallTest(all bool) func(t *testing.T) {
go waitPodDeletion(t, done, podsDeleted)
select {
case <-podsDeleted:
t.Log("pods were deleted as expected on uninstall")
return
t.Log("dapr pods were deleted as expected on uninstall")
case <-time.After(2 * time.Minute):
done <- struct{}{}
t.Error("timeout verifying pods were deleted as expectedx")
t.Error("timeout verifying pods were deleted as expected")
return
}
if devEnabled {
t.Log("waiting for dapr dev pods to be deleted")
go waitPodDeletionDev(t, done, podsDeleted)
select {
case <-podsDeleted:
t.Log("dapr dev pods were deleted as expected on uninstall dev")
return
case <-time.After(2 * time.Minute):
done <- struct{}{}
t.Error("timeout verifying pods were deleted as expected")
}
}
}
}
@ -823,7 +853,7 @@ func uninstallMTLSTest() func(t *testing.T) {
}
}
func componentsTestOnUninstall(all bool) func(t *testing.T) {
func componentsTestOnUninstall(opts TestOptions) func(t *testing.T) {
return func(t *testing.T) {
daprPath := GetDaprPath()
// On Dapr uninstall CRDs are not removed, consequently the components will not be removed.
@ -831,10 +861,10 @@ func componentsTestOnUninstall(all bool) func(t *testing.T) {
// For now the components remain.
output, err := spawn.Command(daprPath, "components", "-k")
require.NoError(t, err, "expected no error on calling dapr components")
componentOutputCheck(t, output, all)
componentOutputCheck(t, opts, output)
// If --all, then the below does not need to run.
if all {
if opts.UninstallAll {
output, err = spawn.Command("kubectl", "delete", "-f", "../testdata/namespace.yaml")
require.NoError(t, err, "expected no error on kubectl delete")
t.Log(output)
@ -898,29 +928,37 @@ func statusTestOnUninstall() func(t *testing.T) {
}
}
func componentOutputCheck(t *testing.T, output string, all bool) {
func componentOutputCheck(t *testing.T, opts TestOptions, output string) {
output = strings.TrimSpace(output) // remove empty string.
lines := strings.Split(output, "\n")
for i, line := range lines {
t.Logf("num:%d line:%+v", i, line)
}
if all {
if opts.UninstallAll {
assert.Equal(t, 2, len(lines), "expected at 0 components and 2 output lines")
return
}
lines = strings.Split(output, "\n")[2:] // remove header and warning message.
assert.Equal(t, 2, len(lines), "expected 2 components") // default and test namespace components.
if opts.DevEnabled {
// default, test statestore.
// default pubsub.
// 3 components.
assert.Equal(t, 3, len(lines), "expected 3 components")
} else {
assert.Equal(t, 2, len(lines), "expected 2 components") // default and test namespace components.
// for fresh cluster only one component yaml has been applied.
testNsFields := strings.Fields(lines[0])
defaultNsFields := strings.Fields(lines[1])
// for fresh cluster only one component yaml has been applied.
testNsFields := strings.Fields(lines[0])
defaultNsFields := strings.Fields(lines[1])
// Fields splits on space, so Created time field might be split again.
namespaceComponentOutputCheck(t, testNsFields, "test")
namespaceComponentOutputCheck(t, defaultNsFields, "default")
// Fields splits on space, so Created time field might be split again.
// Scopes are only applied in for this scenario in tests.
namespaceComponentOutputCheck(t, testNsFields, "test")
namespaceComponentOutputCheck(t, defaultNsFields, "default")
}
}
func namespaceComponentOutputCheck(t *testing.T, fields []string, namespace string) {
@ -943,6 +981,41 @@ func httpEndpointOutputCheck(t *testing.T, output string) {
assert.Contains(t, output, "httpendpoint")
}
func validateThirdpartyPodsOnInit(t *testing.T) {
ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
k8sClient, err := getClient()
require.NoError(t, err)
list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{
Limit: 100,
})
require.NoError(t, err)
notFound := map[string]struct{}{
devRedisReleaseName: {},
devZipkinReleaseName: {},
}
prefixes := map[string]string{
devRedisReleaseName: "dapr-dev-redis-master-",
devZipkinReleaseName: "dapr-dev-zipkin-",
}
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
for component, prefix := range prefixes {
if pod.Status.Phase != core_v1.PodRunning {
continue
}
if !pod.Status.ContainerStatuses[0].Ready {
continue
}
if strings.HasPrefix(pod.ObjectMeta.Name, prefix) {
delete(notFound, component)
}
}
}
assert.Empty(t, notFound)
}
func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) {
ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
@ -1010,6 +1083,52 @@ func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) {
assert.Empty(t, notFound)
}
func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) {
for {
select {
case <-done: // if timeout was reached.
return
default:
break
}
ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
k8sClient, err := getClient()
require.NoError(t, err, "error getting k8s client for pods check")
list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{
Limit: 100,
})
require.NoError(t, err)
found := map[string]struct{}{
devRedisReleaseName: {},
devZipkinReleaseName: {},
}
prefixes := map[string]string{
devRedisReleaseName: "dapr-dev-redis-master-",
devZipkinReleaseName: "dapr-dev-zipkin-",
}
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
for component, prefix := range prefixes {
if pod.Status.Phase != core_v1.PodRunning {
continue
}
if !pod.Status.ContainerStatuses[0].Ready {
continue
}
if strings.HasPrefix(pod.ObjectMeta.Name, prefix) {
delete(found, component)
}
}
}
if len(found) == 2 {
podsDeleted <- struct{}{}
}
time.Sleep(15 * time.Second)
}
}
func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
for {
select {

View File

@ -118,6 +118,40 @@ func TestKubernetesHAModeMTLSDisabled(t *testing.T) {
}
}
func TestKubernetesDev(t *testing.T) {
// ensure clean env for test
ensureCleanEnv(t, false)
// setup tests
tests := []common.TestCase{}
tests = append(tests, common.GetTestsOnInstall(currentVersionDetails, common.TestOptions{
DevEnabled: true,
HAEnabled: false,
MTLSEnabled: true,
ApplyComponentChanges: true,
CheckResourceExists: map[common.Resource]bool{
common.CustomResourceDefs: true,
common.ClusterRoles: true,
common.ClusterRoleBindings: true,
},
})...)
tests = append(tests, common.GetTestsOnUninstall(currentVersionDetails, common.TestOptions{
DevEnabled: true,
UninstallAll: true,
CheckResourceExists: map[common.Resource]bool{
common.CustomResourceDefs: false,
common.ClusterRoles: false,
common.ClusterRoleBindings: false,
},
})...)
// execute tests
for _, tc := range tests {
t.Run(tc.Name, tc.Callable)
}
}
func TestKubernetesNonHAModeMTLSEnabled(t *testing.T) {
// ensure clean env for test
ensureCleanEnv(t, false)