initial commit for multi-app run k8s impl

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
Mukundan Sundararajan 2023-08-03 12:22:20 +05:30
parent 60f50e3497
commit 7b55500324
39 changed files with 1214 additions and 143 deletions

2
.gitignore vendored
View File

@ -24,6 +24,8 @@ cli
# CLI's auto-generated components directory # CLI's auto-generated components directory
**/components **/components
# Auto generated deploy dir inside .dapr directory
**/.dapr/deploy
# Auto generated logs dir inside .dapr directory # Auto generated logs dir inside .dapr directory
**/.dapr/logs **/.dapr/logs

View File

@ -33,6 +33,7 @@ var (
wait bool wait bool
timeout uint timeout uint
slimMode bool slimMode bool
devMode bool
runtimeVersion string runtimeVersion string
dashboardVersion string dashboardVersion string
allNamespaces bool allNamespaces bool
@ -127,6 +128,7 @@ dapr init --runtime-path <path-to-install-directory>
DashboardVersion: dashboardVersion, DashboardVersion: dashboardVersion,
EnableMTLS: enableMTLS, EnableMTLS: enableMTLS,
EnableHA: enableHA, EnableHA: enableHA,
EnableDev: devMode,
Args: values, Args: values,
Wait: wait, Wait: wait,
Timeout: timeout, Timeout: timeout,
@ -202,6 +204,7 @@ func init() {
defaultContainerRuntime := string(utils.DOCKER) defaultContainerRuntime := string(utils.DOCKER)
InitCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Deploy Dapr to a Kubernetes cluster") InitCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Deploy Dapr to a Kubernetes cluster")
InitCmd.Flags().BoolVarP(&devMode, "dev", "", false, "Use Dev mode. Deploy Redis, Zipkin also in the Kubernetes cluster")
InitCmd.Flags().BoolVarP(&wait, "wait", "", false, "Wait for Kubernetes initialization to complete") InitCmd.Flags().BoolVarP(&wait, "wait", "", false, "Wait for Kubernetes initialization to complete")
InitCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The wait timeout for the Kubernetes installation") InitCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The wait timeout for the Kubernetes installation")
InitCmd.Flags().BoolVarP(&slimMode, "slim", "s", false, "Exclude placement service, Redis and Zipkin containers from self-hosted installation") InitCmd.Flags().BoolVarP(&slimMode, "slim", "s", false, "Exclude placement service, Redis and Zipkin containers from self-hosted installation")

View File

@ -27,11 +27,12 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/dapr/cli/pkg/kubernetes"
"github.com/dapr/cli/pkg/metadata" "github.com/dapr/cli/pkg/metadata"
"github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/print"
runExec "github.com/dapr/cli/pkg/runexec" runExec "github.com/dapr/cli/pkg/runexec"
"github.com/dapr/cli/pkg/runfileconfig"
"github.com/dapr/cli/pkg/standalone" "github.com/dapr/cli/pkg/standalone"
"github.com/dapr/cli/pkg/standalone/runfileconfig"
daprsyscall "github.com/dapr/cli/pkg/syscall" daprsyscall "github.com/dapr/cli/pkg/syscall"
"github.com/dapr/cli/utils" "github.com/dapr/cli/utils"
) )
@ -64,6 +65,7 @@ var (
apiListenAddresses string apiListenAddresses string
runFilePath string runFilePath string
appChannelAddress string appChannelAddress string
enableRunK8s bool
) )
const ( const (
@ -121,7 +123,7 @@ dapr run --run-file /path/to/directory
print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err) print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err)
os.Exit(1) os.Exit(1)
} }
executeRunWithAppsConfigFile(runConfigFilePath) executeRunWithAppsConfigFile(runConfigFilePath, enableRunK8s)
return return
} }
if len(args) == 0 { if len(args) == 0 {
@ -461,6 +463,7 @@ func init() {
RunCmd.Flags().IntVar(&appHealthTimeout, "app-health-probe-timeout", 0, "Timeout for app health probes in milliseconds") RunCmd.Flags().IntVar(&appHealthTimeout, "app-health-probe-timeout", 0, "Timeout for app health probes in milliseconds")
RunCmd.Flags().IntVar(&appHealthThreshold, "app-health-threshold", 0, "Number of consecutive failures for the app to be considered unhealthy") RunCmd.Flags().IntVar(&appHealthThreshold, "app-health-threshold", 0, "Number of consecutive failures for the app to be considered unhealthy")
RunCmd.Flags().BoolVar(&enableAPILogging, "enable-api-logging", false, "Log API calls at INFO verbosity. Valid values are: true or false") RunCmd.Flags().BoolVar(&enableAPILogging, "enable-api-logging", false, "Log API calls at INFO verbosity. Valid values are: true or false")
RunCmd.Flags().BoolVarP(&enableRunK8s, "kubernetes", "k", false, "Run the multi-app run template against Kubernetes environment.")
RunCmd.Flags().StringVar(&apiListenAddresses, "dapr-listen-addresses", "", "Comma separated list of IP addresses that sidecar will listen to") RunCmd.Flags().StringVar(&apiListenAddresses, "dapr-listen-addresses", "", "Comma separated list of IP addresses that sidecar will listen to")
RunCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to run") RunCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to run")
RunCmd.Flags().StringVarP(&appChannelAddress, "app-channel-address", "", utils.DefaultAppChannelAddress, "The network address the application listens on") RunCmd.Flags().StringVarP(&appChannelAddress, "app-channel-address", "", utils.DefaultAppChannelAddress, "The network address the application listens on")
@ -511,11 +514,11 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
// A custom writer used for trimming ASCII color codes from logs when writing to files. // A custom writer used for trimming ASCII color codes from logs when writing to files.
var customAppLogWriter io.Writer var customAppLogWriter io.Writer
daprdLogWriterCloser := getLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination) daprdLogWriterCloser := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination)
if len(runConfig.Command) == 0 { if len(runConfig.Command) == 0 {
print.StatusEvent(os.Stdout, print.LogWarning, "No application command found for app %q present in %s", runConfig.AppID, runFilePath) print.StatusEvent(os.Stdout, print.LogWarning, "No application command found for app %q present in %s", runConfig.AppID, runFilePath)
appDaprdWriter = getAppDaprdWriter(app, true) appDaprdWriter = runExec.GetAppDaprdWriter(app, true)
appLogWriter = app.DaprdLogWriteCloser appLogWriter = app.DaprdLogWriteCloser
} else { } else {
err = app.CreateAppLogFile() err = app.CreateAppLogFile()
@ -524,8 +527,8 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
exitWithError = true exitWithError = true
break break
} }
appDaprdWriter = getAppDaprdWriter(app, false) appDaprdWriter = runExec.GetAppDaprdWriter(app, false)
appLogWriter = getLogWriter(app.AppLogWriteCloser, app.AppLogDestination) appLogWriter = runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination)
} }
customAppLogWriter = print.CustomLogWriter{W: appLogWriter} customAppLogWriter = print.CustomLogWriter{W: appLogWriter}
runState, err := startDaprdAndAppProcesses(&runConfig, app.AppDirPath, sigCh, runState, err := startDaprdAndAppProcesses(&runConfig, app.AppDirPath, sigCh,
@ -592,43 +595,6 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) (
return exitWithError, closeError return exitWithError, closeError
} }
// getAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout.
func getAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer {
	// With no app command only the daprd destination matters.
	if isAppCommandEmpty {
		if app.DaprdLogDestination != standalone.Console {
			return io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser)
		}
		return os.Stdout
	}

	appToFile := app.AppLogDestination != standalone.Console
	daprdToFile := app.DaprdLogDestination != standalone.Console
	switch {
	case appToFile && daprdToFile:
		return io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout)
	case appToFile:
		return io.MultiWriter(app.AppLogWriteCloser, os.Stdout)
	case daprdToFile:
		return io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout)
	default:
		return os.Stdout
	}
}
// getLogWriter returns the log writer based on the log destination.
// An unrecognized destination yields a nil writer, matching the original
// switch's fall-through behavior.
func getLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer {
	if logDestination == standalone.Console {
		return os.Stdout
	}
	if logDestination == standalone.File {
		return fileLogWriterCloser
	}
	if logDestination == standalone.FileAndConsole {
		return io.MultiWriter(os.Stdout, fileLogWriterCloser)
	}
	return nil
}
func logInformationalStatusToStdout(app runfileconfig.App) { func logInformationalStatusToStdout(app runfileconfig.App) {
print.InfoStatusEvent(os.Stdout, "Started Dapr with app id %q. HTTP Port: %d. gRPC Port: %d", print.InfoStatusEvent(os.Stdout, "Started Dapr with app id %q. HTTP Port: %d. gRPC Port: %d",
app.AppID, app.RunConfig.HTTPPort, app.RunConfig.GRPCPort) app.AppID, app.RunConfig.HTTPPort, app.RunConfig.GRPCPort)
@ -654,9 +620,8 @@ func gracefullyShutdownAppsAndCloseResources(runState []*runExec.RunExec, apps [
return err return err
} }
func executeRunWithAppsConfigFile(runFilePath string) { func executeRunWithAppsConfigFile(runFilePath string, k8sEnabled bool) {
config := runfileconfig.RunFileConfig{} config, apps, err := getRunConfigFromRunFile(runFilePath)
apps, err := config.GetApps(runFilePath)
if err != nil { if err != nil {
print.StatusEvent(os.Stdout, print.LogFailure, "Error getting apps from config file: %s", err) print.StatusEvent(os.Stdout, print.LogFailure, "Error getting apps from config file: %s", err)
os.Exit(1) os.Exit(1)
@ -665,7 +630,13 @@ func executeRunWithAppsConfigFile(runFilePath string) {
print.StatusEvent(os.Stdout, print.LogFailure, "No apps to run") print.StatusEvent(os.Stdout, print.LogFailure, "No apps to run")
os.Exit(1) os.Exit(1)
} }
exitWithError, closeErr := executeRun(config.Name, runFilePath, apps) var exitWithError bool
var closeErr error
if !k8sEnabled {
exitWithError, closeErr = executeRun(config.Name, runFilePath, apps)
} else {
exitWithError, closeErr = kubernetes.Run(runFilePath, config)
}
if exitWithError { if exitWithError {
if closeErr != nil { if closeErr != nil {
print.StatusEvent(os.Stdout, print.LogFailure, "Error closing resources: %s", closeErr) print.StatusEvent(os.Stdout, print.LogFailure, "Error closing resources: %s", closeErr)
@ -674,6 +645,12 @@ func executeRunWithAppsConfigFile(runFilePath string) {
} }
} }
// getRunConfigFromRunFile parses the multi-app run template at runFilePath
// and returns the parsed config together with the apps it defines.
func getRunConfigFromRunFile(runFilePath string) (runfileconfig.RunFileConfig, []runfileconfig.App, error) {
	var cfg runfileconfig.RunFileConfig
	apps, parseErr := cfg.GetApps(runFilePath)
	return cfg, apps, parseErr
}
// startDaprdAndAppProcesses is a function to start the App process and the associated Daprd process. // startDaprdAndAppProcesses is a function to start the App process and the associated Daprd process.
// This should be called as a blocking function call. // This should be called as a blocking function call.
func startDaprdAndAppProcesses(runConfig *standalone.RunConfig, commandDir string, sigCh chan os.Signal, func startDaprdAndAppProcesses(runConfig *standalone.RunConfig, commandDir string, sigCh chan os.Signal,

View File

@ -21,11 +21,15 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/dapr/cli/pkg/kubernetes"
"github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/standalone" "github.com/dapr/cli/pkg/standalone"
) )
var stopAppID string var (
stopAppID string
stopK8s bool
)
var StopCmd = &cobra.Command{ var StopCmd = &cobra.Command{
Use: "stop", Use: "stop",
@ -52,6 +56,7 @@ dapr stop --run-file /path/to/directory
print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err) print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err)
os.Exit(1) os.Exit(1)
} }
if !stopK8s {
err = executeStopWithRunFile(runFilePath) err = executeStopWithRunFile(runFilePath)
if err != nil { if err != nil {
print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err) print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err)
@ -60,6 +65,15 @@ dapr stop --run-file /path/to/directory
} }
return return
} }
config, _, cErr := getRunConfigFromRunFile(runFilePath)
if cErr != nil {
print.FailureStatusEvent(os.Stderr, "Failed to parse run template file %q: %s", runFilePath, cErr.Error())
}
err = kubernetes.Stop(runFilePath, config)
if err != nil {
print.FailureStatusEvent(os.Stderr, "Error stopping deployments from multi-app run template: %v", err)
}
}
if stopAppID != "" { if stopAppID != "" {
args = append(args, stopAppID) args = append(args, stopAppID)
} }
@ -83,6 +97,7 @@ dapr stop --run-file /path/to/directory
func init() { func init() {
StopCmd.Flags().StringVarP(&stopAppID, "app-id", "a", "", "The application id to be stopped") StopCmd.Flags().StringVarP(&stopAppID, "app-id", "a", "", "The application id to be stopped")
StopCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to stop") StopCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to stop")
StopCmd.Flags().BoolVarP(&stopK8s, "kubernetes", "k", false, "Stop deployments in Kubernetes based on multi-app run file")
StopCmd.Flags().BoolP("help", "h", false, "Print this help message") StopCmd.Flags().BoolP("help", "h", false, "Print this help message")
RootCmd.AddCommand(StopCmd) RootCmd.AddCommand(StopCmd)
} }

