From 4d75238be8fb9c1f2589e6ce3e3d2e1b0f9d21c2 Mon Sep 17 00:00:00 2001 From: Long Dai Date: Thu, 28 Oct 2021 04:45:34 +0800 Subject: [PATCH] runtime: support unix domain socket (#788) * runtime: support unix domain socket Signed-off-by: Long * feedback Signed-off-by: Long * update example Signed-off-by: Long --- README.md | 16 ++ cmd/invoke.go | 15 +- cmd/publish.go | 13 +- cmd/run.go | 84 +++++++-- pkg/metadata/metadata.go | 46 ++++- pkg/standalone/client.go | 4 +- pkg/standalone/dashboard.go | 2 +- pkg/standalone/invoke.go | 17 +- pkg/standalone/invoke_test.go | 221 +++++++++++++++--------- pkg/standalone/list.go | 3 +- pkg/standalone/publish.go | 30 +++- pkg/standalone/publish_test.go | 65 ++++--- pkg/standalone/run.go | 10 +- pkg/standalone/testutils.go | 31 ++++ tests/e2e/standalone/standalone_test.go | 194 +++++++++++---------- utils/utils.go | 26 +++ 16 files changed, 546 insertions(+), 231 deletions(-) diff --git a/README.md b/README.md index 47996fcd..a2837117 100644 --- a/README.md +++ b/README.md @@ -523,6 +523,22 @@ To generate shell completion scripts: $ dapr completion ``` +### Enable Unix domain socket + +In order to enable Unix domain socket to connect Dapr API server, use the `--unix-domain-socket` flag: + +``` +$ dapr run --app-id nodeapp --unix-domain-socket node app.js +``` + +Dapr will automatically create a Unix domain socket to connect Dapr API server. + +If you want to invoke your app, also use this flag: + +``` +$ dapr invoke --app-id nodeapp --unix-domain-socket --method mymethod +``` + For more details, please run the command and check the examples to apply to your shell. ## Reference for the Dapr CLI diff --git a/cmd/invoke.go b/cmd/invoke.go index 08cb709e..3aac9a90 100644 --- a/cmd/invoke.go +++ b/cmd/invoke.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "os" + "runtime" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" @@ -24,6 +25,7 @@ var ( invokeData string invokeVerb string invokeDataFile string + invokeSocket string ) var InvokeCmd = &cobra.Command{ @@ -35,6 +37,9 @@ dapr invoke --app-id target --method sample --data '{"key":"value"} # Invoke a sample method on target app with GET Verb dapr invoke --app-id target --method sample --verb GET + +# Invoke a sample method on target app with GET Verb using Unix domain socket +dapr invoke --unix-domain-socket --app-id target --method sample --verb GET `, Run: func(cmd *cobra.Command, args []string) { bytePayload := []byte{} @@ -54,7 +59,14 @@ dapr invoke --app-id target --method sample --verb GET bytePayload = []byte(invokeData) } client := standalone.NewClient() - response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb) + + // TODO(@daixiang0): add Windows support + if runtime.GOOS == "windows" && invokeSocket != "" { + print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!") + os.Exit(1) + } + + response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket) if err != nil { err = fmt.Errorf("error invoking app %s: %s", invokeAppID, err) print.FailureStatusEvent(os.Stderr, err.Error()) @@ -75,6 +87,7 @@ func init() { InvokeCmd.Flags().StringVarP(&invokeVerb, "verb", "v", defaultHTTPVerb, "The HTTP verb to use") InvokeCmd.Flags().StringVarP(&invokeDataFile, "data-file", "f", "", "A file containing the JSON serialized data (optional)") InvokeCmd.Flags().BoolP("help", "h", false, "Print this help message") + InvokeCmd.Flags().StringVarP(&invokeSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets") InvokeCmd.MarkFlagRequired("app-id") InvokeCmd.MarkFlagRequired("method") RootCmd.AddCommand(InvokeCmd) diff --git a/cmd/publish.go b/cmd/publish.go index d6df0e44..9a3d01d6 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -9,6 +9,7 @@ import ( "fmt" "io/ioutil" "os" + "runtime" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" @@ -21,6 +22,7 @@ var ( publishTopic string publishPayload string publishPayloadFile string + publishSocket string ) var PublishCmd = &cobra.Command{ @@ -29,6 +31,9 @@ var PublishCmd = &cobra.Command{ Example: ` # Publish to sample topic in target pubsub via a publishing app dapr publish --publish-app-id myapp --pubsub target --topic sample --data '{"key":"value"}' + +# Publish to sample topic in target pubsub via a publishing app using Unix domain socket +dapr publish --enable-domain-socket --publish-app-id myapp --pubsub target --topic sample --data '{"key":"value"}' `, Run: func(cmd *cobra.Command, args []string) { bytePayload := []byte{} @@ -49,7 +54,12 @@ dapr publish --publish-app-id myapp --pubsub target --topic sample --data '{"key } client := standalone.NewClient() - err = client.Publish(publishAppID, pubsubName, publishTopic, bytePayload) + // TODO(@daixiang0): add Windows support + if runtime.GOOS == "windows" && publishSocket != "" { + print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!") + os.Exit(1) + } + err = client.Publish(publishAppID, pubsubName, publishTopic, bytePayload, publishSocket) if err != nil { print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error publishing topic %s: %s", publishTopic, err)) os.Exit(1) @@ -65,6 +75,7 @@ func init() { PublishCmd.Flags().StringVarP(&publishTopic, "topic", "t", "", "The topic to be published to") PublishCmd.Flags().StringVarP(&publishPayload, "data", "d", "", "The JSON serialized data string (optional)") PublishCmd.Flags().StringVarP(&publishPayloadFile, "data-file", "f", "", "A file containing the JSON serialized data (optional)") + PublishCmd.Flags().StringVarP(&publishSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets") PublishCmd.Flags().BoolP("help", "h", false, "Print this help message") PublishCmd.MarkFlagRequired("publish-app-id") PublishCmd.MarkFlagRequired("topic") diff --git a/cmd/run.go b/cmd/run.go index 27a48c75..32222c1f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -9,6 +9,7 @@ import ( "bufio" "fmt" "os" + "runtime" "strconv" "strings" "time" @@ -36,6 +37,7 @@ var ( appSSL bool metricsPort int maxRequestBodySize int + unixDomainSocket string ) const ( @@ -66,6 +68,18 @@ var RunCmd = &cobra.Command{ fmt.Println(print.WhiteBold("WARNING: no application command found.")) } + if unixDomainSocket != "" { + // TODO(@daixiang0): add Windows support + if runtime.GOOS == "windows" { + print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!") + os.Exit(1) + } else { + // use unix domain socket means no port any more + port = 0 + grpcPort = 0 + } + } + output, err := standalone.Run(&standalone.RunConfig{ AppID: appID, AppPort: appPort, @@ -83,6 +97,7 @@ var RunCmd = &cobra.Command{ AppSSL: appSSL, MetricsPort: metricsPort, MaxRequestBodySize: maxRequestBodySize, + UnixDomainSocket: unixDomainSocket, }) if err != nil { print.FailureStatusEvent(os.Stderr, err.Error()) @@ -96,13 +111,21 @@ var RunCmd = &cobra.Command{ appRunning := make(chan bool, 1) go func() { - print.InfoStatusEvent( - os.Stdout, - fmt.Sprintf( + var startInfo string + if unixDomainSocket != "" { + startInfo = fmt.Sprintf( + "Starting Dapr with id %s. HTTP Socket: %v. gRPC Socket: %v.", + output.AppID, + utils.GetSocket(unixDomainSocket, output.AppID, "http"), + utils.GetSocket(unixDomainSocket, output.AppID, "grpc")) + } else { + startInfo = fmt.Sprintf( "Starting Dapr with id %s. HTTP Port: %v. gRPC Port: %v", output.AppID, output.DaprHTTPPort, - output.DaprGRPCPort)) + output.DaprGRPCPort) + } + print.InfoStatusEvent(os.Stdout, startInfo) output.DaprCMD.Stdout = os.Stdout output.DaprCMD.Stderr = os.Stderr @@ -129,18 +152,38 @@ var RunCmd = &cobra.Command{ // If app does not listen to port, we can check for Dapr's sidecar health before starting the app. // Otherwise, it creates a deadlock. sidecarUp := true - print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP port %v", output.DaprHTTPPort) - err = utils.IsDaprListeningOnPort(output.DaprHTTPPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) - if err != nil { - sidecarUp = false - print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP port: %s", err.Error()) - } - print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC port %v", output.DaprGRPCPort) - err = utils.IsDaprListeningOnPort(output.DaprGRPCPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) - if err != nil { - sidecarUp = false - print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC port: %s", err.Error()) + if unixDomainSocket != "" { + httpSocket := utils.GetSocket(unixDomainSocket, output.AppID, "http") + print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP socket %v", httpSocket) + err = utils.IsDaprListeningOnSocket(httpSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) + if err != nil { + sidecarUp = false + print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP socket: %s", err.Error()) + } + + grpcSocket := utils.GetSocket(unixDomainSocket, output.AppID, "grpc") + print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC socket %v", grpcSocket) + err = utils.IsDaprListeningOnSocket(grpcSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) + if err != nil { + sidecarUp = false + print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC socket: %s", err.Error()) + } + + } else { + print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP port %v", output.DaprHTTPPort) + err = utils.IsDaprListeningOnPort(output.DaprHTTPPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) + if err != nil { + sidecarUp = false + print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP port: %s", err.Error()) + } + + print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC port %v", output.DaprGRPCPort) + err = utils.IsDaprListeningOnPort(output.DaprGRPCPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second) + if err != nil { + sidecarUp = false + print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC port: %s", err.Error()) + } } if sidecarUp { @@ -223,7 +266,7 @@ var RunCmd = &cobra.Command{ } // Metadata API is only available if app has started listening to port, so wait for app to start before calling metadata API. - err = metadata.Put(output.DaprHTTPPort, "cliPID", strconv.Itoa(os.Getpid())) + err = metadata.Put(output.DaprHTTPPort, "cliPID", strconv.Itoa(os.Getpid()), appID, unixDomainSocket) if err != nil { print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for cliPID: %s", err.Error()) } @@ -231,7 +274,7 @@ var RunCmd = &cobra.Command{ if output.AppCMD != nil { appCommand := strings.Join(args, " ") print.InfoStatusEvent(os.Stdout, fmt.Sprintf("Updating metadata for app command: %s", appCommand)) - err = metadata.Put(output.DaprHTTPPort, "appCommand", appCommand) + err = metadata.Put(output.DaprHTTPPort, "appCommand", appCommand, appID, unixDomainSocket) if err != nil { print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for appCommand: %s", err.Error()) } else { @@ -261,6 +304,12 @@ var RunCmd = &cobra.Command{ print.SuccessStatusEvent(os.Stdout, "Exited App successfully") } } + + if unixDomainSocket != "" { + for _, s := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(unixDomainSocket, output.AppID, s)) + } + } }, } @@ -281,6 +330,7 @@ func init() { RunCmd.Flags().IntVarP(&metricsPort, "metrics-port", "M", -1, "The port of metrics on dapr") RunCmd.Flags().BoolP("help", "h", false, "Print this help message") RunCmd.Flags().IntVarP(&maxRequestBodySize, "dapr-http-max-request-size", "", -1, "Max size of request body in MB") + RunCmd.Flags().StringVarP(&unixDomainSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets") RootCmd.AddCommand(RunCmd) } diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go index 0afb1c2d..15ec0a39 100644 --- a/pkg/metadata/metadata.go +++ b/pkg/metadata/metadata.go @@ -6,21 +6,44 @@ package metadata import ( + "context" "encoding/json" "fmt" "io/ioutil" + "net" "net/http" + "os" "strings" "github.com/dapr/cli/pkg/api" + "github.com/dapr/cli/utils" + retryablehttp "github.com/hashicorp/go-retryablehttp" ) // Get retrieves the metadata of a given app's sidecar. -func Get(httpPort int) (*api.Metadata, error) { +func Get(httpPort int, appID, socket string) (*api.Metadata, error) { url := makeMetadataGetEndpoint(httpPort) - // nolint:gosec - r, err := http.Get(url) + + var httpc http.Client + if socket != "" { + fileInfo, err := os.Stat(socket) + if err != nil { + return nil, err + } + + if fileInfo.IsDir() { + socket = utils.GetSocket(socket, appID, "http") + } + + httpc.Transport = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", socket) + }, + } + } + + r, err := httpc.Get(url) if err != nil { return nil, err } @@ -30,9 +53,18 @@ func Get(httpPort int) (*api.Metadata, error) { } // Put sets one metadata attribute on a given app's sidecar. -func Put(httpPort int, key, value string) error { +func Put(httpPort int, key, value, appID, socket string) error { client := retryablehttp.NewClient() client.Logger = nil + + if socket != "" { + client.HTTPClient.Transport = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", utils.GetSocket(socket, appID, "http")) + }, + } + } + url := makeMetadataPutEndpoint(httpPort, key) req, err := retryablehttp.NewRequest("PUT", url, strings.NewReader(value)) @@ -50,10 +82,16 @@ func Put(httpPort int, key, value string) error { } func makeMetadataGetEndpoint(httpPort int) string { + if httpPort == 0 { + return fmt.Sprintf("http://unix/v%s/metadata", api.RuntimeAPIVersion) + } return fmt.Sprintf("http://127.0.0.1:%v/v%s/metadata", httpPort, api.RuntimeAPIVersion) } func makeMetadataPutEndpoint(httpPort int, key string) string { + if httpPort == 0 { + return fmt.Sprintf("http://unix/v%s/metadata/%s", api.RuntimeAPIVersion, key) + } return fmt.Sprintf("http://127.0.0.1:%v/v%s/metadata/%s", httpPort, api.RuntimeAPIVersion, key) } diff --git a/pkg/standalone/client.go b/pkg/standalone/client.go index 418fd662..2846ddb2 100644 --- a/pkg/standalone/client.go +++ b/pkg/standalone/client.go @@ -15,9 +15,9 @@ type daprProcess struct { // Client is the interface the wraps all the methods exposed by the Dapr CLI. type Client interface { // Invoke is a command to invoke a remote or local dapr instance - Invoke(appID, method string, data []byte, verb string) (string, error) + Invoke(appID, method string, data []byte, verb string, socket string) (string, error) // Publish is used to publish event to a topic in a pubsub for an app ID. - Publish(publishAppID, pubsubName, topic string, payload []byte) error + Publish(publishAppID, pubsubName, topic string, payload []byte, socket string) error } type Standalone struct { diff --git a/pkg/standalone/dashboard.go b/pkg/standalone/dashboard.go index bcf68738..9dfa2ace 100644 --- a/pkg/standalone/dashboard.go +++ b/pkg/standalone/dashboard.go @@ -18,7 +18,7 @@ func NewDashboardCmd(port int) *exec.Cmd { // Use the default binary install location dashboardPath := defaultDaprBinPath() binaryName := "dashboard" - if runtime.GOOS == "windows" { + if runtime.GOOS == daprWindowsOS { binaryName = "dashboard.exe" } diff --git a/pkg/standalone/invoke.go b/pkg/standalone/invoke.go index 373c4bcf..ed418877 100644 --- a/pkg/standalone/invoke.go +++ b/pkg/standalone/invoke.go @@ -7,15 +7,18 @@ package standalone import ( "bytes" + "context" "fmt" "io/ioutil" + "net" "net/http" "github.com/dapr/cli/pkg/api" + "github.com/dapr/cli/utils" ) // Invoke is a command to invoke a remote or local dapr instance. -func (s *Standalone) Invoke(appID, method string, data []byte, verb string) (string, error) { +func (s *Standalone) Invoke(appID, method string, data []byte, verb string, path string) (string, error) { list, err := s.process.List() if err != nil { return "", err @@ -30,7 +33,17 @@ func (s *Standalone) Invoke(appID, method string, data []byte, verb string) (str } req.Header.Set("Content-Type", "application/json") - r, err := http.DefaultClient.Do(req) + var httpc http.Client + + if path != "" { + httpc.Transport = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", utils.GetSocket(path, appID, "http")) + }, + } + } + + r, err := httpc.Do(req) if err != nil { return "", err } diff --git a/pkg/standalone/invoke_test.go b/pkg/standalone/invoke_test.go index dfb74e98..f0a4d8d1 100644 --- a/pkg/standalone/invoke_test.go +++ b/pkg/standalone/invoke_test.go @@ -6,8 +6,12 @@ package standalone import ( + "fmt" + "os" + "runtime" "testing" + "github.com/dapr/cli/utils" "github.com/stretchr/testify/assert" ) @@ -60,91 +64,144 @@ func TestInvoke(t *testing.T) { }, } - for _, tc := range testCases { - t.Run(tc.name+" get", func(t *testing.T) { - ts, port := getTestServer(tc.expectedPath, tc.resp) - ts.Start() - defer ts.Close() - tc.lo.HTTPPort = port - client := &Standalone{ - process: &mockDaprProcess{ - Lo: []ListOutput{ - tc.lo, + for _, socket := range []string{"", "/tmp"} { + // TODO(@daixiang0): add Windows support + if runtime.GOOS == "windows" && socket != "" { + continue + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("%s get, socket: %v", tc.name, socket), func(t *testing.T) { + if socket != "" { + ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket) + go ts.Serve(l) + defer func() { + l.Close() + for _, protocol := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(socket, tc.appID, protocol)) + } + }() + } else { + ts, port := getTestServer(tc.expectedPath, tc.resp) + ts.Start() + defer ts.Close() + tc.lo.HTTPPort = port + } + + client := &Standalone{ + process: &mockDaprProcess{ + Lo: []ListOutput{ + tc.lo, + }, + Err: tc.listErr, }, - Err: tc.listErr, - }, - } - res, err := client.Invoke(tc.appID, tc.method, []byte{}, "GET") - if tc.errorExpected { - assert.Error(t, err, "expected an error") - assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") - } else { - assert.NoError(t, err, "expected no error") - assert.Equal(t, tc.resp, res, "expected response to match") - } - }) + } - t.Run(tc.name+" post", func(t *testing.T) { - ts, port := getTestServer(tc.expectedPath, tc.resp) - ts.Start() - defer ts.Close() - tc.lo.HTTPPort = port - client := &Standalone{ - process: &mockDaprProcess{ - Lo: []ListOutput{tc.lo}, - Err: tc.listErr, - }, - } - res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "POST") - if tc.errorExpected { - assert.Error(t, err, "expected an error") - assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") - } else { - assert.NoError(t, err, "expected no error") - assert.Equal(t, tc.resp, res, "expected response to match") - } - }) + res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "GET", socket) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.resp, res, "expected response to match") + } + }) - t.Run(tc.name+" delete", func(t *testing.T) { - ts, port := getTestServer(tc.expectedPath, tc.resp) - ts.Start() - defer ts.Close() - tc.lo.HTTPPort = port - client := &Standalone{ - process: &mockDaprProcess{ - Lo: []ListOutput{tc.lo}, - Err: tc.listErr, - }, - } - res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "DELETE") - if tc.errorExpected { - assert.Error(t, err, "expected an error") - assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") - } else { - assert.NoError(t, err, "expected no error") - assert.Equal(t, tc.resp, res, "expected response to match") - } - }) + t.Run(fmt.Sprintf("%s post, socket: %v", tc.name, socket), func(t *testing.T) { + if socket != "" { + ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket) + go ts.Serve(l) + defer func() { + l.Close() + for _, protocol := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(socket, tc.appID, protocol)) + } + }() + } else { + ts, port := getTestServer(tc.expectedPath, tc.resp) + ts.Start() + defer ts.Close() + tc.lo.HTTPPort = port + } + client := &Standalone{ + process: &mockDaprProcess{ + Lo: []ListOutput{tc.lo}, + Err: tc.listErr, + }, + } + res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "POST", socket) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.resp, res, "expected response to match") + } + }) - t.Run(tc.name+" put", func(t *testing.T) { - ts, port := getTestServer(tc.expectedPath, tc.resp) - ts.Start() - defer ts.Close() - tc.lo.HTTPPort = port - client := &Standalone{ - process: &mockDaprProcess{ - Lo: []ListOutput{tc.lo}, - Err: tc.listErr, - }, - } - res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "PUT") - if tc.errorExpected { - assert.Error(t, err, "expected an error") - assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") - } else { - assert.NoError(t, err, "expected no error") - assert.Equal(t, tc.resp, res, "expected response to match") - } - }) + t.Run(fmt.Sprintf("%s delete, socket: %v", tc.name, socket), func(t *testing.T) { + if socket != "" { + ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket) + go ts.Serve(l) + defer func() { + l.Close() + for _, protocol := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(socket, tc.appID, protocol)) + } + }() + } else { + ts, port := getTestServer(tc.expectedPath, tc.resp) + ts.Start() + defer ts.Close() + tc.lo.HTTPPort = port + } + client := &Standalone{ + process: &mockDaprProcess{ + Lo: []ListOutput{tc.lo}, + Err: tc.listErr, + }, + } + res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "DELETE", socket) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.resp, res, "expected response to match") + } + }) + + t.Run(fmt.Sprintf("%s put, socket: %v", tc.name, socket), func(t *testing.T) { + if socket != "" { + ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket) + go ts.Serve(l) + defer func() { + l.Close() + for _, protocol := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(socket, tc.appID, protocol)) + } + }() + } else { + ts, port := getTestServer(tc.expectedPath, tc.resp) + ts.Start() + defer ts.Close() + tc.lo.HTTPPort = port + } + + client := &Standalone{ + process: &mockDaprProcess{ + Lo: []ListOutput{tc.lo}, + Err: tc.listErr, + }, + } + res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "PUT", socket) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.resp, res, "expected response to match") + } + }) + } } } diff --git a/pkg/standalone/list.go b/pkg/standalone/list.go index 27762859..bafb1551 100644 --- a/pkg/standalone/list.go +++ b/pkg/standalone/list.go @@ -107,7 +107,8 @@ func List() ([]ListOutput, error) { appID := argumentsMap["--app-id"] appCmd := "" cliPIDString := "" - appMetadata, err := metadata.Get(httpPort) + socket := argumentsMap["--unix-domain-socket"] + appMetadata, err := metadata.Get(httpPort, appID, socket) if err == nil { appCmd = appMetadata.Extended["appCommand"] cliPIDString = appMetadata.Extended["cliPID"] diff --git a/pkg/standalone/publish.go b/pkg/standalone/publish.go index 73832a3a..eb27b0ad 100644 --- a/pkg/standalone/publish.go +++ b/pkg/standalone/publish.go @@ -7,15 +7,18 @@ package standalone import ( "bytes" + "context" "errors" "fmt" + "net" "net/http" "github.com/dapr/cli/pkg/api" + "github.com/dapr/cli/utils" ) // Publish publishes payload to topic in pubsub referenced by pubsubName. -func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []byte) error { +func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []byte, socket string) error { if publishAppID == "" { return errors.New("publishAppID is missing") } @@ -33,14 +36,25 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b return err } - daprHTTPPort, err := getDaprHTTPPort(l, publishAppID) + instance, err := getDaprInstance(l, publishAppID) if err != nil { return err } - url := fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", daprHTTPPort), api.RuntimeAPIVersion, pubsubName, topic) - // nolint: gosec - r, err := http.Post(url, "application/json", bytes.NewBuffer(payload)) + url := fmt.Sprintf("http://unix/v%s/publish/%s/%s", api.RuntimeAPIVersion, pubsubName, topic) + + var httpc http.Client + if socket != "" { + httpc.Transport = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", utils.GetSocket(socket, publishAppID, "http")) + }, + } + } else { + url = fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", instance.HTTPPort), api.RuntimeAPIVersion, pubsubName, topic) + } + + r, err := httpc.Post(url, "application/json", bytes.NewBuffer(payload)) if err != nil { return err } @@ -52,11 +66,11 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b return nil } -func getDaprHTTPPort(list []ListOutput, publishAppID string) (int, error) { +func getDaprInstance(list []ListOutput, publishAppID string) (ListOutput, error) { for i := 0; i < len(list); i++ { if list[i].AppID == publishAppID { - return list[i].HTTPPort, nil + return list[i], nil } } - return 0, errors.New("couldn't find a running Dapr instance") + return ListOutput{}, errors.New("couldn't find a running Dapr instance") } diff --git a/pkg/standalone/publish_test.go b/pkg/standalone/publish_test.go index ff1a26ee..95c5fbad 100644 --- a/pkg/standalone/publish_test.go +++ b/pkg/standalone/publish_test.go @@ -6,8 +6,11 @@ package standalone import ( + "os" + "runtime" "testing" + "github.com/dapr/cli/utils" "github.com/stretchr/testify/assert" ) @@ -27,7 +30,7 @@ func TestPublish(t *testing.T) { errString string }{ { - name: "test empty topic", + name: "test empty appID", publishAppID: "", payload: []byte("test"), pubsubName: "test", @@ -74,7 +77,7 @@ func TestPublish(t *testing.T) { errorExpected: true, }, { - name: "successful call", + name: "successful call not found", publishAppID: "myAppID", pubsubName: "testPubsubName", topic: "testTopic", @@ -98,25 +101,43 @@ func TestPublish(t *testing.T) { }, }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ts, port := getTestServer(tc.expectedPath, tc.resp) - ts.Start() - defer ts.Close() - tc.lo.HTTPPort = port - client := &Standalone{ - process: &mockDaprProcess{ - Lo: []ListOutput{tc.lo}, - Err: tc.listErr, - }, - } - err := client.Publish(tc.publishAppID, tc.pubsubName, tc.topic, tc.payload) - if tc.errorExpected { - assert.Error(t, err, "expected an error") - assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") - } else { - assert.NoError(t, err, "expected no error") - } - }) + for _, socket := range []string{"", "/tmp"} { + // TODO(@daixiang0): add Windows support + if runtime.GOOS == "windows" && socket != "" { + continue + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if socket != "" { + ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.publishAppID, socket) + go ts.Serve(l) + defer func() { + l.Close() + for _, protocol := range []string{"http", "grpc"} { + os.Remove(utils.GetSocket(socket, tc.publishAppID, protocol)) + } + }() + } else { + ts, port := getTestServer(tc.expectedPath, tc.resp) + ts.Start() + defer ts.Close() + tc.lo.HTTPPort = port + } + + client := &Standalone{ + process: &mockDaprProcess{ + Lo: []ListOutput{tc.lo}, + Err: tc.listErr, + }, + } + err := client.Publish(tc.publishAppID, tc.pubsubName, tc.topic, tc.payload, socket) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + } + }) + } } } diff --git a/pkg/standalone/run.go b/pkg/standalone/run.go index 1d598f5c..a1e6f3fe 100644 --- a/pkg/standalone/run.go +++ b/pkg/standalone/run.go @@ -41,6 +41,7 @@ type RunConfig struct { AppSSL bool MetricsPort int MaxRequestBodySize int + UnixDomainSocket string } // RunOutput represents the run output. @@ -52,7 +53,8 @@ type RunOutput struct { AppCMD *exec.Cmd } -func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort int, configFile, protocol string, enableProfiling bool, profilePort int, logLevel string, maxConcurrency int, placementHostAddr string, componentsPath string, appSSL bool, metricsPort int, requestBodySize int) (*exec.Cmd, int, int, int, error) { +func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort int, configFile, protocol string, enableProfiling bool, + profilePort int, logLevel string, maxConcurrency int, placementHostAddr string, componentsPath string, appSSL bool, metricsPort int, requestBodySize int, unixDomainSocket string) (*exec.Cmd, int, int, int, error) { if daprHTTPPort < 0 { port, err := freeport.GetFreePort() if err != nil { @@ -144,6 +146,10 @@ func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort in args = append(args, "--app-ssl") } + if unixDomainSocket != "" { + args = append(args, "--unix-domain-socket", unixDomainSocket) + } + cmd := exec.Command(daprCMD, args...) return cmd, daprHTTPPort, daprGRPCPort, metricsPort, nil } @@ -210,7 +216,7 @@ func Run(config *RunConfig) (*RunOutput, error) { return nil, err } - daprCMD, daprHTTPPort, daprGRPCPort, metricsPort, err := getDaprCommand(appID, config.HTTPPort, config.GRPCPort, config.AppPort, config.ConfigFile, config.Protocol, config.EnableProfiling, config.ProfilePort, config.LogLevel, config.MaxConcurrency, config.PlacementHostAddr, config.ComponentsPath, config.AppSSL, config.MetricsPort, config.MaxRequestBodySize) + daprCMD, daprHTTPPort, daprGRPCPort, metricsPort, err := getDaprCommand(appID, config.HTTPPort, config.GRPCPort, config.AppPort, config.ConfigFile, config.Protocol, config.EnableProfiling, config.ProfilePort, config.LogLevel, config.MaxConcurrency, config.PlacementHostAddr, config.ComponentsPath, config.AppSSL, config.MetricsPort, config.MaxRequestBodySize, config.UnixDomainSocket) if err != nil { return nil, err } diff --git a/pkg/standalone/testutils.go b/pkg/standalone/testutils.go index 06be42a3..b375c1a4 100644 --- a/pkg/standalone/testutils.go +++ b/pkg/standalone/testutils.go @@ -7,11 +7,16 @@ package standalone import ( "bytes" + "fmt" "net" "net/http" "net/http/httptest" + + "github.com/dapr/cli/utils" ) +const SocketFormat = "/tmp/dapr-%s-http.socket" + type mockDaprProcess struct { Lo []ListOutput Err error @@ -40,3 +45,29 @@ func getTestServer(expectedPath, resp string) (*httptest.Server, int) { return ts, ts.Listener.Addr().(*net.TCPAddr).Port } + +func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server, net.Listener) { + s := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if expectedPath != "" && r.RequestURI != expectedPath { + w.WriteHeader(http.StatusInternalServerError) + + return + } + if r.Method == http.MethodGet { + w.Write([]byte(resp)) + } else { + buf := new(bytes.Buffer) + buf.ReadFrom(r.Body) + w.Write(buf.Bytes()) + } + }), + } + + socket := utils.GetSocket(path, appID, "http") + l, err := net.Listen("unix", socket) + if err != nil { + panic(fmt.Sprintf("httptest: failed to listen on %v: %v", socket, err)) + } + return s, l +} diff --git a/tests/e2e/standalone/standalone_test.go b/tests/e2e/standalone/standalone_test.go index 0012e926..e6c626e0 100644 --- a/tests/e2e/standalone/standalone_test.go +++ b/tests/e2e/standalone/standalone_test.go @@ -1,3 +1,4 @@ +//go:build e2e // +build e2e // ------------------------------------------------------------ @@ -36,6 +37,8 @@ const ( daprDashboardVersion = "0.8.0" ) +var socketCases = []string{"", "/tmp"} + func TestStandaloneInstall(t *testing.T) { // Ensure a clean environment uninstall() @@ -346,23 +349,26 @@ func testInstall(t *testing.T) { func testRun(t *testing.T) { daprPath := getDaprPath() - t.Run("Normal exit", func(t *testing.T) { - output, err := spawn.Command(daprPath, "run", "--", "bash", "-c", "echo test") - t.Log(output) - require.NoError(t, err, "run failed") - assert.Contains(t, output, "Exited App successfully") - assert.Contains(t, output, "Exited Dapr successfully") - }) + for _, path := range socketCases { + t.Run(fmt.Sprintf("Normal exit, socket: %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "run", "--unix-domain-socket", path, "--", "bash", "-c", "echo test") + t.Log(output) + require.NoError(t, err, "run failed") + assert.Contains(t, output, "Exited App successfully") + assert.Contains(t, output, "Exited Dapr successfully") + }) - t.Run("Error exit", func(t *testing.T) { - output, err := spawn.Command(daprPath, "run", "--", "bash", "-c", "exit 1") - t.Log(output) - require.NoError(t, err, "run failed") - assert.Contains(t, output, "The App process exited with error code: exit status 1") - assert.Contains(t, output, "Exited Dapr successfully") - }) + t.Run(fmt.Sprintf("Error exit, socket: %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "run", "--unix-domain-socket", path, "--", "bash", "-c", "exit 1") + t.Log(output) + require.NoError(t, err, "run failed") + assert.Contains(t, output, "The App process exited with error code: exit status 1") + assert.Contains(t, output, "Exited Dapr successfully") + }) - t.Run("API shutdown", func(t *testing.T) { + } + + t.Run("API shutdown without socket", func(t *testing.T) { // Test that the CLI exits on a daprd shutdown. output, err := spawn.Command(daprPath, "run", "--dapr-http-port", "9999", "--", "bash", "-c", "curl -v -X POST http://localhost:9999/v1.0/shutdown; sleep 10; exit 1") t.Log(output) @@ -371,6 +377,14 @@ func testRun(t *testing.T) { assert.Contains(t, output, "Exited Dapr successfully") }) + t.Run("API shutdown with socket", func(t *testing.T) { + // Test that the CLI exits on a daprd shutdown. + output, err := spawn.Command(daprPath, "run", "--app-id", "testapp", "--unix-domain-socket", "/tmp", "--", "bash", "-c", "curl --unix-socket /tmp/dapr-testapp-http.socket -v -X POST http://unix/v1.0/shutdown; sleep 10; exit 1") + t.Log(output) + require.NoError(t, err, "run failed") + assert.Contains(t, output, "Exited App successfully", "App should be shutdown before it has a chance to return non-zero") + assert.Contains(t, output, "Exited Dapr successfully") + }) } func executeAgainstRunningDapr(t *testing.T, f func(), daprArgs ...string) { @@ -468,48 +482,49 @@ func testPublish(t *testing.T) { }() daprPath := getDaprPath() - executeAgainstRunningDapr(t, func() { - t.Run("publish from file", func(t *testing.T) { - output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json") + for _, path := range socketCases { + executeAgainstRunningDapr(t, func() { + t.Run(fmt.Sprintf("publish from file with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json") + t.Log(output) + assert.NoError(t, err, "unable to publish from --data-file") + assert.Contains(t, output, "Event published successfully") + + event := <-events + assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data) + }) + + t.Run(fmt.Sprintf("publish from string with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}") + t.Log(output) + assert.NoError(t, err, "unable to publish from --data") + assert.Contains(t, output, "Event published successfully") + + event := <-events + assert.Equal(t, map[string]interface{}{"cli": "is_working"}, event.Data) + }) + + t.Run(fmt.Sprintf("publish from non-existent file fails with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "a/file/that/does/not/exist") + t.Log(output) + assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ") + assert.Error(t, err, "a non-existent --data-file should fail with error") + }) + + t.Run(fmt.Sprintf("publish only one of data and data-file with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}") + t.Log(output) + assert.Error(t, err, "--data and --data-file should not be allowed together") + assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command") + + }) + + output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e") t.Log(output) - assert.NoError(t, err, "unable to publish from --data-file") - assert.Contains(t, output, "Event published successfully") - - event := <-events - assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data) - }) - - t.Run("publish from string", func(t *testing.T) { - output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}") - t.Log(output) - assert.NoError(t, err, "unable to publish from --data") - assert.Contains(t, output, "Event published successfully") - - event := <-events - assert.Equal(t, map[string]interface{}{"cli": "is_working"}, event.Data) - }) - - t.Run("publish from non-existent file fails", func(t *testing.T) { - output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "a/file/that/does/not/exist") - t.Log(output) - assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ") - assert.Error(t, err, "a non-existent --data-file should fail with error") - }) - - t.Run("publish only one of data and data-file", func(t *testing.T) { - output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}") - t.Log(output) - assert.Error(t, err, "--data and --data-file should not be allowed together") - assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command") - - }) - - output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e") - t.Log(output) - require.NoError(t, err, "dapr stop failed") - assert.Contains(t, output, "app stopped successfully: pub_e2e") - }, "run", "--app-id", "pub_e2e", "--app-port", "9988") - + require.NoError(t, err, "dapr stop failed") + assert.Contains(t, output, "app stopped successfully: pub_e2e") + }, "run", "--app-id", "pub_e2e", "--app-port", "9988", "--unix-domain-socket", path) + } } func testInvoke(t *testing.T) { @@ -534,42 +549,45 @@ func testInvoke(t *testing.T) { }() daprPath := getDaprPath() - executeAgainstRunningDapr(t, func() { - t.Run("data from file", func(t *testing.T) { - output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "../testdata/message.json") - t.Log(output) - assert.NoError(t, err, "unable to invoke with --data-file") - assert.Contains(t, output, "App invoked successfully") - assert.Contains(t, output, "{\"dapr\": \"is_great\"}") - }) + for _, path := range socketCases { + executeAgainstRunningDapr(t, func() { + t.Run(fmt.Sprintf("data from file with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "../testdata/message.json") + t.Log(output) + assert.NoError(t, err, "unable to invoke with --data-file") + assert.Contains(t, output, "App invoked successfully") + assert.Contains(t, output, "{\"dapr\": \"is_great\"}") + }) - t.Run("data from string", func(t *testing.T) { - output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data", "{\"cli\": \"is_working\"}") - t.Log(output) - assert.NoError(t, err, "unable to invoke with --data") - assert.Contains(t, output, "{\"cli\": \"is_working\"}") - assert.Contains(t, output, "App invoked successfully") - }) + t.Run(fmt.Sprintf("data from string with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data", "{\"cli\": \"is_working\"}") + t.Log(output) + assert.NoError(t, err, "unable to invoke with --data") + assert.Contains(t, output, "{\"cli\": \"is_working\"}") + assert.Contains(t, output, "App invoked successfully") + }) - t.Run("data from non-existent file fails", func(t *testing.T) { - output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "a/file/that/does/not/exist") - t.Log(output) - assert.Error(t, err, "a non-existent --data-file should fail with error") - assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ") - }) + t.Run(fmt.Sprintf("data from non-existent file fails with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "a/file/that/does/not/exist") + t.Log(output) + assert.Error(t, err, "a non-existent --data-file should fail with error") + assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ") + }) - t.Run("invoke only one of data and data-file", func(t *testing.T) { - output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}") - t.Log(output) - assert.Error(t, err, "--data and --data-file should not be allowed together") - assert.Contains(t, output, "Only one of --data and --data-file allowed in the same invoke command") - }) + t.Run(fmt.Sprintf("invoke only one of data and data-file with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}") + t.Log(output) + assert.Error(t, err, "--data and --data-file should not be allowed together") + assert.Contains(t, output, "Only one of --data and --data-file allowed in the same invoke command") + }) - output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "invoke_e2e") - t.Log(output) - require.NoError(t, err, "dapr stop failed") - assert.Contains(t, output, "app stopped successfully: invoke_e2e") - }, "run", "--app-id", "invoke_e2e", "--app-port", "9987") + output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "invoke_e2e") + t.Log(output) + require.NoError(t, err, "dapr stop failed") + assert.Contains(t, output, "app stopped successfully: invoke_e2e") + }, "run", "--app-id", "invoke_e2e", "--app-port", "9987", "--unix-domain-socket", path) + + } } diff --git a/utils/utils.go b/utils/utils.go index 207aea74..43d72f5c 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -26,6 +26,10 @@ import ( "gopkg.in/yaml.v2" ) +const ( + socketFormat = "%s/dapr-%s-%s.socket" +) + // PrintTable to print in the table format. func PrintTable(csvContent string) { WriteTable(os.Stdout, csvContent) @@ -152,6 +156,24 @@ func IsDaprListeningOnPort(port int, timeout time.Duration) error { } } +func IsDaprListeningOnSocket(socket string, timeout time.Duration) error { + start := time.Now() + for { + conn, err := net.DialTimeout("unix", socket, timeout) + if err == nil { + conn.Close() + return nil + } + + if time.Since(start).Seconds() >= timeout.Seconds() { + // Give up. + return err + } + + time.Sleep(time.Second) + } +} + func MarshalAndWriteTable(writer io.Writer, in interface{}) error { table, err := gocsv.MarshalString(in) if err != nil { @@ -195,3 +217,7 @@ func IsAddressLegal(address string) bool { } return isLegal } + +func GetSocket(path, appID, protocol string) string { + return fmt.Sprintf(socketFormat, path, appID, protocol) +}