runtime: support unix domain socket (#788)

* runtime: support unix domain socket

Signed-off-by: Long <long.dai@intel.com>

* feedback

Signed-off-by: Long <long.dai@intel.com>

* update example

Signed-off-by: Long <long.dai@intel.com>
This commit is contained in:
Long Dai 2021-10-28 04:45:34 +08:00 committed by GitHub
parent 892bc96599
commit 4d75238be8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 546 additions and 231 deletions

View File

@ -523,6 +523,22 @@ To generate shell completion scripts:
$ dapr completion
```
### Enable Unix domain socket
In order to enable Unix domain socket to connect Dapr API server, use the `--unix-domain-socket` flag:
```
$ dapr run --app-id nodeapp --unix-domain-socket node app.js
```
Dapr will automatically create a Unix domain socket to connect Dapr API server.
If you want to invoke your app, also use this flag:
```
$ dapr invoke --app-id nodeapp --unix-domain-socket --method mymethod
```
For more details, please run the command and check the examples to apply to your shell.
## Reference for the Dapr CLI

View File

@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"os"
"runtime"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/standalone"
@ -24,6 +25,7 @@ var (
invokeData string
invokeVerb string
invokeDataFile string
invokeSocket string
)
var InvokeCmd = &cobra.Command{
@ -35,6 +37,9 @@ dapr invoke --app-id target --method sample --data '{"key":"value"}
# Invoke a sample method on target app with GET Verb
dapr invoke --app-id target --method sample --verb GET
# Invoke a sample method on target app with GET Verb using Unix domain socket
dapr invoke --unix-domain-socket --app-id target --method sample --verb GET
`,
Run: func(cmd *cobra.Command, args []string) {
bytePayload := []byte{}
@ -54,7 +59,14 @@ dapr invoke --app-id target --method sample --verb GET
bytePayload = []byte(invokeData)
}
client := standalone.NewClient()
response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb)
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" && invokeSocket != "" {
print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!")
os.Exit(1)
}
response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket)
if err != nil {
err = fmt.Errorf("error invoking app %s: %s", invokeAppID, err)
print.FailureStatusEvent(os.Stderr, err.Error())
@ -75,6 +87,7 @@ func init() {
InvokeCmd.Flags().StringVarP(&invokeVerb, "verb", "v", defaultHTTPVerb, "The HTTP verb to use")
InvokeCmd.Flags().StringVarP(&invokeDataFile, "data-file", "f", "", "A file containing the JSON serialized data (optional)")
InvokeCmd.Flags().BoolP("help", "h", false, "Print this help message")
InvokeCmd.Flags().StringVarP(&invokeSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets")
InvokeCmd.MarkFlagRequired("app-id")
InvokeCmd.MarkFlagRequired("method")
RootCmd.AddCommand(InvokeCmd)

View File

@ -9,6 +9,7 @@ import (
"fmt"
"io/ioutil"
"os"
"runtime"
"github.com/dapr/cli/pkg/print"
"github.com/dapr/cli/pkg/standalone"
@ -21,6 +22,7 @@ var (
publishTopic string
publishPayload string
publishPayloadFile string
publishSocket string
)
var PublishCmd = &cobra.Command{
@ -29,6 +31,9 @@ var PublishCmd = &cobra.Command{
Example: `
# Publish to sample topic in target pubsub via a publishing app
dapr publish --publish-app-id myapp --pubsub target --topic sample --data '{"key":"value"}'
# Publish to sample topic in target pubsub via a publishing app using Unix domain socket
dapr publish --enable-domain-socket --publish-app-id myapp --pubsub target --topic sample --data '{"key":"value"}'
`,
Run: func(cmd *cobra.Command, args []string) {
bytePayload := []byte{}
@ -49,7 +54,12 @@ dapr publish --publish-app-id myapp --pubsub target --topic sample --data '{"key
}
client := standalone.NewClient()
err = client.Publish(publishAppID, pubsubName, publishTopic, bytePayload)
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" && publishSocket != "" {
print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!")
os.Exit(1)
}
err = client.Publish(publishAppID, pubsubName, publishTopic, bytePayload, publishSocket)
if err != nil {
print.FailureStatusEvent(os.Stderr, fmt.Sprintf("Error publishing topic %s: %s", publishTopic, err))
os.Exit(1)
@ -65,6 +75,7 @@ func init() {
PublishCmd.Flags().StringVarP(&publishTopic, "topic", "t", "", "The topic to be published to")
PublishCmd.Flags().StringVarP(&publishPayload, "data", "d", "", "The JSON serialized data string (optional)")
PublishCmd.Flags().StringVarP(&publishPayloadFile, "data-file", "f", "", "A file containing the JSON serialized data (optional)")
PublishCmd.Flags().StringVarP(&publishSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets")
PublishCmd.Flags().BoolP("help", "h", false, "Print this help message")
PublishCmd.MarkFlagRequired("publish-app-id")
PublishCmd.MarkFlagRequired("topic")

View File

@ -9,6 +9,7 @@ import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"time"
@ -36,6 +37,7 @@ var (
appSSL bool
metricsPort int
maxRequestBodySize int
unixDomainSocket string
)
const (
@ -66,6 +68,18 @@ var RunCmd = &cobra.Command{
fmt.Println(print.WhiteBold("WARNING: no application command found."))
}
if unixDomainSocket != "" {
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" {
print.FailureStatusEvent(os.Stderr, "unix-domain-socket option still does not support Windows!")
os.Exit(1)
} else {
// use unix domain socket means no port any more
port = 0
grpcPort = 0
}
}
output, err := standalone.Run(&standalone.RunConfig{
AppID: appID,
AppPort: appPort,
@ -83,6 +97,7 @@ var RunCmd = &cobra.Command{
AppSSL: appSSL,
MetricsPort: metricsPort,
MaxRequestBodySize: maxRequestBodySize,
UnixDomainSocket: unixDomainSocket,
})
if err != nil {
print.FailureStatusEvent(os.Stderr, err.Error())
@ -96,13 +111,21 @@ var RunCmd = &cobra.Command{
appRunning := make(chan bool, 1)
go func() {
print.InfoStatusEvent(
os.Stdout,
fmt.Sprintf(
var startInfo string
if unixDomainSocket != "" {
startInfo = fmt.Sprintf(
"Starting Dapr with id %s. HTTP Socket: %v. gRPC Socket: %v.",
output.AppID,
utils.GetSocket(unixDomainSocket, output.AppID, "http"),
utils.GetSocket(unixDomainSocket, output.AppID, "grpc"))
} else {
startInfo = fmt.Sprintf(
"Starting Dapr with id %s. HTTP Port: %v. gRPC Port: %v",
output.AppID,
output.DaprHTTPPort,
output.DaprGRPCPort))
output.DaprGRPCPort)
}
print.InfoStatusEvent(os.Stdout, startInfo)
output.DaprCMD.Stdout = os.Stdout
output.DaprCMD.Stderr = os.Stderr
@ -129,18 +152,38 @@ var RunCmd = &cobra.Command{
// If app does not listen to port, we can check for Dapr's sidecar health before starting the app.
// Otherwise, it creates a deadlock.
sidecarUp := true
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP port %v", output.DaprHTTPPort)
err = utils.IsDaprListeningOnPort(output.DaprHTTPPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP port: %s", err.Error())
}
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC port %v", output.DaprGRPCPort)
err = utils.IsDaprListeningOnPort(output.DaprGRPCPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC port: %s", err.Error())
if unixDomainSocket != "" {
httpSocket := utils.GetSocket(unixDomainSocket, output.AppID, "http")
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP socket %v", httpSocket)
err = utils.IsDaprListeningOnSocket(httpSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP socket: %s", err.Error())
}
grpcSocket := utils.GetSocket(unixDomainSocket, output.AppID, "grpc")
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC socket %v", grpcSocket)
err = utils.IsDaprListeningOnSocket(grpcSocket, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC socket: %s", err.Error())
}
} else {
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on HTTP port %v", output.DaprHTTPPort)
err = utils.IsDaprListeningOnPort(output.DaprHTTPPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on HTTP port: %s", err.Error())
}
print.InfoStatusEvent(os.Stdout, "Checking if Dapr sidecar is listening on GRPC port %v", output.DaprGRPCPort)
err = utils.IsDaprListeningOnPort(output.DaprGRPCPort, time.Duration(runtimeWaitTimeoutInSeconds)*time.Second)
if err != nil {
sidecarUp = false
print.WarningStatusEvent(os.Stdout, "Dapr sidecar is not listening on GRPC port: %s", err.Error())
}
}
if sidecarUp {
@ -223,7 +266,7 @@ var RunCmd = &cobra.Command{
}
// Metadata API is only available if app has started listening to port, so wait for app to start before calling metadata API.
err = metadata.Put(output.DaprHTTPPort, "cliPID", strconv.Itoa(os.Getpid()))
err = metadata.Put(output.DaprHTTPPort, "cliPID", strconv.Itoa(os.Getpid()), appID, unixDomainSocket)
if err != nil {
print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for cliPID: %s", err.Error())
}
@ -231,7 +274,7 @@ var RunCmd = &cobra.Command{
if output.AppCMD != nil {
appCommand := strings.Join(args, " ")
print.InfoStatusEvent(os.Stdout, fmt.Sprintf("Updating metadata for app command: %s", appCommand))
err = metadata.Put(output.DaprHTTPPort, "appCommand", appCommand)
err = metadata.Put(output.DaprHTTPPort, "appCommand", appCommand, appID, unixDomainSocket)
if err != nil {
print.WarningStatusEvent(os.Stdout, "Could not update sidecar metadata for appCommand: %s", err.Error())
} else {
@ -261,6 +304,12 @@ var RunCmd = &cobra.Command{
print.SuccessStatusEvent(os.Stdout, "Exited App successfully")
}
}
if unixDomainSocket != "" {
for _, s := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(unixDomainSocket, output.AppID, s))
}
}
},
}
@ -281,6 +330,7 @@ func init() {
RunCmd.Flags().IntVarP(&metricsPort, "metrics-port", "M", -1, "The port of metrics on dapr")
RunCmd.Flags().BoolP("help", "h", false, "Print this help message")
RunCmd.Flags().IntVarP(&maxRequestBodySize, "dapr-http-max-request-size", "", -1, "Max size of request body in MB")
RunCmd.Flags().StringVarP(&unixDomainSocket, "unix-domain-socket", "u", "", "Path to a unix domain socket dir. If specified, Dapr API servers will use Unix Domain Sockets")
RootCmd.AddCommand(RunCmd)
}

View File

@ -6,21 +6,44 @@
package metadata
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"github.com/dapr/cli/pkg/api"
"github.com/dapr/cli/utils"
retryablehttp "github.com/hashicorp/go-retryablehttp"
)
// Get retrieves the metadata of a given app's sidecar.
func Get(httpPort int) (*api.Metadata, error) {
func Get(httpPort int, appID, socket string) (*api.Metadata, error) {
url := makeMetadataGetEndpoint(httpPort)
// nolint:gosec
r, err := http.Get(url)
var httpc http.Client
if socket != "" {
fileInfo, err := os.Stat(socket)
if err != nil {
return nil, err
}
if fileInfo.IsDir() {
socket = utils.GetSocket(socket, appID, "http")
}
httpc.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socket)
},
}
}
r, err := httpc.Get(url)
if err != nil {
return nil, err
}
@ -30,9 +53,18 @@ func Get(httpPort int) (*api.Metadata, error) {
}
// Put sets one metadata attribute on a given app's sidecar.
func Put(httpPort int, key, value string) error {
func Put(httpPort int, key, value, appID, socket string) error {
client := retryablehttp.NewClient()
client.Logger = nil
if socket != "" {
client.HTTPClient.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", utils.GetSocket(socket, appID, "http"))
},
}
}
url := makeMetadataPutEndpoint(httpPort, key)
req, err := retryablehttp.NewRequest("PUT", url, strings.NewReader(value))
@ -50,10 +82,16 @@ func Put(httpPort int, key, value string) error {
}
func makeMetadataGetEndpoint(httpPort int) string {
if httpPort == 0 {
return fmt.Sprintf("http://unix/v%s/metadata", api.RuntimeAPIVersion)
}
return fmt.Sprintf("http://127.0.0.1:%v/v%s/metadata", httpPort, api.RuntimeAPIVersion)
}
func makeMetadataPutEndpoint(httpPort int, key string) string {
if httpPort == 0 {
return fmt.Sprintf("http://unix/v%s/metadata/%s", api.RuntimeAPIVersion, key)
}
return fmt.Sprintf("http://127.0.0.1:%v/v%s/metadata/%s", httpPort, api.RuntimeAPIVersion, key)
}

View File

@ -15,9 +15,9 @@ type daprProcess struct {
// Client is the interface the wraps all the methods exposed by the Dapr CLI.
type Client interface {
// Invoke is a command to invoke a remote or local dapr instance
Invoke(appID, method string, data []byte, verb string) (string, error)
Invoke(appID, method string, data []byte, verb string, socket string) (string, error)
// Publish is used to publish event to a topic in a pubsub for an app ID.
Publish(publishAppID, pubsubName, topic string, payload []byte) error
Publish(publishAppID, pubsubName, topic string, payload []byte, socket string) error
}
type Standalone struct {

View File

@ -18,7 +18,7 @@ func NewDashboardCmd(port int) *exec.Cmd {
// Use the default binary install location
dashboardPath := defaultDaprBinPath()
binaryName := "dashboard"
if runtime.GOOS == "windows" {
if runtime.GOOS == daprWindowsOS {
binaryName = "dashboard.exe"
}

View File

@ -7,15 +7,18 @@ package standalone
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"github.com/dapr/cli/pkg/api"
"github.com/dapr/cli/utils"
)
// Invoke is a command to invoke a remote or local dapr instance.
func (s *Standalone) Invoke(appID, method string, data []byte, verb string) (string, error) {
func (s *Standalone) Invoke(appID, method string, data []byte, verb string, path string) (string, error) {
list, err := s.process.List()
if err != nil {
return "", err
@ -30,7 +33,17 @@ func (s *Standalone) Invoke(appID, method string, data []byte, verb string) (str
}
req.Header.Set("Content-Type", "application/json")
r, err := http.DefaultClient.Do(req)
var httpc http.Client
if path != "" {
httpc.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", utils.GetSocket(path, appID, "http"))
},
}
}
r, err := httpc.Do(req)
if err != nil {
return "", err
}

View File

@ -6,8 +6,12 @@
package standalone
import (
"fmt"
"os"
"runtime"
"testing"
"github.com/dapr/cli/utils"
"github.com/stretchr/testify/assert"
)
@ -60,91 +64,144 @@ func TestInvoke(t *testing.T) {
},
}
for _, tc := range testCases {
t.Run(tc.name+" get", func(t *testing.T) {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{
tc.lo,
for _, socket := range []string{"", "/tmp"} {
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" && socket != "" {
continue
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("%s get, socket: %v", tc.name, socket), func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket)
go ts.Serve(l)
defer func() {
l.Close()
for _, protocol := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(socket, tc.appID, protocol))
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
}
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{
tc.lo,
},
Err: tc.listErr,
},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte{}, "GET")
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
}
t.Run(tc.name+" post", func(t *testing.T) {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "POST")
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "GET", socket)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
t.Run(tc.name+" delete", func(t *testing.T) {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "DELETE")
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
t.Run(fmt.Sprintf("%s post, socket: %v", tc.name, socket), func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket)
go ts.Serve(l)
defer func() {
l.Close()
for _, protocol := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(socket, tc.appID, protocol))
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
}
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "POST", socket)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
t.Run(tc.name+" put", func(t *testing.T) {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "PUT")
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
t.Run(fmt.Sprintf("%s delete, socket: %v", tc.name, socket), func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket)
go ts.Serve(l)
defer func() {
l.Close()
for _, protocol := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(socket, tc.appID, protocol))
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
}
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "DELETE", socket)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
t.Run(fmt.Sprintf("%s put, socket: %v", tc.name, socket), func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.appID, socket)
go ts.Serve(l)
defer func() {
l.Close()
for _, protocol := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(socket, tc.appID, protocol))
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
}
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
res, err := client.Invoke(tc.appID, tc.method, []byte(tc.resp), "PUT", socket)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
assert.Equal(t, tc.resp, res, "expected response to match")
}
})
}
}
}

View File

@ -107,7 +107,8 @@ func List() ([]ListOutput, error) {
appID := argumentsMap["--app-id"]
appCmd := ""
cliPIDString := ""
appMetadata, err := metadata.Get(httpPort)
socket := argumentsMap["--unix-domain-socket"]
appMetadata, err := metadata.Get(httpPort, appID, socket)
if err == nil {
appCmd = appMetadata.Extended["appCommand"]
cliPIDString = appMetadata.Extended["cliPID"]

View File

@ -7,15 +7,18 @@ package standalone
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"net/http"
"github.com/dapr/cli/pkg/api"
"github.com/dapr/cli/utils"
)
// Publish publishes payload to topic in pubsub referenced by pubsubName.
func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []byte) error {
func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []byte, socket string) error {
if publishAppID == "" {
return errors.New("publishAppID is missing")
}
@ -33,14 +36,25 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b
return err
}
daprHTTPPort, err := getDaprHTTPPort(l, publishAppID)
instance, err := getDaprInstance(l, publishAppID)
if err != nil {
return err
}
url := fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", daprHTTPPort), api.RuntimeAPIVersion, pubsubName, topic)
// nolint: gosec
r, err := http.Post(url, "application/json", bytes.NewBuffer(payload))
url := fmt.Sprintf("http://unix/v%s/publish/%s/%s", api.RuntimeAPIVersion, pubsubName, topic)
var httpc http.Client
if socket != "" {
httpc.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", utils.GetSocket(socket, publishAppID, "http"))
},
}
} else {
url = fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", instance.HTTPPort), api.RuntimeAPIVersion, pubsubName, topic)
}
r, err := httpc.Post(url, "application/json", bytes.NewBuffer(payload))
if err != nil {
return err
}
@ -52,11 +66,11 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b
return nil
}
func getDaprHTTPPort(list []ListOutput, publishAppID string) (int, error) {
func getDaprInstance(list []ListOutput, publishAppID string) (ListOutput, error) {
for i := 0; i < len(list); i++ {
if list[i].AppID == publishAppID {
return list[i].HTTPPort, nil
return list[i], nil
}
}
return 0, errors.New("couldn't find a running Dapr instance")
return ListOutput{}, errors.New("couldn't find a running Dapr instance")
}

View File

@ -6,8 +6,11 @@
package standalone
import (
"os"
"runtime"
"testing"
"github.com/dapr/cli/utils"
"github.com/stretchr/testify/assert"
)
@ -27,7 +30,7 @@ func TestPublish(t *testing.T) {
errString string
}{
{
name: "test empty topic",
name: "test empty appID",
publishAppID: "",
payload: []byte("test"),
pubsubName: "test",
@ -74,7 +77,7 @@ func TestPublish(t *testing.T) {
errorExpected: true,
},
{
name: "successful call",
name: "successful call not found",
publishAppID: "myAppID",
pubsubName: "testPubsubName",
topic: "testTopic",
@ -98,25 +101,43 @@ func TestPublish(t *testing.T) {
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
err := client.Publish(tc.publishAppID, tc.pubsubName, tc.topic, tc.payload)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
}
})
for _, socket := range []string{"", "/tmp"} {
// TODO(@daixiang0): add Windows support
if runtime.GOOS == "windows" && socket != "" {
continue
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.publishAppID, socket)
go ts.Serve(l)
defer func() {
l.Close()
for _, protocol := range []string{"http", "grpc"} {
os.Remove(utils.GetSocket(socket, tc.publishAppID, protocol))
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
}
client := &Standalone{
process: &mockDaprProcess{
Lo: []ListOutput{tc.lo},
Err: tc.listErr,
},
}
err := client.Publish(tc.publishAppID, tc.pubsubName, tc.topic, tc.payload, socket)
if tc.errorExpected {
assert.Error(t, err, "expected an error")
assert.Equal(t, tc.errString, err.Error(), "expected error strings to match")
} else {
assert.NoError(t, err, "expected no error")
}
})
}
}
}

View File

@ -41,6 +41,7 @@ type RunConfig struct {
AppSSL bool
MetricsPort int
MaxRequestBodySize int
UnixDomainSocket string
}
// RunOutput represents the run output.
@ -52,7 +53,8 @@ type RunOutput struct {
AppCMD *exec.Cmd
}
func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort int, configFile, protocol string, enableProfiling bool, profilePort int, logLevel string, maxConcurrency int, placementHostAddr string, componentsPath string, appSSL bool, metricsPort int, requestBodySize int) (*exec.Cmd, int, int, int, error) {
func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort int, configFile, protocol string, enableProfiling bool,
profilePort int, logLevel string, maxConcurrency int, placementHostAddr string, componentsPath string, appSSL bool, metricsPort int, requestBodySize int, unixDomainSocket string) (*exec.Cmd, int, int, int, error) {
if daprHTTPPort < 0 {
port, err := freeport.GetFreePort()
if err != nil {
@ -144,6 +146,10 @@ func getDaprCommand(appID string, daprHTTPPort int, daprGRPCPort int, appPort in
args = append(args, "--app-ssl")
}
if unixDomainSocket != "" {
args = append(args, "--unix-domain-socket", unixDomainSocket)
}
cmd := exec.Command(daprCMD, args...)
return cmd, daprHTTPPort, daprGRPCPort, metricsPort, nil
}
@ -210,7 +216,7 @@ func Run(config *RunConfig) (*RunOutput, error) {
return nil, err
}
daprCMD, daprHTTPPort, daprGRPCPort, metricsPort, err := getDaprCommand(appID, config.HTTPPort, config.GRPCPort, config.AppPort, config.ConfigFile, config.Protocol, config.EnableProfiling, config.ProfilePort, config.LogLevel, config.MaxConcurrency, config.PlacementHostAddr, config.ComponentsPath, config.AppSSL, config.MetricsPort, config.MaxRequestBodySize)
daprCMD, daprHTTPPort, daprGRPCPort, metricsPort, err := getDaprCommand(appID, config.HTTPPort, config.GRPCPort, config.AppPort, config.ConfigFile, config.Protocol, config.EnableProfiling, config.ProfilePort, config.LogLevel, config.MaxConcurrency, config.PlacementHostAddr, config.ComponentsPath, config.AppSSL, config.MetricsPort, config.MaxRequestBodySize, config.UnixDomainSocket)
if err != nil {
return nil, err
}

View File

@ -7,11 +7,16 @@ package standalone
import (
"bytes"
"fmt"
"net"
"net/http"
"net/http/httptest"
"github.com/dapr/cli/utils"
)
const SocketFormat = "/tmp/dapr-%s-http.socket"
type mockDaprProcess struct {
Lo []ListOutput
Err error
@ -40,3 +45,29 @@ func getTestServer(expectedPath, resp string) (*httptest.Server, int) {
return ts, ts.Listener.Addr().(*net.TCPAddr).Port
}
func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server, net.Listener) {
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if expectedPath != "" && r.RequestURI != expectedPath {
w.WriteHeader(http.StatusInternalServerError)
return
}
if r.Method == http.MethodGet {
w.Write([]byte(resp))
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.Write(buf.Bytes())
}
}),
}
socket := utils.GetSocket(path, appID, "http")
l, err := net.Listen("unix", socket)
if err != nil {
panic(fmt.Sprintf("httptest: failed to listen on %v: %v", socket, err))
}
return s, l
}