View File

@ -30,6 +30,7 @@ import (
var ( var (
uninstallNamespace string uninstallNamespace string
uninstallKubernetes bool uninstallKubernetes bool
uninstallDev bool
uninstallAll bool uninstallAll bool
uninstallContainerRuntime string uninstallContainerRuntime string
) )
@ -66,7 +67,7 @@ dapr uninstall --runtime-path <path-to-install-directory>
} }
print.InfoStatusEvent(os.Stdout, "Removing Dapr from your cluster...") print.InfoStatusEvent(os.Stdout, "Removing Dapr from your cluster...")
err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, timeout) err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, uninstallDev, timeout)
} else { } else {
if !utils.IsValidContainerRuntime(uninstallContainerRuntime) { if !utils.IsValidContainerRuntime(uninstallContainerRuntime) {
print.FailureStatusEvent(os.Stdout, "Invalid container runtime. Supported values are docker and podman.") print.FailureStatusEvent(os.Stdout, "Invalid container runtime. Supported values are docker and podman.")
@ -87,6 +88,7 @@ dapr uninstall --runtime-path <path-to-install-directory>
func init() { func init() {
UninstallCmd.Flags().BoolVarP(&uninstallKubernetes, "kubernetes", "k", false, "Uninstall Dapr from a Kubernetes cluster") UninstallCmd.Flags().BoolVarP(&uninstallKubernetes, "kubernetes", "k", false, "Uninstall Dapr from a Kubernetes cluster")
UninstallCmd.Flags().BoolVarP(&uninstallDev, "dev", "", false, "Uninstall Dapr Redis and Zipkin installations from Kubernetes cluster")
UninstallCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The timeout for the Kubernetes uninstall") UninstallCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The timeout for the Kubernetes uninstall")
UninstallCmd.Flags().BoolVar(&uninstallAll, "all", false, "Remove .dapr directory, Redis, Placement and Zipkin containers on local machine, and CRDs on a Kubernetes cluster") UninstallCmd.Flags().BoolVar(&uninstallAll, "all", false, "Remove .dapr directory, Redis, Placement and Zipkin containers on local machine, and CRDs on a Kubernetes cluster")
UninstallCmd.Flags().String("network", "", "The Docker network from which to remove the Dapr runtime") UninstallCmd.Flags().String("network", "", "The Docker network from which to remove the Dapr runtime")

View File

@ -38,7 +38,15 @@ import (
const ( const (
daprReleaseName = "dapr" daprReleaseName = "dapr"
dashboardReleaseName = "dapr-dashboard" dashboardReleaseName = "dapr-dashboard"
thirdPartyDevNamespace = "default"
zipkinChartName = "zipkin"
redisChartName = "redis"
zipkinReleaseName = "dapr-dev-zipkin"
redisReleaseName = "dapr-dev-redis"
redisVersion = "6.2"
bitnamiHelmRepo = "https://charts.bitnami.com/bitnami"
daprHelmRepo = "https://dapr.github.io/helm-charts" daprHelmRepo = "https://dapr.github.io/helm-charts"
zipkinHelmRepo = "https://openzipkin.github.io/zipkin"
latestVersion = "latest" latestVersion = "latest"
) )
@ -48,6 +56,7 @@ type InitConfiguration struct {
Namespace string Namespace string
EnableMTLS bool EnableMTLS bool
EnableHA bool EnableHA bool
EnableDev bool
Args []string Args []string
Wait bool Wait bool
Timeout uint Timeout uint
@ -60,7 +69,8 @@ type InitConfiguration struct {
// Init deploys the Dapr operator using the supplied runtime version. // Init deploys the Dapr operator using the supplied runtime version.
func Init(config InitConfiguration) error { func Init(config InitConfiguration) error {
err := installWithConsole(daprReleaseName, config.Version, "Dapr control plane", config) helmRepoDapr := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
err := installWithConsole(daprReleaseName, config.Version, helmRepoDapr, "Dapr control plane", config)
if err != nil { if err != nil {
return err return err
} }
@ -75,19 +85,53 @@ func Init(config InitConfiguration) error {
} }
} }
err = installWithConsole(dashboardReleaseName, config.DashboardVersion, "Dapr dashboard", config) err = installWithConsole(dashboardReleaseName, config.DashboardVersion, helmRepoDapr, "Dapr dashboard", config)
if err != nil { if err != nil {
return err return err
} }
if config.EnableDev {
redisChartVals := []string{
"image.tag=" + redisVersion,
}
err = installThirdPartyWithConsole(redisReleaseName, redisChartName, latestVersion, bitnamiHelmRepo, "Dapr Redis", redisChartVals, config)
if err != nil {
return err
}
err = installThirdPartyWithConsole(zipkinReleaseName, zipkinChartName, latestVersion, zipkinHelmRepo, "Dapr Zipkin", []string{}, config)
if err != nil {
return err
}
err = initDevConfigs()
if err != nil {
return err
}
}
return nil return nil
} }
func installWithConsole(releaseName string, releaseVersion string, prettyName string, config InitConfiguration) error { func installThirdPartyWithConsole(releaseName, chartName, releaseVersion, helmRepo string, prettyName string, chartValues []string, config InitConfiguration) error {
installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...") installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...")
defer installSpinning(print.Failure) defer installSpinning(print.Failure)
err := install(releaseName, releaseVersion, config) // releaseVersion of chart will always be latest version.
err := installThirdParty(releaseName, chartName, latestVersion, helmRepo, chartValues, config)
if err != nil {
return err
}
installSpinning(print.Success)
return nil
}
func installWithConsole(releaseName, releaseVersion, helmRepo string, prettyName string, config InitConfiguration) error {
installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...")
defer installSpinning(print.Failure)
err := install(releaseName, releaseVersion, helmRepo, config)
if err != nil { if err != nil {
return err return err
} }
@ -156,9 +200,9 @@ func locateChartFile(dirPath string) (string, error) {
return filepath.Join(dirPath, files[0].Name()), nil return filepath.Join(dirPath, files[0].Name()), nil
} }
func daprChart(version string, releaseName string, config *helm.Configuration) (*chart.Chart, error) { func getHelmChart(version, releaseName, helmRepo string, config *helm.Configuration) (*chart.Chart, error) {
pull := helm.NewPullWithOpts(helm.WithConfig(config)) pull := helm.NewPullWithOpts(helm.WithConfig(config))
pull.RepoURL = utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo) pull.RepoURL = helmRepo
pull.Username = utils.GetEnv("DAPR_HELM_REPO_USERNAME", "") pull.Username = utils.GetEnv("DAPR_HELM_REPO_USERNAME", "")
pull.Password = utils.GetEnv("DAPR_HELM_REPO_PASSWORD", "") pull.Password = utils.GetEnv("DAPR_HELM_REPO_PASSWORD", "")
@ -188,7 +232,7 @@ func daprChart(version string, releaseName string, config *helm.Configuration) (
return loader.Load(chartPath) return loader.Load(chartPath)
} }
func chartValues(config InitConfiguration, version string) (map[string]interface{}, error) { func daprChartValues(config InitConfiguration, version string) (map[string]interface{}, error) {
chartVals := map[string]interface{}{} chartVals := map[string]interface{}{}
err := utils.ValidateImageVariant(config.ImageVariant) err := utils.ValidateImageVariant(config.ImageVariant)
if err != nil { if err != nil {
@ -227,7 +271,7 @@ func chartValues(config InitConfiguration, version string) (map[string]interface
return chartVals, nil return chartVals, nil
} }
func install(releaseName string, releaseVersion string, config InitConfiguration) error { func install(releaseName, releaseVersion, helmRepo string, config InitConfiguration) error {
err := createNamespace(config.Namespace) err := createNamespace(config.Namespace)
if err != nil { if err != nil {
return err return err
@ -238,7 +282,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
return err return err
} }
daprChart, err := daprChart(releaseVersion, releaseName, helmConf) daprChart, err := getHelmChart(releaseVersion, releaseName, helmRepo, helmConf)
if err != nil { if err != nil {
return err return err
} }
@ -261,7 +305,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
installClient.Wait = config.Wait installClient.Wait = config.Wait
installClient.Timeout = time.Duration(config.Timeout) * time.Second installClient.Timeout = time.Duration(config.Timeout) * time.Second
values, err := chartValues(config, version) values, err := daprChartValues(config, version)
if err != nil { if err != nil {
return err return err
} }
@ -273,6 +317,38 @@ func install(releaseName string, releaseVersion string, config InitConfiguration
return nil return nil
} }
// installThirdParty installs a third-party helm chart (e.g. Redis, Zipkin)
// into the fixed dev namespace used for dev-mode installations.
// chartVals entries use helm's "key=value" syntax and are parsed into the
// chart's value overrides.
func installThirdParty(releaseName, chartName, releaseVersion, helmRepo string, chartVals []string, config InitConfiguration) error {
	conf, err := helmConfig(thirdPartyDevNamespace)
	if err != nil {
		return err
	}

	ch, err := getHelmChart(releaseVersion, chartName, helmRepo, conf)
	if err != nil {
		return err
	}

	client := helm.NewInstall(conf)
	client.ReleaseName = releaseName
	client.Namespace = thirdPartyDevNamespace
	client.Wait = config.Wait
	client.Timeout = time.Duration(config.Timeout) * time.Second

	values := map[string]interface{}{}
	for _, kv := range chartVals {
		if err = strvals.ParseInto(kv, values); err != nil {
			return err
		}
	}

	_, err = client.Run(ch, values)
	return err
}
func debugLogf(format string, v ...interface{}) { func debugLogf(format string, v ...interface{}) {
} }
@ -290,3 +366,78 @@ func confirmExist(cfg *helm.Configuration, releaseName string) (bool, error) {
return true, nil return true, nil
} }
// checkAndOverWriteFile writes b to filePath only when the file does not
// already exist; an existing file is left untouched (despite the name, it
// never overwrites). Unlike the previous version, an unexpected error from
// os.Stat (e.g. permission denied) is now returned instead of being
// silently ignored, so a later "kubectl apply" does not fail mysteriously.
func checkAndOverWriteFile(filePath string, b []byte) error {
	_, err := os.Stat(filePath)
	if os.IsNotExist(err) {
		// #nosec G306
		return os.WriteFile(filePath, b, 0o644)
	}
	// err is nil when the file exists; any other error is surfaced.
	return err
}
// initDevConfigs applies the dev-mode Redis state store component and the
// Zipkin tracing configuration to the cluster via "kubectl apply".
// Manifests are written to a temporary directory that is removed on return.
// NOTE(review): the rendered source stripped indentation from the YAML
// literals; it is reconstructed here with standard YAML nesting — confirm
// against the original file.
func initDevConfigs() error {
	redisStatestore := `
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: daprdevstatestore
spec:
  type: state.redis
  version: v1
  metadata:
  # These settings will work out of the box if you use helm install
  # bitnami/redis. If you have your own setup, replace
  # redis-master:6379 with your own Redis master address, and the
  # Redis password with your own Secret's name. For more information,
  # see https://docs.dapr.io/operations/components/component-secrets .
  - name: redisHost
    value: dapr-dev-redis-master:6379
  - name: redisPassword
    secretKeyRef:
      name: dapr-dev-redis
      key: redis-password
auth:
  secretStore: kubernetes
`

	zipkinConfig := `
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: daprdevzipkinconfig
spec:
  tracing:
    samplingRate: "1"
    zipkin:
      endpointAddress: "http://dapr-dev-zipkin.default.svc.cluster.local:9411/api/v2/spans"
`

	tempDirPath, err := createTempDir()
	defer os.RemoveAll(tempDirPath)
	if err != nil {
		return err
	}

	// Apply the manifests in order: Redis state store first, then Zipkin.
	manifests := []struct {
		fileName string
		content  string
	}{
		{"redis-statestore.yaml", redisStatestore},
		{"zipkin-config.yaml", zipkinConfig},
	}
	for _, m := range manifests {
		p := filepath.Join(tempDirPath, m.fileName)
		if err = checkAndOverWriteFile(p, []byte(m.content)); err != nil {
			return err
		}
		if _, err = utils.RunCmdAndWait("kubectl", "apply", "-f", p); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -14,17 +14,29 @@ limitations under the License.
package kubernetes package kubernetes
import ( import (
"bufio"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"strings"
"time"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/dapr/cli/pkg/print"
) )
const ( const (
daprdContainerName = "daprd" daprdContainerName = "daprd"
appIDContainerArgName = "--app-id" appIDContainerArgName = "--app-id"
maxListingRetry = 10
listingDelay = 200 * time.Microsecond
streamingDelay = 100 * time.Millisecond
) )
// Logs fetches Dapr sidecar logs from Kubernetes. // Logs fetches Dapr sidecar logs from Kubernetes.
@ -84,3 +96,99 @@ func Logs(appID, podName, namespace string) error {
return nil return nil
} }
// streamContainerLogsToDisk streams the logs of every container of the pods
// labelled with the given Dapr app ID to the supplied writers: the "daprd"
// container goes to daprdLogWriter, all other containers go to appLogWriter.
// One goroutine is launched per container and runs until its stream ends or
// ctx is cancelled; this function returns right after launching them.
func streamContainerLogsToDisk(ctx context.Context, appID string, appLogWriter, daprdLogWriter io.Writer, podClient v1.PodInterface) error {
	var err error
	var podList *corev1.PodList
	counter := 0
	// Pods may not be listed immediately after the deployment is applied,
	// so retry the listing a bounded number of times.
	for {
		podList, err = getPods(ctx, appID, podClient)
		if err != nil {
			return fmt.Errorf("error listing the pod with label %s=%s: %w", daprAppIDKey, appID, err)
		}
		if len(podList.Items) != 0 {
			break
		}
		counter++
		if counter == maxListingRetry {
			return fmt.Errorf("error getting logs: error listing the pod with label %s=%s after %d retries", daprAppIDKey, appID, maxListingRetry)
		}
		// Retry after a delay.
		time.Sleep(listingDelay)
	}

	for _, pod := range podList.Items {
		print.InfoStatusEvent(os.Stdout, "Streaming logs for containers in pod %q", pod.GetName())
		for _, container := range pod.Spec.Containers {
			fileWriter := daprdLogWriter
			if container.Name != daprdContainerName {
				fileWriter = appLogWriter
			}
			// Create a goroutine for each container to stream logs into file/console.
			go func(pod, containerName, appID string, fileWriter io.Writer) {
			loop:
				for {
					req := podClient.GetLogs(pod, &corev1.PodLogOptions{
						Container: containerName,
						Follow:    true,
					})
					stream, err := req.Stream(ctx)
					if err != nil {
						switch {
						case strings.Contains(err.Error(), "Pending"),
							strings.Contains(err.Error(), "ContainerCreating"):
							// Container not up yet; retry after a delay.
							time.Sleep(streamingDelay)
							continue loop
						case errors.Is(err, context.Canceled):
							// Caller cancelled the context; stop streaming.
							return
						default:
							// Unrecoverable error opening the stream; stop.
							return
						}
					}
					defer stream.Close()

					if containerName != daprdContainerName {
						// Prefix and colorize each app container line.
						// NOTE(review): scanner errors are not surfaced; a
						// failed scan simply ends streaming for this container.
						streamScanner := bufio.NewScanner(stream)
						for streamScanner.Scan() {
							fmt.Fprintln(fileWriter, print.Blue(fmt.Sprintf("== APP - %s == %s", appID, streamScanner.Text())))
						}
					} else {
						// daprd logs are copied verbatim; io.Copy returns on
						// stream end or context cancellation — either way this
						// container is done.
						if _, err = io.Copy(fileWriter, stream); err != nil {
							return
						}
					}
					return
				}
			}(pod.GetName(), container.Name, appID, fileWriter)
		}
	}
	return nil
}
// getPods lists the pods carrying the Dapr app ID label matching appID.
// The List call is bounded by a 30-second timeout derived from ctx.
// Fix: removes a leftover debug fmt.Println that printed the label selector
// to stdout on every call, polluting the CLI output.
func getPods(ctx context.Context, appID string, podClient v1.PodInterface) (*corev1.PodList, error) {
	listCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	podList, err := podClient.List(listCtx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", daprAppIDKey, appID),
	})
	if err != nil {
		return nil, err
	}
	return podList, nil
}

View File

@ -15,24 +15,31 @@ package kubernetes
import ( import (
"context" "context"
"errors"
"fmt"
"strings" "strings"
core_v1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
k8s "k8s.io/client-go/kubernetes" k8s "k8s.io/client-go/kubernetes"
) )
func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*core_v1.PodList, error) { const podWatchErrTemplate = "error creating pod watcher"
opts := v1.ListOptions{}
var errPodUnknown error = errors.New("pod in unknown/failed state")
func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*corev1.PodList, error) {
opts := metav1.ListOptions{}
if labelSelector != nil { if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector) opts.LabelSelector = labels.FormatLabels(labelSelector)
} }
return client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts) return client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
} }
func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*core_v1.PodList, error) { func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*corev1.PodList, error) {
opts := v1.ListOptions{} opts := metav1.ListOptions{}
if labelSelector != nil { if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector) opts.LabelSelector = labels.FormatLabels(labelSelector)
} }
@ -41,8 +48,8 @@ func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]
// CheckPodExists returns a boolean representing the pod's existence and the namespace that the given pod resides in, // CheckPodExists returns a boolean representing the pod's existence and the namespace that the given pod resides in,
// or empty if not present in the given namespace. // or empty if not present in the given namespace.
func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[string]string, deployName string) (bool, string) { func CheckPodExists(client k8s.Interface, namespace string, labelSelector map[string]string, deployName string) (bool, string) {
opts := v1.ListOptions{} opts := metav1.ListOptions{}
if labelSelector != nil { if labelSelector != nil {
opts.LabelSelector = labels.FormatLabels(labelSelector) opts.LabelSelector = labels.FormatLabels(labelSelector)
} }
@ -53,7 +60,7 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s
} }
for _, pod := range podList.Items { for _, pod := range podList.Items {
if pod.Status.Phase == core_v1.PodRunning { if pod.Status.Phase == corev1.PodRunning {
if strings.HasPrefix(pod.Name, deployName) { if strings.HasPrefix(pod.Name, deployName) {
return true, pod.Namespace return true, pod.Namespace
} }
@ -61,3 +68,61 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s
} }
return false, "" return false, ""
} }
// createPodWatcher returns a watcher over pods in namespace that carry the
// Dapr app ID label matching appID.
func createPodWatcher(ctx context.Context, client k8s.Interface, namespace, appID string) (watch.Interface, error) {
	opts := metav1.ListOptions{
		// TypeMeta is left at its zero value, as before.
		LabelSelector: fmt.Sprintf("%s=%s", daprAppIDKey, appID),
	}
	return client.CoreV1().Pods(namespace).Watch(ctx, opts)
}
// waitPodDeleted blocks until a pod carrying the Dapr app ID label for appID
// is deleted, the watch channel closes, or ctx is done. It returns nil only
// when a Deleted event is observed.
func waitPodDeleted(ctx context.Context, client k8s.Interface, namespace, appID string) error {
	watcher, err := createPodWatcher(ctx, client, namespace, appID)
	if err != nil {
		return fmt.Errorf("%s : %w", podWatchErrTemplate, err)
	}
	defer watcher.Stop()
	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// Without this check a closed channel yields zero-value
				// events forever, spinning this loop at 100% CPU.
				return fmt.Errorf("%s : watch channel closed unexpectedly", podWatchErrTemplate)
			}
			if event.Type == watch.Deleted {
				return nil
			}
		case <-ctx.Done():
			// Wrap the actual context error (Canceled or DeadlineExceeded).
			return fmt.Errorf("error context cancelled while waiting for pod deletion: %w", ctx.Err())
		}
	}
}
// waitPodRunning blocks until a pod carrying the Dapr app ID label for appID
// reaches the Running phase. It returns an error if the pod enters the Failed
// or Unknown phase, the watch channel closes, or ctx is done.
func waitPodRunning(ctx context.Context, client k8s.Interface, namespace, appID string) error {
	watcher, err := createPodWatcher(ctx, client, namespace, appID)
	if err != nil {
		return fmt.Errorf("%s : %w", podWatchErrTemplate, err)
	}
	defer watcher.Stop()
	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// A closed channel would otherwise deliver zero-value events
				// in a tight loop (and the assertion below would panic on nil).
				return fmt.Errorf("%s : watch channel closed unexpectedly", podWatchErrTemplate)
			}
			pod, isPod := event.Object.(*corev1.Pod)
			if !isPod {
				// Watch can deliver non-pod objects (e.g. *metav1.Status on
				// errors); the previous unchecked assertion panicked on those.
				continue
			}
			if pod.Status.Phase == corev1.PodRunning {
				return nil
			} else if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodUnknown {
				return fmt.Errorf("error waiting for pod run: %w", errPodUnknown)
			}
		case <-ctx.Done():
			// Wrap the actual context error (Canceled or DeadlineExceeded).
			return fmt.Errorf("error context cancelled while waiting for pod run: %w", ctx.Err())
		}
	}
}

