cli/cmd/run.go

338 lines
12 KiB
Go

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------
package cmd
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/dapr/cli/pkg/metadata"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/standalone"
"github.com/dapr/cli/utils"
)
var (
appPort int
profilePort int
appID string
configFile string
port int
grpcPort int
maxConcurrency int
enableProfiling bool
logLevel string
protocol string
componentsPath string
appSSL bool
metricsPort int
maxRequestBodySize int
unixDomainSocket string
)
const (
runtimeWaitTimeoutInSeconds = 60
)
var RunCmd = &cobra.Command{
Use: "run",
Short: "Run Dapr and (optionally) your application side by side. Supported platforms: Self-hosted",
Example: `
# Run a .NET application:
dapr run --app-id myapp --app-port 5000 -- dotnet run
# Run a Java application:
dapr run --app-id myapp -- java -jar myapp.jar
# Run a NodeJs application that listens to port 3000:
dapr run --app-id myapp --app-port 3000 -- node myapp.js
# Run a Python application:
dapr run --app-id myapp -- python myapp.py
# Run sidecar only:
dapr run --app-id myapp
`,
Args: cobra.MinimumNArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("placement-host-address", cmd.Flags().Lookup("placement-host-address"))
},
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
fmt.Println(print.WhiteBold("WARNING: no application command found."))
}
if unixDomainSocket != "" {
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" {
print.FailureStatusEvent(os.Stderr, "The unix-domain-socket option is not supported on Windows")
os.Exit(1)
} else {
// use unix domain socket means no port any more
print.WarningStatusEvent(os.Stdout, "Unix domain sockets are currently a preview feature")
port = 0
grpcPort = 0
}
}
output, err := standalone.Run(&standalone.RunConfig{
AppID: appID,
AppPort: appPort,
HTTPPort: port,
GRPCPort: grpcPort,
ConfigFile: configFile,
Arguments: args,
EnableProfiling: enableProfiling,
ProfilePort: profilePort,
LogLevel: logLevel,
MaxConcurrency: maxConcurrency,
Protocol: protocol,
PlacementHostAddr: viper.GetString("placement-host-address"),
ComponentsPath: componentsPath,
AppSSL: appSSL,
MetricsPort: metricsPort,
MaxRequestBodySize: maxRequestBodySize,
UnixDomainSocket: unixDomainSocket,
})
if err != nil {
print.FailureStatusEvent(os.Stderr, err.Error())
return
}
sigCh := make(chan os.Signal, 1)
setupShutdownNotify(sigCh)
daprRunning := make(chan bool, 1)
appRunning := make(chan bool, 1)
go func() {
var startInfo string
if unixDomainSocket != "" {
startInfo = fmt.Sprintf(
"Starting Dapr with id %s. HTTP Socket: %v. gRPC Socket: %v.",
output.AppID,
utils.GetSocket(unixDomainSocket, output.AppID, "http"),
utils.GetSocket(unixDomainSocket, output.AppID, "grpc"))
} else {
startInfo = fmt.Sprintf(
"Starting Dapr with id %s. HTTP Port: %v. gRPC Port: %v",
output.AppID,
output.DaprHTTPPort,
output.DaprGRPCPort)
}
print.InfoStatusEvent(os.Stdout, startInfo)
output.DaprCMD.Stdout = os.Stdout
output.DaprCMD.Stderr = os.Stderr
err = output.DaprCMD.Start()
if err != nil {
print.FailureStatusEvent(os.Stderr, err.Error())
os.Exit(1)
}
go func() {
daprdErr := output.DaprCMD.Wait()
if daprdErr != nil {
print.FailureStatusEvent(os.Stderr, "The daprd process exited with error code: %s", daprdErr.Error())
} else {
print.SuccessStatusEvent(os.Stdout, "Exited Dapr successfully")
}
sigCh <- os.Interrupt
}()
if appPort <= 0 {
// If app does not listen to port, we can check for Dapr's sidecar health before starting the app.
// Otherwise, it creates a deadlock.
sidecarUp := true
if unixDomainSocket != "" {
httpSocket := utils.GetSocket(unixDomainSocket, output.AppID, "http")
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP socket %v", httpSocket)
err = utils.IsDaprListeningOnSocket(httpSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP socket: %s", err.Error())
}
grpcSocket := utils.GetSocket(unixDomainSocket, output.AppID, "grpc")
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC socket %v", grpcSocket)
err = utils.IsDaprListeningOnSocket(grpcSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC socket: %s", err.Error())
}
} else {
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP port %v", output.DaprHTTPPort)
err = utils.IsDaprListeningOnPort(output.DaprHTTPPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP port: %s", err.Error())
}
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC port %v", output.DaprGRPCPort)
err = utils.IsDaprListeningOnPort(output.DaprGRPCPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC port: %s", err.Error())
}
}
if sidecarUp {
print.InfoStatusEvent(os.Stdout, "Dapr sidecar is up and running.")
} else {
print.WarningStatusEvent(os.Stdout, "Dapr sidecar might not be responding.")
}
}
daprRunning <- true
}()
<-daprRunning
go func() {
if output.AppCMD == nil {
appRunning <- true
return
}
stdErrPipe, pipeErr := output.AppCMD.StderrPipe()
if pipeErr != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error creating stderr for App: %s", err.Error()))
appRunning <- false
return
}
stdOutPipe, pipeErr := output.AppCMD.StdoutPipe()
if pipeErr != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error creating stdout for App: %s", err.Error()))
appRunning <- false
return
}
errScanner := bufio.NewScanner(stdErrPipe)
outScanner := bufio.NewScanner(stdOutPipe)
go func() {
for errScanner.Scan() {
fmt.Println(print.Blue(fmt.Sprintf("== APP == %s", errScanner.Text())))
}
}()
go func() {
for outScanner.Scan() {
fmt.Println(print.Blue(fmt.Sprintf("== APP == %s", outScanner.Text())))
}
}()
err = output.AppCMD.Start()
if err != nil {
print.FailureStatusEvent(os.Stderr, err.Error())
appRunning <- false
return
}
go func() {
appErr := output.AppCMD.Wait()
if appErr != nil {
print.FailureStatusEvent(os.Stderr, "The App process exited with error code: %s", appErr.Error())
} else {
print.SuccessStatusEvent(os.Stdout, "Exited App successfully")
}
sigCh <- os.Interrupt
}()
appRunning <- true
}()
appRunStatus := <-appRunning
if !appRunStatus {
// Start App failed, try to stop Dapr and exit.
err = output.DaprCMD.Process.Kill()
if err != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Start App failed, try to stop Dapr Error: %s", err))
} else {
print.SuccessStatusEvent(os.Stdout, "Start App failed, try to stop Dapr successfully")
}
os.Exit(1)
}
// Metadata API is only available if app has started listening to port, so wait for app to start before calling metadata API.
err = metadata.Put(output.DaprHTTPPort, "cliPID", strconv.Itoa(os.Getpid()), appID, unixDomainSocket)
if err != nil {
print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for cliPID: %s", err.Error())
}
if output.AppCMD != nil {
appCommand := strings.Join(args, " ")
print.InfoStatusEvent(os.Stdout, fmt.Sprintf("Updating metadata for app command: %s", appCommand))
err = metadata.Put(output.DaprHTTPPort, "appCommand", appCommand, appID, unixDomainSocket)
if err != nil {
print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for appCommand: %s", err.Error())
} else {
print.SuccessStatusEvent(os.Stdout, "You're up and running! Both Dapr and your app logs will appear here.\n")
}
} else {
print.SuccessStatusEvent(os.Stdout, "You're up and running! Dapr logs will appear here.\n")
}
<-sigCh
print.InfoStatusEvent(os.Stdout, "\nterminated signal received: shutting down")
if output.DaprCMD.ProcessState == nil || !output.DaprCMD.ProcessState.Exited() {
err = output.DaprCMD.Process.Kill()
if err != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error exiting Dapr: %s", err))
} else {
print.SuccessStatusEvent(os.Stdout, "Exited Dapr successfully")
}
}
if output.AppCMD != nil && (output.AppCMD.ProcessState == nil || !output.AppCMD.ProcessState.Exited()) {
err = output.AppCMD.Process.Kill()
if err != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error exiting App: %s", err))
} else {
print.SuccessStatusEvent(os.Stdout, "Exited App successfully")
}
}
if unixDomainSocket != "" {
for _, s := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(unixDomainSocket, output.AppID, s))
}
}
},
}
func init() {
RunCmd.Flags().IntVarP(&appPort, "app-port", "p", -1, "The port your application is listening on")
RunCmd.Flags().StringVarP(&appID, "app-id", "a", "", "The id for your application, used for service discovery")
RunCmd.Flags().StringVarP(&configFile, "config", "c", standalone.DefaultConfigFilePath(), "Dapr configuration file")
RunCmd.Flags().IntVarP(&port, "dapr-http-port", "H", -1, "The HTTP port for Dapr to listen on")
RunCmd.Flags().IntVarP(&grpcPort, "dapr-grpc-port", "G", -1, "The gRPC port for Dapr to listen on")
RunCmd.Flags().BoolVar(&enableProfiling, "enable-profiling", false, "Enable pprof profiling via an HTTP endpoint")
RunCmd.Flags().IntVarP(&profilePort, "profile-port", "", -1, "The port for the profile server to listen on")
RunCmd.Flags().StringVarP(&logLevel, "log-level", "", "info", "The log verbosity. Valid values are: debug, info, warn, error, fatal, or panic")
RunCmd.Flags().IntVarP(&maxConcurrency, "app-max-concurrency", "", -1, "The concurrency level of the application, otherwise is unlimited")
RunCmd.Flags().StringVarP(&protocol, "app-protocol", "P", "http", "The protocol (gRPC or HTTP) Dapr uses to talk to the application")
RunCmd.Flags().StringVarP(&componentsPath, "components-path", "d", standalone.DefaultComponentsDirPath(), "The path for components directory")
RunCmd.Flags().String("placement-host-address", "localhost", "The address of the placement service. Format is either <hostname> for default port or <hostname>:<port> for custom port")
RunCmd.Flags().BoolVar(&appSSL, "app-ssl", false, "Enable https when Dapr invokes the application")
RunCmd.Flags().IntVarP(&metricsPort, "metrics-port", "M", -1, "The port of metrics on dapr")
RunCmd.Flags().BoolP("help", "h", false, "Print this help message")
RunCmd.Flags().IntVarP(&maxRequestBodySize, "dapr-http-max-request-size", "", -1, "Max size of request body in MB")
RunCmd.Flags().StringVarP(&unixDomainSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets")
RootCmd.AddCommand(RunCmd)
}