View File

@ -1,3 +1,4 @@
//go:build e2e
// +build e2e
// ------------------------------------------------------------
@ -36,6 +37,8 @@ const (
daprDashboardVersion = "0.8.0"
)
var socketCases = []string{"", "/tmp"}
func TestStandaloneInstall(t *testing.T) {
// Ensure a clean environment
uninstall()
@ -346,23 +349,26 @@ func testInstall(t *testing.T) {
func testRun(t *testing.T) {
daprPath := getDaprPath()
t.Run("Normal exit", func(t *testing.T) {
output, err := spawn.Command(daprPath, "run", "--", "bash", "-c", "echo test")
t.Log(output)
require.NoError(t, err, "run failed")
assert.Contains(t, output, "Exited App successfully")
assert.Contains(t, output, "Exited Dapr successfully")
})
for _, path := range socketCases {
t.Run(fmt.Sprintf("Normal exit, socket: %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "run", "--unix-domain-socket", path, "--", "bash", "-c", "echo test")
t.Log(output)
require.NoError(t, err, "run failed")
assert.Contains(t, output, "Exited App successfully")
assert.Contains(t, output, "Exited Dapr successfully")
})
t.Run("Error exit", func(t *testing.T) {
output, err := spawn.Command(daprPath, "run", "--", "bash", "-c", "exit 1")
t.Log(output)
require.NoError(t, err, "run failed")
assert.Contains(t, output, "The App process exited with error code: exit status 1")
assert.Contains(t, output, "Exited Dapr successfully")
})
t.Run(fmt.Sprintf("Error exit, socket: %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "run", "--unix-domain-socket", path, "--", "bash", "-c", "exit 1")
t.Log(output)
require.NoError(t, err, "run failed")
assert.Contains(t, output, "The App process exited with error code: exit status 1")
assert.Contains(t, output, "Exited Dapr successfully")
})
t.Run("API shutdown", func(t *testing.T) {
}
t.Run("API shutdown without socket", func(t *testing.T) {
// Test that the CLI exits on a daprd shutdown.
output, err := spawn.Command(daprPath, "run", "--dapr-http-port", "9999", "--", "bash", "-c", "curl -v -X POST http://localhost:9999/v1.0/shutdown; sleep 10; exit 1")
t.Log(output)
@ -371,6 +377,14 @@ func testRun(t *testing.T) {
assert.Contains(t, output, "Exited Dapr successfully")
})
t.Run("API shutdown with socket", func(t *testing.T) {
// Test that the CLI exits on a daprd shutdown.
output, err := spawn.Command(daprPath, "run", "--app-id", "testapp", "--unix-domain-socket", "/tmp", "--", "bash", "-c", "curl --unix-socket /tmp/dapr-testapp-http.socket -v -X POST http://unix/v1.0/shutdown; sleep 10; exit 1")
t.Log(output)
require.NoError(t, err, "run failed")
assert.Contains(t, output, "Exited App successfully", "App should be shutdown before it has a chance to return non-zero")
assert.Contains(t, output, "Exited Dapr successfully")
})
}
func executeAgainstRunningDapr(t *testing.T, f func(), daprArgs ...string) {
@ -468,48 +482,49 @@ func testPublish(t *testing.T) {
}()
daprPath := getDaprPath()
executeAgainstRunningDapr(t, func() {
t.Run("publish from file", func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json")
for _, path := range socketCases {
executeAgainstRunningDapr(t, func() {
t.Run(fmt.Sprintf("publish from file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data-file")
assert.Contains(t, output, "Event published successfully")
event := <-events
assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data)
})
t.Run(fmt.Sprintf("publish from string with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data")
assert.Contains(t, output, "Event published successfully")
event := <-events
assert.Equal(t, map[string]interface{}{"cli": "is_working"}, event.Data)
})
t.Run(fmt.Sprintf("publish from non-existent file fails with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "a/file/that/does/not/exist")
t.Log(output)
assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ")
assert.Error(t, err, "a non-existent --data-file should fail with error")
})
t.Run(fmt.Sprintf("publish only one of data and data-file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.Error(t, err, "--data and --data-file should not be allowed together")
assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command")
})
output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data-file")
assert.Contains(t, output, "Event published successfully")
event := <-events
assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data)
})
t.Run("publish from string", func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data")
assert.Contains(t, output, "Event published successfully")
event := <-events
assert.Equal(t, map[string]interface{}{"cli": "is_working"}, event.Data)
})
t.Run("publish from non-existent file fails", func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "a/file/that/does/not/exist")
t.Log(output)
assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ")
assert.Error(t, err, "a non-existent --data-file should fail with error")
})
t.Run("publish only one of data and data-file", func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.Error(t, err, "--data and --data-file should not be allowed together")
assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command")
})
output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e")
t.Log(output)
require.NoError(t, err, "dapr stop failed")
assert.Contains(t, output, "app stopped successfully: pub_e2e")
}, "run", "--app-id", "pub_e2e", "--app-port", "9988")
require.NoError(t, err, "dapr stop failed")
assert.Contains(t, output, "app stopped successfully: pub_e2e")
}, "run", "--app-id", "pub_e2e", "--app-port", "9988", "--unix-domain-socket", path)
}
}
func testInvoke(t *testing.T) {
@ -534,42 +549,45 @@ func testInvoke(t *testing.T) {
}()
daprPath := getDaprPath()
executeAgainstRunningDapr(t, func() {
t.Run("data from file", func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "../testdata/message.json")
t.Log(output)
assert.NoError(t, err, "unable to invoke with --data-file")
assert.Contains(t, output, "App invoked successfully")
assert.Contains(t, output, "{\"dapr\": \"is_great\"}")
})
for _, path := range socketCases {
executeAgainstRunningDapr(t, func() {
t.Run(fmt.Sprintf("data from file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "../testdata/message.json")
t.Log(output)
assert.NoError(t, err, "unable to invoke with --data-file")
assert.Contains(t, output, "App invoked successfully")
assert.Contains(t, output, "{\"dapr\": \"is_great\"}")
})
t.Run("data from string", func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.NoError(t, err, "unable to invoke with --data")
assert.Contains(t, output, "{\"cli\": \"is_working\"}")
assert.Contains(t, output, "App invoked successfully")
})
t.Run(fmt.Sprintf("data from string with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.NoError(t, err, "unable to invoke with --data")
assert.Contains(t, output, "{\"cli\": \"is_working\"}")
assert.Contains(t, output, "App invoked successfully")
})
t.Run("data from non-existent file fails", func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "a/file/that/does/not/exist")
t.Log(output)
assert.Error(t, err, "a non-existent --data-file should fail with error")
assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ")
})
t.Run(fmt.Sprintf("data from non-existent file fails with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "a/file/that/does/not/exist")
t.Log(output)
assert.Error(t, err, "a non-existent --data-file should fail with error")
assert.Contains(t, output, "Error reading payload from 'a/file/that/does/not/exist'. Error: ")
})
t.Run("invoke only one of data and data-file", func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--method", "test", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.Error(t, err, "--data and --data-file should not be allowed together")
assert.Contains(t, output, "Only one of --data and --data-file allowed in the same invoke command")
})
t.Run(fmt.Sprintf("invoke only one of data and data-file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "invoke", "--app-id", "invoke_e2e", "--unix-domain-socket", path, "--method", "test", "--data-file", "../testdata/message.json", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
assert.Error(t, err, "--data and --data-file should not be allowed together")
assert.Contains(t, output, "Only one of --data and --data-file allowed in the same invoke command")
})
output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "invoke_e2e")
t.Log(output)
require.NoError(t, err, "dapr stop failed")
assert.Contains(t, output, "app stopped successfully: invoke_e2e")
}, "run", "--app-id", "invoke_e2e", "--app-port", "9987")
output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "invoke_e2e")
t.Log(output)
require.NoError(t, err, "dapr stop failed")
assert.Contains(t, output, "app stopped successfully: invoke_e2e")
}, "run", "--app-id", "invoke_e2e", "--app-port", "9987", "--unix-domain-socket", path)
}
}

View File

@ -26,6 +26,10 @@ import (
"gopkg.in/yaml.v2"
)
const (
socketFormat = "%s/dapr-%s-%s.socket"
)
// PrintTable to print in the table format.
func PrintTable(csvContent string) {
WriteTable(os.Stdout, csvContent)
@ -152,6 +156,24 @@ func IsDaprListeningOnPort(port int, timeout time.Duration) error {
}
}
func IsDaprListeningOnSocket(socket string, timeout time.Duration) error {
start := time.Now()
for {
conn, err := net.DialTimeout("unix", socket, timeout)
if err == nil {
conn.Close()
return nil
}
if time.Since(start).Seconds() >= timeout.Seconds() {
// Give up.
return err
}
time.Sleep(time.Second)
}
}
func MarshalAndWriteTable(writer io.Writer, in interface{}) error {
table, err := gocsv.MarshalString(in)
if err != nil {
@ -195,3 +217,7 @@ func IsAddressLegal(address string) bool {
}
return isLegal
}
func GetSocket(path, appID, protocol string) string {
return fmt.Sprintf(socketFormat, path, appID, protocol)
}