View File

@ -112,7 +112,8 @@ func renewCertificate(rootCert, issuerCert, issuerKey []byte, timeout uint, imag
return err return err
} }
daprChart, err := daprChart(daprVersion, "dapr", helmConf) helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
daprChart, err := getHelmChart(daprVersion, "dapr", helmRepo, helmConf)
if err != nil { if err != nil {
return err return err
} }

View File

@ -13,24 +13,416 @@ limitations under the License.
package kubernetes package kubernetes
// RunConfig represents the application configuration parameters. import (
type RunConfig struct { "context"
AppID string "errors"
AppPort int "fmt"
HTTPPort int "io"
GRPCPort int "os"
CodeDirectory string "path/filepath"
Arguments []string "strings"
Image string "sync"
"syscall"
"time"
appV1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
k8s "k8s.io/client-go/kubernetes"
podsv1 "k8s.io/client-go/kubernetes/typed/core/v1"
// Specifically use k8s sig yaml to marshal into json, then convert to yaml.
k8sYaml "sigs.k8s.io/yaml"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/runfileconfig"
daprsyscall "github.com/dapr/cli/pkg/syscall"
"github.com/dapr/cli/utils"
"github.com/dapr/kit/ptr"
)
const (
serviceKind = "Service"
deploymentKind = "Deployment"
serviceAPIVersion = "v1"
deploymentAPIVersion = "apps/v1"
loadBalanceType = "LoadBalancer"
daprEnableAnnotationKey = "dapr.io/enabled"
serviceFileName = "service.yaml"
deploymentFileName = "deployment.yaml"
appLabelKey = "app"
nameKey = "name"
labelsKey = "labels"
tcpProtocol = "TCP"
podCreationDeletionTimeout = 1 * time.Minute
)
type deploymentConfig struct {
Kind string `json:"kind"`
APIVersion string `json:"apiVersion"`
Metadata map[string]any `json:"metadata"`
Spec appV1.DeploymentSpec `json:"spec"`
} }
// RunOutput represents the run output. type serviceConfig struct {
type RunOutput struct { Kind string `json:"kind"`
Message string APIVersion string `json:"apiVersion"`
Metadata map[string]any `json:"metadata"`
Spec corev1.ServiceSpec `json:"spec"`
} }
// Run executes the application based on the run configuration. type runState struct {
func Run(config *RunConfig) (*RunOutput, error) { serviceFilePath string
//nolint deploymentFilePath string
return nil, nil app runfileconfig.App
logCancel context.CancelFunc
}
// Run executes the applications defined in the run file configuration on
// Kubernetes. For every app it renders a Deployment (and optionally a
// Service) YAML into the app's <appDir>/.dapr/deploy directory, applies the
// manifests with kubectl, waits for the pod to reach the Running phase and
// streams the app/daprd container logs to per-app log files. It then blocks
// until a stop signal arrives (or all pods are deleted externally) and
// finally tears the deployments down.
// The returned bool is true when any app failed to start; the returned error
// aggregates teardown failures, if any.
func Run(runFilePath string, config runfileconfig.RunFileConfig) (bool, error) {
	// At this point, we expect the runfile to be parsed and the values within config.
	// Validations and default setting will only be done after this point.
	var exitWithError bool

	// get k8s client PodsInterface.
	client, cErr := Client()
	if cErr != nil {
		// exit with error.
		return true, fmt.Errorf("error getting k8s client: %w", cErr)
	}

	namespace := corev1.NamespaceDefault
	podsInterface := client.CoreV1().Pods(namespace)

	// setup a monitoring context for shutdown call from another cli process.
	monitoringContext, monitoringCancel := context.WithCancel(context.Background())
	defer monitoringCancel()

	// setup shutdown notify channel.
	sigCh := make(chan os.Signal, 1)
	daprsyscall.SetupShutdownNotify(sigCh)

	runStates := []runState{}
	for _, app := range config.Apps {
		print.StatusEvent(os.Stdout, print.LogInfo, "Validating config and starting app %q", app.RunConfig.AppID)
		// Set defaults if zero value provided in config yaml.
		app.RunConfig.SetDefaultFromSchema()

		// Validate validates the configs for k8s and modifies appId etc.
		err := app.RunConfig.ValidateK8s()
		if err != nil {
			print.FailureStatusEvent(os.Stderr, "Error validating run config for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error())
			exitWithError = true
			break
		}

		var svc serviceConfig
		// create default service config.
		if app.ContainerConfiguration.CreateService {
			svc = createServiceConfig(app)
		}

		// create default deployment config. createDeploymentConfig cannot
		// fail, so no error check is needed here (a stale re-check of the
		// ValidateK8s error was removed).
		dep := createDeploymentConfig(app)

		// overwrite <app-id>/.dapr/deploy/service.yaml.
		// overwrite <app-id>/.dapr/deploy/deployment.yaml.
		err = writeYamlFile(app, svc, dep)
		if err != nil {
			print.FailureStatusEvent(os.Stderr, "Error creating deployment/service yaml files: %s", err.Error())
			exitWithError = true
			break
		}

		deployDir := app.GetDeployDir()
		print.InfoStatusEvent(os.Stdout, "Deploying app %q to Kubernetes", app.AppID)
		serviceFilePath := filepath.Join(deployDir, serviceFileName)
		deploymentFilePath := filepath.Join(deployDir, deploymentFileName)
		rState := runState{}
		if app.CreateService {
			print.InfoStatusEvent(os.Stdout, "Deploying service YAML %q to Kubernetes", serviceFilePath)
			err = deployYamlToK8s(serviceFilePath)
			if err != nil {
				print.FailureStatusEvent(os.Stderr, "Error deploying service yaml file %q : %s", serviceFilePath, err.Error())
				exitWithError = true
				break
			}
			rState.serviceFilePath = serviceFilePath
		}

		print.InfoStatusEvent(os.Stdout, "Deploying deployment YAML %q to Kubernetes", deploymentFilePath)
		err = deployYamlToK8s(deploymentFilePath)
		if err != nil {
			print.FailureStatusEvent(os.Stderr, "Error deploying deployment yaml file %q : %s", deploymentFilePath, err.Error())
			exitWithError = true
			break
		}

		// create log files and save state.
		err = app.CreateDaprdLogFile()
		if err != nil {
			print.StatusEvent(os.Stderr, print.LogFailure, "Error getting daprd log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error())
			exitWithError = true
			break
		}
		err = app.CreateAppLogFile()
		if err != nil {
			print.StatusEvent(os.Stderr, print.LogFailure, "Error getting app log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error())
			exitWithError = true
			break
		}

		daprdLogWriter := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination)
		appLogWriter := runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination)

		// Bound the wait for the pod to come up.
		ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
		err = waitPodRunning(ctx, client, namespace, app.AppID)
		cancel()
		if err != nil {
			print.WarningStatusEvent(os.Stderr, "Error deploying pod to Kubernetes. See logs directly from Kubernetes command line.")
			// Close the log files since there is deployment error, and the container might be in crash loop back off state.
			app.CloseAppLogFile()
			app.CloseDaprdLogFile()
		} else {
			logContext, cancel := context.WithCancel(context.Background())
			rState.logCancel = cancel
			err = setupLogs(logContext, app.AppID, daprdLogWriter, appLogWriter, podsInterface)
			if err != nil {
				print.StatusEvent(os.Stderr, print.LogWarning, "Error setting up logs for app %q present in %q . See logs directly from Kubernetes command line.: %s ", app.AppID, runFilePath, err.Error())
			}
		}
		rState.deploymentFilePath = deploymentFilePath
		rState.app = app

		// append runSate only on successful k8s deploy.
		runStates = append(runStates, rState)

		print.InfoStatusEvent(os.Stdout, "Writing log files to directory : %s", app.GetLogsDir())
	}

	// If all apps have been started and there are no errors in starting the apps wait for signal from sigCh.
	if !exitWithError {
		print.InfoStatusEvent(os.Stdout, "Starting to monitor Kubernetes pods for deletion.")
		go monitorK8sPods(monitoringContext, client, namespace, runStates, sigCh)
		// After all apps started wait for sigCh.
		<-sigCh
		monitoringCancel()
		print.InfoStatusEvent(os.Stdout, "Stopping Kubernetes pods monitoring.")
		// To add a new line in Stdout.
		fmt.Println()
		print.InfoStatusEvent(os.Stdout, "Received signal to stop. Deleting K8s Dapr app deployments.")
	}

	closeErr := gracefullyShutdownK8sDeployment(runStates, client, namespace)
	return exitWithError, closeErr
}
// createServiceConfig builds the default LoadBalancer Service definition for
// the given app: port 80 is exposed and traffic is forwarded to the app
// port, selecting pods labelled with the app ID.
func createServiceConfig(app runfileconfig.App) serviceConfig {
	appLabels := map[string]string{
		appLabelKey: app.AppID,
	}
	spec := corev1.ServiceSpec{
		Ports: []corev1.ServicePort{
			{
				Protocol:   tcpProtocol,
				Port:       80,
				TargetPort: intstr.FromInt(app.AppPort),
			},
		},
		Selector: map[string]string{
			appLabelKey: app.AppID,
		},
		Type: loadBalanceType,
	}
	return serviceConfig{
		Kind:       serviceKind,
		APIVersion: serviceAPIVersion,
		Metadata: map[string]any{
			nameKey:   app.RunConfig.AppID,
			labelsKey: appLabels,
		},
		Spec: spec,
	}
}
// createDeploymentConfig builds the default Deployment definition for the
// given app: a single replica running the app's container image, with the
// dapr.io/* annotations derived from the run config plus dapr.io/enabled.
func createDeploymentConfig(app runfileconfig.App) deploymentConfig {
	annotations := app.RunConfig.GetAnnotations()
	// Enable Dapr sidecar injection for the pod.
	annotations[daprEnableAnnotationKey] = "true"

	container := corev1.Container{
		Name:            app.AppID,
		Image:           app.ContainerImage,
		Env:             getEnv(app),
		ImagePullPolicy: corev1.PullAlways,
	}
	// Expose the container port only when an app port is configured.
	if app.AppPort != 0 {
		container.Ports = []corev1.ContainerPort{
			{
				ContainerPort: int32(app.AppPort),
			},
		}
	}

	return deploymentConfig{
		Kind:       deploymentKind,
		APIVersion: deploymentAPIVersion,
		Metadata: map[string]any{
			nameKey: app.AppID,
		},
		Spec: appV1.DeploymentSpec{
			Replicas: ptr.Of[int32](1),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					appLabelKey: app.AppID,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						appLabelKey: app.AppID,
					},
					Annotations: annotations,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{container},
				},
			},
		},
	}
}
// getEnv converts the app's resolved environment (run config fields tagged
// `env` merged with user-supplied env entries) into the []corev1.EnvVar form
// used by the container spec. Map iteration order is random, so the slice
// order is not deterministic.
func getEnv(app runfileconfig.App) []corev1.EnvVar {
	// The original called app.GetEnv() twice, paying its reflection walk
	// twice; reuse the single result instead.
	envs := app.GetEnv()
	envVars := make([]corev1.EnvVar, 0, len(envs))
	for k, v := range envs {
		envVars = append(envVars, corev1.EnvVar{
			Name:  k,
			Value: v,
		})
	}
	return envVars
}
// writeYamlFile renders the service (when enabled) and deployment configs to
// YAML and writes them to <appDir>/.dapr/deploy/service.yaml and
// <appDir>/.dapr/deploy/deployment.yaml respectively, overwriting any
// previous contents.
func writeYamlFile(app runfileconfig.App, svc serviceConfig, dep deploymentConfig) error {
	deployDir := app.GetDeployDir()
	if app.CreateService {
		if err := marshalAndWriteYaml(svc, filepath.Join(deployDir, serviceFileName), "service"); err != nil {
			return err
		}
	}
	return marshalAndWriteYaml(dep, filepath.Join(deployDir, deploymentFileName), "deployment")
}

// marshalAndWriteYaml marshals v to YAML and writes it to path, overwriting
// the file. kind names the manifest ("service"/"deployment") for error text.
func marshalAndWriteYaml(v any, path, kind string) error {
	yamlBytes, err := k8sYaml.Marshal(v)
	if err != nil {
		return fmt.Errorf("error marshalling %s yaml: %w", kind, err)
	}
	// os.WriteFile replaces the manual Create/Write/Close sequence, whose
	// Close errors were previously ignored. 0o666 (pre-umask) matches the
	// permissions os.Create would have used.
	if err := os.WriteFile(path, yamlBytes, 0o666); err != nil {
		return fmt.Errorf("error writing to file %s : %w", path, err)
	}
	return nil
}
// deployYamlToK8s applies the YAML manifest at the given path to the cluster
// currently targeted by kubectl.
func deployYamlToK8s(yamlToDeployPath string) error {
	if _, err := utils.RunCmdAndWait("kubectl", "apply", "-f", yamlToDeployPath); err != nil {
		return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeployPath, err)
	}
	return nil
}
func deleteYamlK8s(yamlToDeletePath string) error {
print.InfoStatusEvent(os.Stdout, "Deleting %q from Kubernetes", yamlToDeletePath)
_, err := utils.RunCmdAndWait("kubectl", "delete", "-f", yamlToDeletePath)
if err != nil {
return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeletePath, err)
}
return nil
}
// setupLogs starts streaming the app and daprd container logs for the given
// app ID from Kubernetes to the supplied writers; streaming continues until
// ctx is cancelled.
// NOTE(review): appLogWriter/daprdLogWriter are passed in swapped positions
// relative to this signature's order — confirm against
// streamContainerLogsToDisk's parameter order.
func setupLogs(ctx context.Context, appID string, daprdLogWriter, appLogWriter io.Writer, podInterface podsv1.PodInterface) error {
	return streamContainerLogsToDisk(ctx, appID, appLogWriter, daprdLogWriter, podInterface)
}
// gracefullyShutdownK8sDeployment deletes the service/deployment manifests
// for every started app, waits (bounded by podCreationDeletionTimeout) for
// the pods to disappear so the final log lines can be collected, then stops
// log streaming and closes the log files. All errors are joined and returned.
func gracefullyShutdownK8sDeployment(runStates []runState, client k8s.Interface, namespace string) error {
	errs := make([]error, 0, len(runStates)*4)
	for _, r := range runStates {
		if len(r.serviceFilePath) != 0 {
			errs = append(errs, deleteYamlK8s(r.serviceFilePath))
		}
		errs = append(errs, deleteYamlK8s(r.deploymentFilePath))
		labelSelector := map[string]string{
			daprAppIDKey: r.app.AppID,
		}
		if ok, _ := CheckPodExists(client, namespace, labelSelector, r.app.AppID); ok {
			ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
			err := waitPodDeleted(ctx, client, namespace, r.app.AppID)
			cancel()
			if err != nil {
				// swallowing err here intentionally.
				print.WarningStatusEvent(os.Stderr, "Error waiting for pods to be deleted. Final logs might only be partially available.")
			}
		}
		// shutdown logs. logCancel is only set when the pod reached the
		// Running phase (see Run), so guard against a nil func call, which
		// previously panicked for apps whose pod never came up.
		if r.logCancel != nil {
			r.logCancel()
		}
		errs = append(errs, r.app.CloseAppLogFile(), r.app.CloseDaprdLogFile())
	}
	return errors.Join(errs...)
}
func monitorK8sPods(ctx context.Context, client k8s.Interface, namespace string, runStates []runState, sigCh chan os.Signal) {
// for each app wait for pod to be deleted, if all pods are deleted, then send shutdown signal to the cli process.
wg := sync.WaitGroup{}
for _, r := range runStates {
go func(appID string, wg *sync.WaitGroup) {
err := waitPodDeleted(ctx, client, namespace, r.app.AppID)
if err != nil && strings.Contains(err.Error(), podWatchErrTemplate) {
print.WarningStatusEvent(os.Stderr, "Error monitoring Kubernetes pod(s) for app %q.", appID)
}
wg.Done()
}(r.app.AppID, &wg)
wg.Add(1)
}
wg.Wait()
// Send signal to gracefully close log writers and shut down process.
sigCh <- syscall.SIGINT
} }

51
pkg/kubernetes/stop.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetes
import (
"context"
"errors"
"fmt"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"github.com/dapr/cli/pkg/runfileconfig"
)
// Stop tears down the Kubernetes deployments created by a previous multi-app
// run for the apps in the given run file: it deletes each app's service and
// deployment manifests and waits (bounded) for the pods to terminate.
func Stop(runFilePath string, config runfileconfig.RunFileConfig) error {
	// get k8s client.
	client, err := Client()
	if err != nil {
		return fmt.Errorf("error getting k8s client for monitoring pod deletion: %w", err)
	}

	namespace := corev1.NamespaceDefault
	errs := []error{}
	for _, app := range config.Apps {
		deployDir := app.GetDeployDir()
		if app.CreateService {
			errs = append(errs, deleteYamlK8s(filepath.Join(deployDir, serviceFileName)))
		}
		errs = append(errs, deleteYamlK8s(filepath.Join(deployDir, deploymentFileName)))

		ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout)
		// Ignoring errors here as it will anyway be printed in the other dapr cli process.
		waitPodDeleted(ctx, client, namespace, app.AppID)
		cancel()
	}
	return errors.Join(errs...)
}

View File

@ -0,0 +1,11 @@
version: 1
common:
apps:
- appDirPath: ./nodeapp/
appPort: 3000
containerImage: ghcr.io/dapr/samples/hello-k8s-node:latest
createService: true
env:
APP_PORT: 3000
- appDirPath: ./pythonapp/
containerImage: ghcr.io/dapr/samples/hello-k8s-python:latest

View File

@ -24,7 +24,7 @@ import (
) )
// Uninstall removes Dapr from a Kubernetes cluster. // Uninstall removes Dapr from a Kubernetes cluster.
func Uninstall(namespace string, uninstallAll bool, timeout uint) error { func Uninstall(namespace string, uninstallAll bool, uninstallDev bool, timeout uint) error {
config, err := helmConfig(namespace) config, err := helmConfig(namespace)
if err != nil { if err != nil {
return err return err
@ -48,6 +48,11 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error {
// Deleting Dashboard here is for versions >= 1.11. // Deleting Dashboard here is for versions >= 1.11.
uninstallClient.Run(dashboardReleaseName) uninstallClient.Run(dashboardReleaseName)
if uninstallDev {
// uninstall dapr-dev-zipkin and dapr-dev-redis as best effort.
uninstallThirdParty()
}
_, err = uninstallClient.Run(daprReleaseName) _, err = uninstallClient.Run(daprReleaseName)
if err != nil { if err != nil {
@ -65,3 +70,14 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error {
return nil return nil
} }
func uninstallThirdParty() {
print.InfoStatusEvent(os.Stdout, "Removing dapr-dev-redis and dapr-dev-zipkin from the cluster...")
// Uninstall dapr-dev-redis and dapr-dev-zipkin from k8s as best effort.
config, _ := helmConfig(thirdPartyDevNamespace)
uninstallClient := helm.NewUninstall(config)
uninstallClient.Run(redisReleaseName)
uninstallClient.Run(zipkinReleaseName)
}

View File

@ -57,6 +57,7 @@ type UpgradeConfig struct {
} }
func Upgrade(conf UpgradeConfig) error { func Upgrade(conf UpgradeConfig) error {
helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo)
status, err := GetDaprResourcesStatus() status, err := GetDaprResourcesStatus()
if err != nil { if err != nil {
return err return err
@ -75,7 +76,7 @@ func Upgrade(conf UpgradeConfig) error {
return err return err
} }
controlPlaneChart, err := daprChart(conf.RuntimeVersion, "dapr", helmConf) controlPlaneChart, err := getHelmChart(conf.RuntimeVersion, "dapr", helmRepo, helmConf)
if err != nil { if err != nil {
return err return err
} }
@ -109,7 +110,7 @@ func Upgrade(conf UpgradeConfig) error {
var dashboardChart *chart.Chart var dashboardChart *chart.Chart
if conf.DashboardVersion != "" { if conf.DashboardVersion != "" {
dashboardChart, err = daprChart(conf.DashboardVersion, dashboardReleaseName, helmConf) dashboardChart, err = getHelmChart(conf.DashboardVersion, dashboardReleaseName, helmRepo, helmConf)
if err != nil { if err != nil {
return err return err
} }
@ -176,7 +177,7 @@ func Upgrade(conf UpgradeConfig) error {
} }
} else { } else {
// We need to install Dashboard since it does not exist yet. // We need to install Dashboard since it does not exist yet.
err = install(dashboardReleaseName, conf.DashboardVersion, InitConfiguration{ err = install(dashboardReleaseName, conf.DashboardVersion, helmRepo, InitConfiguration{
DashboardVersion: conf.DashboardVersion, DashboardVersion: conf.DashboardVersion,
Namespace: upgradeClient.Namespace, Namespace: upgradeClient.Namespace,
Wait: upgradeClient.Wait, Wait: upgradeClient.Wait,

View File

@ -16,8 +16,10 @@ package runexec
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"os/exec" "os/exec"
"github.com/dapr/cli/pkg/runfileconfig"
"github.com/dapr/cli/pkg/standalone" "github.com/dapr/cli/pkg/standalone"
) )
@ -129,3 +131,26 @@ func NewOutput(config *standalone.RunConfig) (*RunOutput, error) {
DaprGRPCPort: config.GRPCPort, DaprGRPCPort: config.GRPCPort,
}, nil }, nil
} }
// GetAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout.
// When the app command is empty only the daprd log file is a candidate file
// target; otherwise the app and daprd file writers are combined with stdout
// according to each one's configured log destination.
func GetAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer {
	if isAppCommandEmpty {
		if app.DaprdLogDestination == standalone.Console {
			return os.Stdout
		}
		return io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser)
	}

	appToFile := app.AppLogDestination != standalone.Console
	daprdToFile := app.DaprdLogDestination != standalone.Console
	switch {
	case appToFile && daprdToFile:
		return io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout)
	case appToFile:
		return io.MultiWriter(app.AppLogWriteCloser, os.Stdout)
	case daprdToFile:
		return io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout)
	default:
		return os.Stdout
	}
}

View File

@ -27,6 +27,7 @@ const (
daprdLogFileNamePrefix = "daprd" daprdLogFileNamePrefix = "daprd"
logFileExtension = ".log" logFileExtension = ".log"
logsDir = "logs" logsDir = "logs"
deployDir = "deploy"
) )
// RunFileConfig represents the complete configuration options for the run file. // RunFileConfig represents the complete configuration options for the run file.
@ -38,9 +39,16 @@ type RunFileConfig struct {
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`
} }
// ContainerConfiguration represents the application container configuration parameters.
type ContainerConfiguration struct {
ContainerImage string `yaml:"containerImage"`
CreateService bool `yaml:"createService"`
}
// App represents the configuration options for the apps in the run file. // App represents the configuration options for the apps in the run file.
type App struct { type App struct {
standalone.RunConfig `yaml:",inline"` standalone.RunConfig `yaml:",inline"`
ContainerConfiguration `yaml:",inline"`
AppDirPath string `yaml:"appDirPath"` AppDirPath string `yaml:"appDirPath"`
AppLogFileName string AppLogFileName string
DaprdLogFileName string DaprdLogFileName string
@ -59,6 +67,12 @@ func (a *App) GetLogsDir() string {
return logsPath return logsPath
} }
// GetDeployDir returns the path of the auto-generated deploy directory
// (<appDir>/.dapr/deploy) for the app, creating it (and any parents) if it
// does not already exist.
func (a *App) GetDeployDir() string {
	// Renamed from the copy-pasted "logsPath" — this is the deploy dir.
	deployPath := filepath.Join(a.AppDirPath, standalone.DefaultDaprDirName, deployDir)
	// Best-effort creation: a failure here surfaces later when files are
	// written into the directory.
	os.MkdirAll(deployPath, 0o755)
	return deployPath
}
// CreateAppLogFile creates the log file, sets internal file handle // CreateAppLogFile creates the log file, sets internal file handle
// and returns error if any. // and returns error if any.
func (a *App) CreateAppLogFile() error { func (a *App) CreateAppLogFile() error {
@ -104,14 +118,32 @@ func (a *App) createLogFile(logType string) (*os.File, error) {
func (a *App) CloseAppLogFile() error { func (a *App) CloseAppLogFile() error {
if a.AppLogWriteCloser != nil { if a.AppLogWriteCloser != nil {
return a.AppLogWriteCloser.Close() err := a.AppLogWriteCloser.Close()
a.AppLogWriteCloser = nil
return err
} }
return nil return nil
} }
func (a *App) CloseDaprdLogFile() error { func (a *App) CloseDaprdLogFile() error {
if a.DaprdLogWriteCloser != nil { if a.DaprdLogWriteCloser != nil {
return a.DaprdLogWriteCloser.Close() err := a.DaprdLogWriteCloser.Close()
a.DaprdLogWriteCloser = nil
return err
} }
return nil return nil
} }
// GetLogWriter returns the log writer based on the log destination:
// Console -> stdout, File -> the file writer, FileAndConsole -> both.
// An unrecognized destination yields a nil writer.
func GetLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer {
	switch logDestination {
	case standalone.Console:
		return os.Stdout
	case standalone.File:
		return fileLogWriterCloser
	case standalone.FileAndConsole:
		return io.MultiWriter(os.Stdout, fileLogWriterCloser)
	}
	return nil
}

View File

@ -25,13 +25,13 @@ import (
) )
var ( var (
validRunFilePath = filepath.Join("..", "testdata", "runfileconfig", "test_run_config.yaml") validRunFilePath = filepath.Join(".", "testdata", "test_run_config.yaml")
invalidRunFilePath1 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_invalid_path.yaml") invalidRunFilePath1 = filepath.Join(".", "testdata", "test_run_config_invalid_path.yaml")
invalidRunFilePath2 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_empty_app_dir.yaml") invalidRunFilePath2 = filepath.Join(".", "testdata", "test_run_config_empty_app_dir.yaml")
runFileForPrecedenceRule = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule.yaml") runFileForPrecedenceRule = filepath.Join(".", "testdata", "test_run_config_precedence_rule.yaml")
runFileForPrecedenceRuleDaprDir = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule_dapr_dir.yaml") runFileForPrecedenceRuleDaprDir = filepath.Join(".", "testdata", "test_run_config_precedence_rule_dapr_dir.yaml")
runFileForLogDestination = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_log_destination.yaml") runFileForLogDestination = filepath.Join(".", "testdata", "test_run_config_log_destination.yaml")
runFileForMultiResourcePaths = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_multiple_resources_paths.yaml") runFileForMultiResourcePaths = filepath.Join(".", "testdata", "test_run_config_multiple_resources_paths.yaml")
) )
func TestRunConfigFile(t *testing.T) { func TestRunConfigFile(t *testing.T) {

View File

View File

View File

@ -47,39 +47,46 @@ const (
// RunConfig represents the application configuration parameters. // RunConfig represents the application configuration parameters.
type RunConfig struct { type RunConfig struct {
SharedRunConfig `yaml:",inline"` SharedRunConfig `yaml:",inline"`
AppID string `env:"APP_ID" arg:"app-id" yaml:"appID"` AppID string `env:"APP_ID" arg:"app-id" annotation:"dapr.io/app-id" yaml:"appID"`
AppChannelAddress string `env:"APP_CHANNEL_ADDRESS" arg:"app-channel-address" ifneq:"127.0.0.1" yaml:"appChannelAddress"` AppChannelAddress string `env:"APP_CHANNEL_ADDRESS" arg:"app-channel-address" ifneq:"127.0.0.1" yaml:"appChannelAddress"`
AppPort int `env:"APP_PORT" arg:"app-port" yaml:"appPort" default:"-1"` AppPort int `env:"APP_PORT" arg:"app-port" annotation:"dapr.io/app-port" yaml:"appPort" default:"-1"`
HTTPPort int `env:"DAPR_HTTP_PORT" arg:"dapr-http-port" yaml:"daprHTTPPort" default:"-1"` HTTPPort int `env:"DAPR_HTTP_PORT" arg:"dapr-http-port" yaml:"daprHTTPPort" default:"-1"`
GRPCPort int `env:"DAPR_GRPC_PORT" arg:"dapr-grpc-port" yaml:"daprGRPCPort" default:"-1"` GRPCPort int `env:"DAPR_GRPC_PORT" arg:"dapr-grpc-port" yaml:"daprGRPCPort" default:"-1"`
ProfilePort int `arg:"profile-port" yaml:"profilePort" default:"-1"` ProfilePort int `arg:"profile-port" yaml:"profilePort" default:"-1"`
Command []string `yaml:"command"` Command []string `yaml:"command"`
MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" yaml:"metricsPort" default:"-1"` MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" annotation:"dapr.io/metrics-port" yaml:"metricsPort" default:"-1"`
UnixDomainSocket string `arg:"unix-domain-socket" yaml:"unixDomainSocket"` UnixDomainSocket string `arg:"unix-domain-socket" annotation:"dapr.io/unix-domain-socket-path" yaml:"unixDomainSocket"`
InternalGRPCPort int `arg:"dapr-internal-grpc-port" yaml:"daprInternalGRPCPort" default:"-1"` InternalGRPCPort int `arg:"dapr-internal-grpc-port" yaml:"daprInternalGRPCPort" default:"-1"`
} }
// SharedRunConfig represents the application configuration parameters, which can be shared across many apps. // SharedRunConfig represents the application configuration parameters, which can be shared across many apps.
type SharedRunConfig struct { type SharedRunConfig struct {
// Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324
ConfigFile string `arg:"config" yaml:"configFilePath"` ConfigFile string `arg:"config" yaml:"configFilePath"`
AppProtocol string `arg:"app-protocol" yaml:"appProtocol" default:"http"` AppProtocol string `arg:"app-protocol" annotation:"dapr.io/app-protocol" yaml:"appProtocol" default:"http"`
APIListenAddresses string `arg:"dapr-listen-addresses" yaml:"apiListenAddresses"` APIListenAddresses string `arg:"dapr-listen-addresses" annotation:"dapr.io/sidecar-listen-address" yaml:"apiListenAddresses"`
EnableProfiling bool `arg:"enable-profiling" yaml:"enableProfiling"` EnableProfiling bool `arg:"enable-profiling" annotation:"dapr.io/enable-profiling" yaml:"enableProfiling"`
LogLevel string `arg:"log-level" yaml:"logLevel"` LogLevel string `arg:"log-level" annotation:"dapr.io.log-level" yaml:"logLevel"`
MaxConcurrency int `arg:"app-max-concurrency" yaml:"appMaxConcurrency" default:"-1"` MaxConcurrency int `arg:"app-max-concurrency" annotation:"dapr.io/app-max-concurrerncy" yaml:"appMaxConcurrency" default:"-1"`
// Speicifcally omitted from annotations similar to config file path above.
PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"` PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"`
// Speicifcally omitted from annotations similar to config file path above.
ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead. ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead.
// Speicifcally omitted from annotations similar to config file path above.
ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead. ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead.
// Speicifcally omitted from annotations similar to config file path above.
ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"` ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"`
// Speicifcally omitted from annotations as appSSL is deprecated.
AppSSL bool `arg:"app-ssl" yaml:"appSSL"` AppSSL bool `arg:"app-ssl" yaml:"appSSL"`
MaxRequestBodySize int `arg:"dapr-http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"` MaxRequestBodySize int `arg:"dapr-http-max-request-size" annotation:"dapr.io/http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"`
HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"` HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" annotation:"dapr.io/http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"`
EnableAppHealth bool `arg:"enable-app-health-check" yaml:"enableAppHealthCheck"` EnableAppHealth bool `arg:"enable-app-health-check" annotation:"dapr.io/enable-app-health-check" yaml:"enableAppHealthCheck"`
AppHealthPath string `arg:"app-health-check-path" yaml:"appHealthCheckPath"` AppHealthPath string `arg:"app-health-check-path" annotation:"dapr.io/app-health-check-path" yaml:"appHealthCheckPath"`
AppHealthInterval int `arg:"app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"` AppHealthInterval int `arg:"app-health-probe-interval" annotation:"dapr.io/app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"`
AppHealthTimeout int `arg:"app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"` AppHealthTimeout int `arg:"app-health-probe-timeout" annotation:"dapr.io/app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"`
AppHealthThreshold int `arg:"app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"` AppHealthThreshold int `arg:"app-health-threshold" annotation:"dapr.io/app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"`
EnableAPILogging bool `arg:"enable-api-logging" yaml:"enableApiLogging"` EnableAPILogging bool `arg:"enable-api-logging" annotation:"dapr.io/enable-api-logging" yaml:"enableApiLogging"`
// Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 .
DaprdInstallPath string `yaml:"runtimePath"` DaprdInstallPath string `yaml:"runtimePath"`
Env map[string]string `yaml:"env"` Env map[string]string `yaml:"env"`
DaprdLogDestination LogDestType `yaml:"daprdLogDestination"` DaprdLogDestination LogDestType `yaml:"daprdLogDestination"`
@ -213,6 +220,36 @@ func (config *RunConfig) Validate() error {
return nil return nil
} }
// ValidateK8s normalizes the run configuration for a Kubernetes deployment:
// it fills in a generated app ID when none was supplied, validates the
// metrics port, and clamps unset numeric options to their sentinel values.
func (config *RunConfig) ValidateK8s() error {
	meta, err := newDaprMeta()
	if err != nil {
		return err
	}
	if len(config.AppID) == 0 {
		config.AppID = meta.newAppID()
	}
	if config.AppPort < 0 {
		config.AppPort = 0
	}
	if portErr := config.validatePort("MetricsPort", &config.MetricsPort, meta); portErr != nil {
		return portErr
	}
	if config.MaxConcurrency < 1 {
		config.MaxConcurrency = -1
	}
	// -1 signals "unset" to daprd for the remaining numeric options.
	for _, field := range []*int{&config.MaxRequestBodySize, &config.HTTPReadBufferSize} {
		if *field < 0 {
			*field = -1
		}
	}
	return nil
}
type DaprMeta struct { type DaprMeta struct {
ExistingIDs map[string]bool ExistingIDs map[string]bool
ExistingPorts map[int]bool ExistingPorts map[int]bool
@ -409,6 +446,50 @@ func (config *RunConfig) getAppProtocol() string {
} }
} }
// GetEnv builds the environment-variable map for the app process: every
// struct field tagged with `env` is exported (unset numeric values, i.e.
// ints <= 0, are skipped), then explicit entries from config.Env are merged
// on top, overriding tag-derived values on key collision.
func (config *RunConfig) GetEnv() map[string]string {
	env := map[string]string{}
	schema := reflect.ValueOf(*config)
	for i := 0; i < schema.NumField(); i++ {
		valueField := schema.Field(i).Interface()
		typeField := schema.Type().Field(i)
		key := typeField.Tag.Get("env")
		if len(key) == 0 {
			continue
		}
		if value, ok := valueField.(int); ok && value <= 0 {
			// Ignore unset numeric variables.
			continue
		}
		// valueField is already the concrete field value; no need to wrap it
		// in reflect.ValueOf again before formatting.
		env[key] = fmt.Sprintf("%v", valueField)
	}
	for k, v := range config.Env {
		env[k] = v
	}
	return env
}
// GetAnnotations builds the dapr.io/* Kubernetes annotation map from every
// struct field tagged with `annotation`. Unset numeric values (ints <= 0)
// are skipped so defaults are left to the injector.
func (config *RunConfig) GetAnnotations() map[string]string {
	annotations := map[string]string{}
	schema := reflect.ValueOf(*config)
	for i := 0; i < schema.NumField(); i++ {
		valueField := schema.Field(i).Interface()
		typeField := schema.Type().Field(i)
		key := typeField.Tag.Get("annotation")
		if len(key) == 0 {
			continue
		}
		if value, ok := valueField.(int); ok && value <= 0 {
			// Ignore unset numeric variables.
			continue
		}
		// valueField is already the concrete field value; no need to wrap it
		// in reflect.ValueOf again before formatting.
		annotations[key] = fmt.Sprintf("%v", valueField)
	}
	return annotations
}
func GetDaprCommand(config *RunConfig) (*exec.Cmd, error) { func GetDaprCommand(config *RunConfig) (*exec.Cmd, error) {
daprCMD, err := lookupBinaryFilePath(config.DaprdInstallPath, "daprd") daprCMD, err := lookupBinaryFilePath(config.DaprdInstallPath, "daprd")
if err != nil { if err != nil {

View File

@ -48,6 +48,10 @@ const (
numHAPods = 13 numHAPods = 13
numNonHAPods = 5 numNonHAPods = 5
thirdPartyDevNamespace = "default"
devRedisReleaseName = "dapr-dev-redis"
devZipkinReleaseName = "dapr-dev-zipkin"
) )
type VersionDetails struct { type VersionDetails struct {
@ -61,6 +65,7 @@ type VersionDetails struct {
} }
type TestOptions struct { type TestOptions struct {
DevEnabled bool
HAEnabled bool HAEnabled bool
MTLSEnabled bool MTLSEnabled bool
ApplyComponentChanges bool ApplyComponentChanges bool
@ -189,7 +194,7 @@ func GetTestsOnInstall(details VersionDetails, opts TestOptions) []TestCase {
func GetTestsOnUninstall(details VersionDetails, opts TestOptions) []TestCase { func GetTestsOnUninstall(details VersionDetails, opts TestOptions) []TestCase {
return []TestCase{ return []TestCase{
{"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll)}, // waits for pod deletion. {"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll, opts.DevEnabled)}, // waits for pod deletion.
{"cluster not exist", kubernetesTestOnUninstall()}, {"cluster not exist", kubernetesTestOnUninstall()},
{"crds exist on uninstall " + details.RuntimeVersion, CRDTest(details, opts)}, {"crds exist on uninstall " + details.RuntimeVersion, CRDTest(details, opts)},
{"clusterroles not exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)}, {"clusterroles not exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)},
@ -747,6 +752,10 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
"-n", DaprTestNamespace, "-n", DaprTestNamespace,
"--log-as-json", "--log-as-json",
} }
if opts.DevEnabled {
t.Log("install dev mode")
args = append(args, "--dev")
}
if !details.UseDaprLatestVersion { if !details.UseDaprLatestVersion {
// TODO: Pass dashboard-version also when charts are released. // TODO: Pass dashboard-version also when charts are released.
args = append(args, "--runtime-version", details.RuntimeVersion) args = append(args, "--runtime-version", details.RuntimeVersion)
@ -776,10 +785,13 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
require.NoError(t, err, "init failed") require.NoError(t, err, "init failed")
validatePodsOnInstallUpgrade(t, details) validatePodsOnInstallUpgrade(t, details)
if opts.DevEnabled {
validateThirdpartyPodsOnInit(t)
}
} }
} }
func uninstallTest(all bool) func(t *testing.T) { func uninstallTest(all bool, devEnabled bool) func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
output, err := EnsureUninstall(all) output, err := EnsureUninstall(all)
t.Log(output) t.Log(output)
@ -792,11 +804,23 @@ func uninstallTest(all bool) func(t *testing.T) {
go waitPodDeletion(t, done, podsDeleted) go waitPodDeletion(t, done, podsDeleted)
select { select {
case <-podsDeleted: case <-podsDeleted:
t.Log("pods were deleted as expected on uninstall") t.Log("dapr pods were deleted as expected on uninstall")
case <-time.After(2 * time.Minute):
done <- struct{}{}
t.Error("timeout verifying pods were deleted as expected")
return
}
if devEnabled {
t.Log("waiting for dapr dev pods to be deleted")
go waitPodDeletionDev(t, done, podsDeleted)
select {
case <-podsDeleted:
t.Log("dapr dev pods were deleted as expected on uninstall dev")
return return
case <-time.After(2 * time.Minute): case <-time.After(2 * time.Minute):
done <- struct{}{} done <- struct{}{}
t.Error("timeout verifying pods were deleted as expectedx") t.Error("timeout verifying pods were deleted as expected")
}
} }
} }
} }
@ -943,6 +967,41 @@ func httpEndpointOutputCheck(t *testing.T, output string) {
assert.Contains(t, output, "httpendpoint") assert.Contains(t, output, "httpendpoint")
} }
// validateThirdpartyPodsOnInit asserts that the dev-mode third-party
// dependencies (dev Redis and dev Zipkin) are running and ready in the
// default namespace after `dapr init -k --dev`.
func validateThirdpartyPodsOnInit(t *testing.T) {
	ctx := context.Background()
	ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	k8sClient, err := getClient()
	require.NoError(t, err)
	list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{
		Limit: 100,
	})
	require.NoError(t, err)
	// Components still missing a running+ready pod; must be empty at the end.
	notFound := map[string]struct{}{
		devRedisReleaseName:  {},
		devZipkinReleaseName: {},
	}
	prefixes := map[string]string{
		devRedisReleaseName:  "dapr-dev-redis-master-",
		devZipkinReleaseName: "dapr-dev-zipkin-",
	}
	for _, pod := range list.Items {
		t.Log(pod.ObjectMeta.Name)
		// Pod-level readiness checks are independent of the component prefix,
		// so evaluate them once per pod instead of once per component.
		if pod.Status.Phase != core_v1.PodRunning {
			continue
		}
		// Guard against pods with no container statuses yet; indexing [0]
		// unconditionally would panic with index out of range.
		if len(pod.Status.ContainerStatuses) == 0 || !pod.Status.ContainerStatuses[0].Ready {
			continue
		}
		for component, prefix := range prefixes {
			if strings.HasPrefix(pod.ObjectMeta.Name, prefix) {
				delete(notFound, component)
			}
		}
	}
	assert.Empty(t, notFound)
}
func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) { func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) {
ctx := context.Background() ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
@ -1010,6 +1069,52 @@ func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) {
assert.Empty(t, notFound) assert.Empty(t, notFound)
} }
// waitPodDeletionDev polls the third-party (dev-mode) namespace until both
// the dev Redis and dev Zipkin pods are gone, then signals podsDeleted and
// returns. A send on done (e.g. caller timeout) stops the polling loop.
func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) {
	for {
		select {
		case <-done: // Timeout was reached by the caller.
			return
		default:
		}
		if devPodsDeleted(t) {
			podsDeleted <- struct{}{}
			// Return after signaling: without this the goroutine would keep
			// polling forever and block on the next (never-received) send.
			return
		}
		time.Sleep(15 * time.Second)
	}
}

// devPodsDeleted reports whether neither the dev Redis nor the dev Zipkin
// pod is still running and ready in the third-party namespace. Extracted as
// a helper so the per-poll context is canceled promptly via defer instead of
// accumulating deferred cancels inside the caller's loop.
func devPodsDeleted(t *testing.T) bool {
	ctxt, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	k8sClient, err := getClient()
	require.NoError(t, err, "error getting k8s client for pods check")
	list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{
		Limit: 100,
	})
	require.NoError(t, err)
	// Components for which a running+ready pod has NOT been observed.
	found := map[string]struct{}{
		devRedisReleaseName:  {},
		devZipkinReleaseName: {},
	}
	prefixes := map[string]string{
		devRedisReleaseName:  "dapr-dev-redis-master-",
		devZipkinReleaseName: "dapr-dev-zipkin-",
	}
	for _, pod := range list.Items {
		t.Log(pod.ObjectMeta.Name)
		if pod.Status.Phase != core_v1.PodRunning {
			continue
		}
		// Guard against pods with no container statuses yet; indexing [0]
		// unconditionally would panic with index out of range.
		if len(pod.Status.ContainerStatuses) == 0 || !pod.Status.ContainerStatuses[0].Ready {
			continue
		}
		for component, prefix := range prefixes {
			if strings.HasPrefix(pod.ObjectMeta.Name, prefix) {
				delete(found, component)
			}
		}
	}
	// Both entries remain only when no matching running+ready pod was seen,
	// i.e. both dev pods are gone.
	return len(found) == 2
}
func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) { func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
for { for {
select { select {

View File

@ -118,6 +118,39 @@ func TestKubernetesHAModeMTLSDisabled(t *testing.T) {
} }
} }
// TestKubernetesDev exercises `dapr init -k --dev`: installation deploys the
// dev Redis/Zipkin dependencies alongside Dapr, and uninstallation removes
// them together with the Dapr control plane.
func TestKubernetesDev(t *testing.T) {
	// Start from a clean cluster.
	ensureCleanEnv(t, false)

	installOpts := common.TestOptions{
		DevEnabled:            true,
		HAEnabled:             false,
		MTLSEnabled:           true,
		ApplyComponentChanges: true,
		CheckResourceExists: map[common.Resource]bool{
			common.CustomResourceDefs:  true,
			common.ClusterRoles:        true,
			common.ClusterRoleBindings: true,
		},
	}
	uninstallOpts := common.TestOptions{
		DevEnabled: true,
		CheckResourceExists: map[common.Resource]bool{
			common.CustomResourceDefs:  true,
			common.ClusterRoles:        false,
			common.ClusterRoleBindings: false,
		},
	}

	// Run the install suite followed by the uninstall suite.
	cases := append(
		common.GetTestsOnInstall(currentVersionDetails, installOpts),
		common.GetTestsOnUninstall(currentVersionDetails, uninstallOpts)...,
	)
	for _, tc := range cases {
		t.Run(tc.Name, tc.Callable)
	}
}
func TestKubernetesNonHAModeMTLSEnabled(t *testing.T) { func TestKubernetesNonHAModeMTLSEnabled(t *testing.T) {
// ensure clean env for test // ensure clean env for test
ensureCleanEnv(t, false) ensureCleanEnv(t, false)