mirror of https://github.com/dapr/go-sdk.git
* WIP: server * gRPC server implementaiton * http serving * grpc ports as string vs int * bump tag * adds ctx, fixes linting errors * v0.8.2 bump * fixes serving options * cleans up cloud events in gttp serving * v0.8.4 version bump * uses http method constant * adds grpc serving tests * pubsub evnet as bytes * adds tests, seperate payload between http, grpc * refactored names, tests * fixes release names * fixes versoin number * http invoke test, grpc invoke params * makefile updates * readme updates, fixes grpc to http invocations * readme updates * comments, refactored http service * more serving tests * more client tests * test verbocity * fixes grpc error serializtion error * adds serving to readme * code formatting * adds support for TypeUrl in case of a proto conten * added binding in http service * cron in grpc example * single interface for grpc and http sdks * normalized serving interfaces across http and grpc * unit, rest api tests for grpc, html serving * updated tests * resolved master conflicts * changes from @youngbupark review * overloaded AddTopicEventHandler for ease of use * updated to go 1.15, dependancies * updated actions to go 1.15 * service test coverage * readme and release version updates * makefile flag options for dapr v0.9 * serving to grpc package for ease of http imp
This commit is contained in:
parent
3bd9a9afde
commit
eddc378eb0
|
@ -16,7 +16,7 @@ jobs:
|
|||
id: go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ^1.14
|
||||
go-version: ^1.15
|
||||
|
||||
- name: Checkout
|
||||
id: setup
|
||||
|
|
|
@ -14,7 +14,7 @@ jobs:
|
|||
id: go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ^1.14
|
||||
go-version: ^1.15
|
||||
|
||||
- name: Checkout
|
||||
id: setup
|
||||
|
|
54
Makefile
54
Makefile
|
@ -1,31 +1,65 @@
|
|||
RELEASE_VERSION =v0.9.0
|
||||
RELEASE_VERSION =v0.10.0
|
||||
GDOC_PORT =8888
|
||||
PROTO_ROOT =https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto/
|
||||
|
||||
.PHONY: mod test cover service client lint protps tag docs clean help
|
||||
all: test
|
||||
|
||||
mod: ## Updates the go modules
|
||||
tidy: ## Updates the go modules
|
||||
go mod tidy
|
||||
|
||||
test: mod ## Tests the entire project
|
||||
go test -v -count=1 -race ./...
|
||||
# go test -v -count=1 -run NameOfSingleTest ./...
|
||||
go test -count=1 -race ./...
|
||||
|
||||
cover: mod ## Displays test coverage in the Client package
|
||||
go test -coverprofile=cover.out ./client && go tool cover -html=cover.out
|
||||
cover: mod ## Displays test coverage in the client and service packages
|
||||
go test -coverprofile=cover-client.out ./client && go tool cover -html=cover-client.out
|
||||
go test -coverprofile=cover-service.out ./service/grpc && go tool cover -html=cover-service.out
|
||||
|
||||
service: mod ## Runs the uncompiled example service code
|
||||
dapr run --app-id serving \
|
||||
--app-protocol grpc \
|
||||
--app-protocol grpc \
|
||||
--app-port 50001 \
|
||||
go run example/serving/main.go
|
||||
--port 3500 \
|
||||
--log-level debug \
|
||||
--components-path example/serving/grpc/config \
|
||||
go run example/serving/grpc/main.go
|
||||
|
||||
service09: mod ## Runs the uncompiled example service code using the Dapr v0.9 flags
|
||||
dapr run --app-id serving \
|
||||
--protocol grpc \
|
||||
--app-port 50001 \
|
||||
--port 3500 \
|
||||
--log-level debug \
|
||||
--components-path example/serving/grpc/config \
|
||||
go run example/serving/grpc/main.go
|
||||
|
||||
client: mod ## Runs the uncompiled example client code
|
||||
dapr run --app-id caller \
|
||||
--components-path example/client/comp \
|
||||
--components-path example/client/config \
|
||||
--log-level debug \
|
||||
go run example/client/main.go
|
||||
|
||||
pubsub: ## Submits pub/sub events in different cotnent types
|
||||
curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \
|
||||
-H "Content-type: application/json" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
curl -d '<message><from>John</from><to>Lary</to></message>' \
|
||||
-H "Content-type: application/xml" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
curl -d '0x18, 0x2d, 0x44, 0x54, 0xfb, 0x21, 0x09, 0x40' \
|
||||
-H "Content-type: application/octet-stream" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
|
||||
invoke: ## Invokes service method with different operations
|
||||
curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \
|
||||
-H "Content-type: application/json" \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo"
|
||||
curl -d "ping" \
|
||||
-H "Content-type: text/plain;charset=UTF-8" \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo"
|
||||
curl -X DELETE \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo?k1=v1&k2=v2"
|
||||
|
||||
lint: ## Lints the entire project
|
||||
golangci-lint run --timeout=3m
|
||||
|
||||
|
@ -48,7 +82,7 @@ clean: ## Cleans go and generated files in ./dapr/proto/
|
|||
rm -fr ./dapr/proto/common/v1/*
|
||||
rm -fr ./dapr/proto/runtime/v1/*
|
||||
|
||||
protos: ## Downloads proto files from dapr/dapr and generats gRPC proto clients
|
||||
protos: ## Downloads proto files from dapr/dapr master and generats gRPC proto clients
|
||||
go install github.com/gogo/protobuf/gogoreplace
|
||||
|
||||
wget -q $(PROTO_ROOT)/common/v1/common.proto -O ./dapr/proto/common/v1/common.proto
|
||||
|
|
68
Readme.md
68
Readme.md
|
@ -1,18 +1,24 @@
|
|||
# Dapr SDK for Go
|
||||
|
||||
This is the dapr SDK (client) for go (golang). It covers all of the APIs described in Dapr's [protocol buffers](https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto/) with focus on developer productivity.
|
||||
Client library to accelerate Dapr application development in go. This client supports all public [Dapr API](https://github.com/dapr/docs/tree/master/reference/api) and focuses on developer productivity.
|
||||
|
||||
[](https://github.com/dapr/go-sdk/actions?query=workflow%3ATest) [](https://github.com/dapr/go-sdk/actions?query=workflow%3ARelease) [](https://goreportcard.com/report/github.com/dapr/go-sdk) 
|
||||
|
||||
## Installation
|
||||
## Usage
|
||||
|
||||
To install Dapr client package, you need to first [install go](https://golang.org/doc/install) and set up your development environment. To import Dapr go client in your code:
|
||||
> Assuming you already have [installed](https://golang.org/doc/install) go
|
||||
|
||||
Dapr go client includes two packages: `client` (for invoking public Dapr API) and `service` (to create services in go that can be invoked by Dapr, this is sometimes refereed to as "callback").
|
||||
|
||||
### Client
|
||||
|
||||
Import Dapr go `client` package:
|
||||
|
||||
```go
|
||||
import "github.com/dapr/go-sdk/client"
|
||||
```
|
||||
|
||||
## Quick start
|
||||
#### Quick start
|
||||
|
||||
```go
|
||||
package main
|
||||
|
@ -27,26 +33,34 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
defer client.Close()
|
||||
//TODO: use the client here
|
||||
//TODO: use the client here, see below for examples
|
||||
}
|
||||
```
|
||||
|
||||
Assuming you have Dapr CLI installed locally, you can then launch your app like this:
|
||||
Assuming you have [Dapr CLI](https://github.com/dapr/docs/blob/master/getting-started/environment-setup.md) installed locally, you can then launch your app locally like this:
|
||||
|
||||
```shell
|
||||
dapr run --app-id my-app --app-protocol grpc --app-port 50001 go run main.go
|
||||
dapr run --app-id example-service \
|
||||
--app-protocol grpc \
|
||||
--app-port 50001 \
|
||||
go run main.go
|
||||
```
|
||||
|
||||
See [example folder](./example) for complete example.
|
||||
Check the [example folder](./example) for working Dapr go client examples.
|
||||
|
||||
To accelerate your Dapr service development even more, consider the GitHub templates with complete gRPC solutions for two common use-cases:
|
||||
|
||||
* [gRPC Event Subscriber Template](https://github.com/mchmarny/dapr-grpc-event-subscriber-template) for pub/sub event processing
|
||||
* [gRPC Serving Service Template ](https://github.com/mchmarny/dapr-grpc-service-template) which creates a target for service to service invocations
|
||||
|
||||
|
||||
## Examples
|
||||
#### Usage
|
||||
|
||||
Few common Dapr client usage examples
|
||||
The Dapr go client supports following functionality:
|
||||
|
||||
### State
|
||||
##### State
|
||||
|
||||
For simple use-cases, Dapr client provides easy to use methods:
|
||||
For simple use-cases, Dapr client provides easy to use methods for `Save`, `Get`, and `Delete`:
|
||||
|
||||
```go
|
||||
ctx := context.Background()
|
||||
|
@ -54,7 +68,7 @@ data := []byte("hello")
|
|||
store := "my-store" // defined in the component YAML
|
||||
|
||||
// save state with the key
|
||||
err = client.SaveStateData(ctx, store, "k1", "v1", data)
|
||||
err = client.SaveStateData(ctx, store, "k1", data)
|
||||
handleErrors(err)
|
||||
|
||||
// get state for key
|
||||
|
@ -66,7 +80,7 @@ err = client.DeleteState(ctx, store, "k1")
|
|||
handleErrors(err)
|
||||
```
|
||||
|
||||
The `StateItem` type exposed by Dapr client provides more granular control options:
|
||||
For more granular control, the Dapr go client exposed `StateItem` type which can be use to gain more control over the state operations:
|
||||
|
||||
```go
|
||||
data := &client.StateItem{
|
||||
|
@ -80,17 +94,12 @@ data := &client.StateItem{
|
|||
Options: &client.StateOptions{
|
||||
Concurrency: client.StateConcurrencyLastWrite,
|
||||
Consistency: client.StateConsistencyStrong,
|
||||
RetryPolicy: &client.StateRetryPolicy{
|
||||
Threshold: 3,
|
||||
Pattern: client.RetryPatternExponential,
|
||||
Interval: time.Duration(5 * time.Second),
|
||||
},
|
||||
},
|
||||
}
|
||||
err = client.SaveStateItem(ctx, store, data)
|
||||
```
|
||||
|
||||
Similar `StateOptions` exist on `GetDate` and `DeleteState` methods. Additionally, Dapr client also provides a method to save multiple state items at once:
|
||||
Similarly, `StateOptions` exist on the `GetDate` and `DeleteState` methods to support multiple item operations at once:
|
||||
|
||||
```go
|
||||
data := &client.State{
|
||||
|
@ -109,17 +118,17 @@ data := &client.State{
|
|||
err = client.SaveState(ctx, data)
|
||||
```
|
||||
|
||||
### PubSub
|
||||
##### PubSub
|
||||
|
||||
To publish data onto a topic the Dapr client provides a simple method:
|
||||
|
||||
```go
|
||||
data := []byte("hello")
|
||||
data := []byte(`{ "id": "a123", "value": "abcdefg", "valid": true }`)
|
||||
err = client.PublishEvent(ctx, "topic-name", data)
|
||||
handleErrors(err)
|
||||
```
|
||||
|
||||
### Service Invocation
|
||||
##### Service Invocation
|
||||
|
||||
To invoke a specific method on another service running with Dapr sidecar, the Dapr client provides two options. To invoke a service without any data:
|
||||
|
||||
|
@ -131,12 +140,12 @@ handleErrors(err)
|
|||
And to invoke a service with data:
|
||||
|
||||
```go
|
||||
data := []byte("hello")
|
||||
resp, err := client.InvokeServiceWithContent(ctx, "service-name", "method-name", "text/plain", data)
|
||||
data := []byte(`{ "id": "a123", "value": "abcdefg", "valid": true }`)
|
||||
resp, err := client.InvokeServiceWithContent(ctx, "service-name", "method-name", "application/json", data)
|
||||
handleErrors(err)
|
||||
```
|
||||
|
||||
### Bindings
|
||||
##### Bindings
|
||||
|
||||
Similarly to Service, Dapr client provides two methods to invoke an operation on a [Dapr-defined binding](https://github.com/dapr/docs/tree/master/concepts/bindings). Dapr supports input, output, and bidirectional bindings so the first methods supports all of them along with metadata options:
|
||||
|
||||
|
@ -158,7 +167,7 @@ err = client.InvokeOutputBinding(ctx, "binding-name", "operation-name", data)
|
|||
handleErrors(err)
|
||||
```
|
||||
|
||||
### Secrets
|
||||
##### Secrets
|
||||
|
||||
The Dapr client also provides access to the runtime secrets that can be backed by any number of secrete stores (e.g. Kubernetes Secrets, Hashicorp Vault, or Azure KeyVault):
|
||||
|
||||
|
@ -170,6 +179,11 @@ secret, err = client.GetSecret(ctx, "store-name", "secret-name", opt)
|
|||
handleErrors(err)
|
||||
```
|
||||
|
||||
## Service (callback)
|
||||
|
||||
In addition to a an easy to use client, Dapr go package also provides implementation for `service`. Instructions on how to use it are located [here](./service/grpc/Readme.md)
|
||||
|
||||
|
||||
## Contributing to Dapr go client
|
||||
|
||||
See the [Contribution Guide](./CONTRIBUTING.md) to get started with building and developing.
|
||||
|
|
|
@ -7,20 +7,36 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// go test -timeout 30s ./client -count 1 -run ^TestInvokeBinding$
|
||||
|
||||
func TestInvokeBinding(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
data := "ping"
|
||||
|
||||
mIn := make(map[string]string)
|
||||
mIn["test"] = "value"
|
||||
t.Run("output binding", func(t *testing.T) {
|
||||
err := testClient.InvokeOutputBinding(ctx, "test", "fn", []byte(data))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("output binding without data", func(t *testing.T) {
|
||||
err := testClient.InvokeOutputBinding(ctx, "test", "fn", []byte(data))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("binding without data", func(t *testing.T) {
|
||||
out, mOut, err := testClient.InvokeBinding(ctx, "test", "fn", nil, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, mOut)
|
||||
assert.NotNil(t, out)
|
||||
})
|
||||
|
||||
t.Run("binding with data and meta", func(t *testing.T) {
|
||||
mIn := map[string]string{"k1": "v1", "k2": "v2"}
|
||||
out, mOut, err := testClient.InvokeBinding(ctx, "test", "fn", []byte(data), mIn)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, mOut)
|
||||
assert.NotNil(t, out)
|
||||
assert.Equal(t, data, string(out))
|
||||
})
|
||||
|
||||
out, mOut, err := testClient.InvokeBinding(ctx, "serving", "EchoMethod", []byte("ping"), mIn)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, mOut)
|
||||
assert.NotNil(t, out)
|
||||
}
|
||||
|
||||
func TestInvokeOutputBinding(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
err := testClient.InvokeOutputBinding(ctx, "serving", "EchoMethod", []byte("ping"))
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ type Client interface {
|
|||
// SaveStateData saves the raw data into store using default state options.
|
||||
SaveStateData(ctx context.Context, store, key string, data []byte) error
|
||||
|
||||
// SaveStateDataWithETag saves the raw data into store using default state options and provided ETag.
|
||||
// SaveStateDataWithETag saves the raw data into store using default state options and ETag.
|
||||
SaveStateDataWithETag(ctx context.Context, store, key, etag string, data []byte) error
|
||||
|
||||
// SaveStateItem saves the single state item to store.
|
||||
|
@ -69,8 +69,8 @@ type Client interface {
|
|||
// DeleteState deletes content from store using default state options.
|
||||
DeleteState(ctx context.Context, store, key string) error
|
||||
|
||||
// DeleteStateVersion deletes content from store using provided state options and etag.
|
||||
DeleteStateVersion(ctx context.Context, store, key, etag string, opts *StateOptions) error
|
||||
// DeleteStateWithETag deletes content from store using provided state options and etag.
|
||||
DeleteStateWithETag(ctx context.Context, store, key, etag string, opts *StateOptions) error
|
||||
|
||||
// Close cleans up all resources created by the client.
|
||||
Close()
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
commonv1pb "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
|
@ -76,9 +77,17 @@ type testDaprServer struct {
|
|||
}
|
||||
|
||||
func (s *testDaprServer) InvokeService(ctx context.Context, req *pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
|
||||
if req.Message == nil {
|
||||
return &commonv1pb.InvokeResponse{
|
||||
ContentType: "text/plain",
|
||||
Data: &anypb.Any{
|
||||
Value: []byte("pong"),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
return &commonv1pb.InvokeResponse{
|
||||
ContentType: req.Message.ContentType,
|
||||
Data: req.GetMessage().Data,
|
||||
Data: req.Message.Data,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -106,6 +115,12 @@ func (s *testDaprServer) PublishEvent(ctx context.Context, req *pb.PublishEventR
|
|||
}
|
||||
|
||||
func (s *testDaprServer) InvokeBinding(ctx context.Context, req *pb.InvokeBindingRequest) (*pb.InvokeBindingResponse, error) {
|
||||
if req.Data == nil {
|
||||
return &pb.InvokeBindingResponse{
|
||||
Data: []byte("test"),
|
||||
Metadata: map[string]string{"k1": "v1", "k2": "v2"},
|
||||
}, nil
|
||||
}
|
||||
return &pb.InvokeBindingResponse{
|
||||
Data: req.Data,
|
||||
Metadata: req.Metadata,
|
||||
|
|
|
@ -41,6 +41,9 @@ func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string
|
|||
Id: serviceID,
|
||||
Message: &v1.InvokeRequest{
|
||||
Method: method,
|
||||
HttpExtension: &v1.HTTPExtension{
|
||||
Verb: v1.HTTPExtension_POST,
|
||||
},
|
||||
},
|
||||
}
|
||||
return c.invokeServiceWithRequest(ctx, req)
|
||||
|
@ -64,6 +67,9 @@ func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, me
|
|||
Method: method,
|
||||
Data: &anypb.Any{Value: data},
|
||||
ContentType: contentType,
|
||||
HttpExtension: &v1.HTTPExtension{
|
||||
Verb: v1.HTTPExtension_POST,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -7,18 +7,24 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// go test -timeout 30s ./client -count 1 -run ^TestInvokeServiceWithContent$
|
||||
|
||||
func TestInvokeServiceWithContent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
resp, err := testClient.InvokeServiceWithContent(ctx, "serving", "EchoMethod",
|
||||
"text/plain; charset=UTF-8", []byte("ping"))
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, resp)
|
||||
assert.Equal(t, string(resp), "ping")
|
||||
}
|
||||
data := "ping"
|
||||
|
||||
func TestInvokeService(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
resp, err := testClient.InvokeService(ctx, "serving", "EchoMethod")
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, resp)
|
||||
t.Run("with content", func(t *testing.T) {
|
||||
resp, err := testClient.InvokeServiceWithContent(ctx, "test", "fn", "text/plain", []byte(data))
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, resp)
|
||||
assert.Equal(t, string(resp), data)
|
||||
|
||||
})
|
||||
|
||||
t.Run("without content", func(t *testing.T) {
|
||||
resp, err := testClient.InvokeService(ctx, "test", "fn")
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, resp)
|
||||
|
||||
})
|
||||
}
|
||||
|
|
|
@ -7,8 +7,22 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// go test -timeout 30s ./client -count 1 -run ^TestPublishEvent$
|
||||
func TestPublishEvent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
err := testClient.PublishEvent(ctx, "serving", []byte("ping"))
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("with data", func(t *testing.T) {
|
||||
err := testClient.PublishEvent(ctx, "test", []byte("ping"))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("with empty topic name", func(t *testing.T) {
|
||||
err := testClient.PublishEvent(ctx, "", []byte("ping"))
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("without data", func(t *testing.T) {
|
||||
err := testClient.PublishEvent(ctx, "test", nil)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -7,16 +7,20 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// go test -timeout 30s ./client -count 1 -run ^TestGetSecret$
|
||||
func TestGetSecret(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
out, err := testClient.GetSecret(ctx, "store", "key1", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out)
|
||||
|
||||
in := make(map[string]string)
|
||||
in["test"] = "value"
|
||||
t.Run("without meta", func(t *testing.T) {
|
||||
out, err := testClient.GetSecret(ctx, "store", "key1", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out)
|
||||
})
|
||||
|
||||
out, err = testClient.GetSecret(ctx, "store", "key1", in)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out)
|
||||
t.Run("with meta", func(t *testing.T) {
|
||||
in := map[string]string{"k1": "v1", "k2": "v2"}
|
||||
out, err := testClient.GetSecret(ctx, "store", "key1", in)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -145,11 +145,6 @@ func (c *GRPCClient) SaveState(ctx context.Context, s *State) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SaveStateData saves the raw data into store using default state options.
|
||||
func (c *GRPCClient) SaveStateData(ctx context.Context, store, key string, data []byte) error {
|
||||
return c.SaveStateDataWithETag(ctx, store, key, "", data)
|
||||
}
|
||||
|
||||
// SaveStateDataWithETag saves the raw data into store using default state options.
|
||||
func (c *GRPCClient) SaveStateDataWithETag(ctx context.Context, store, key, etag string, data []byte) error {
|
||||
if store == "" {
|
||||
|
@ -173,6 +168,11 @@ func (c *GRPCClient) SaveStateDataWithETag(ctx context.Context, store, key, etag
|
|||
return c.SaveState(ctx, req)
|
||||
}
|
||||
|
||||
// SaveStateData saves the raw data into store using default state options.
|
||||
func (c *GRPCClient) SaveStateData(ctx context.Context, store, key string, data []byte) error {
|
||||
return c.SaveStateDataWithETag(ctx, store, key, "", data)
|
||||
}
|
||||
|
||||
// SaveStateItem saves the single state item to store.
|
||||
func (c *GRPCClient) SaveStateItem(ctx context.Context, store string, item *StateItem) error {
|
||||
if store == "" {
|
||||
|
@ -220,11 +220,11 @@ func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key str
|
|||
|
||||
// DeleteState deletes content from store using default state options.
|
||||
func (c *GRPCClient) DeleteState(ctx context.Context, store, key string) error {
|
||||
return c.DeleteStateVersion(ctx, store, key, "", nil)
|
||||
return c.DeleteStateWithETag(ctx, store, key, "", nil)
|
||||
}
|
||||
|
||||
// DeleteStateVersion deletes content from store using provided state options and etag.
|
||||
func (c *GRPCClient) DeleteStateVersion(ctx context.Context, store, key, etag string, opts *StateOptions) error {
|
||||
// DeleteStateWithETag deletes content from store using provided state options and etag.
|
||||
func (c *GRPCClient) DeleteStateWithETag(ctx context.Context, store, key, etag string, opts *StateOptions) error {
|
||||
if store == "" {
|
||||
return errors.New("nil store")
|
||||
}
|
||||
|
|
|
@ -27,19 +27,33 @@ func TestStateOptionsConverter(t *testing.T) {
|
|||
assert.Equal(t, p.Consistency, v1.StateOptions_CONSISTENCY_STRONG)
|
||||
}
|
||||
|
||||
func TestSaveStateData(t *testing.T) {
|
||||
// go test -timeout 30s ./client -count 1 -run ^TestSaveState$
|
||||
func TestSaveState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
data := "test"
|
||||
store := "test"
|
||||
key := "key1"
|
||||
|
||||
err := testClient.SaveStateData(ctx, "store", "key1", []byte(data))
|
||||
assert.Nil(t, err)
|
||||
t.Run("save data", func(t *testing.T) {
|
||||
err := testClient.SaveStateData(ctx, store, key, []byte(data))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
out, etag, err := testClient.GetState(ctx, "store", "key1")
|
||||
assert.Nil(t, err)
|
||||
assert.NotEmpty(t, etag)
|
||||
assert.NotNil(t, out)
|
||||
assert.Equal(t, string(out), data)
|
||||
t.Run("get saved data", func(t *testing.T) {
|
||||
out, etag, err := testClient.GetState(ctx, store, key)
|
||||
assert.Nil(t, err)
|
||||
assert.NotEmpty(t, etag)
|
||||
assert.NotNil(t, out)
|
||||
assert.Equal(t, string(out), data)
|
||||
})
|
||||
|
||||
err = testClient.DeleteState(ctx, "store", "key1")
|
||||
assert.Nil(t, err)
|
||||
t.Run("save data with version", func(t *testing.T) {
|
||||
err := testClient.SaveStateDataWithETag(ctx, store, key, "1", []byte(data))
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("delete data", func(t *testing.T) {
|
||||
err := testClient.DeleteState(ctx, store, key)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,23 +1,85 @@
|
|||
# Dapr go client example
|
||||
|
||||
The `example` folder contains a Dapr enabled `serving` app and a `client` app that uses this SDK to invoke Dapr API for state and events, The `serving` app is available as HTTP or gRPC. The `client` app can target either one of these for service to service and binding invocations.
|
||||
|
||||
The `example` folder contains a Dapr enabled `serving` app a `client` app that uses this SDK to invoke Dapr API for state and events, `serving` app for service to service invocation, and a simple HTTP binding to illustrate output binding. To run the example:
|
||||
|
||||
1. Start the `serving` app in the `example/serving` directory
|
||||
To run this example, start by first launching the service:
|
||||
|
||||
```
|
||||
cd example/serving
|
||||
cd example/serving/grpc
|
||||
dapr run --app-id serving \
|
||||
--app-protocol grpc \
|
||||
--app-port 50001 \
|
||||
--port 3500 \
|
||||
--log-level debug \
|
||||
--components-path ./config \
|
||||
go run main.go
|
||||
```
|
||||
|
||||
2. Start the `client` app in the `example/client` directory
|
||||
## Client
|
||||
|
||||
Once one of the above services is running is running, launch the client:
|
||||
|
||||
```
|
||||
cd example/client
|
||||
dapr run --app-id caller \
|
||||
--components-path ./comp \
|
||||
--components-path ./config \
|
||||
--log-level debug \
|
||||
go run main.go
|
||||
```
|
||||
|
||||
## API
|
||||
|
||||
### PubSub
|
||||
|
||||
Publish JSON content
|
||||
|
||||
```shell
|
||||
curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \
|
||||
-H "Content-type: application/json" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
```
|
||||
|
||||
Publish XML content (read as text)
|
||||
|
||||
```shell
|
||||
curl -d '<message><from>John</from><to>Lary</to></message>' \
|
||||
-H "Content-type: application/xml" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
```
|
||||
|
||||
Publish BIN content
|
||||
|
||||
```shell
|
||||
curl -d '0x18, 0x2d, 0x44, 0x54, 0xfb, 0x21, 0x09, 0x40' \
|
||||
-H "Content-type: application/octet-stream" \
|
||||
"http://localhost:3500/v1.0/publish/messages"
|
||||
```
|
||||
|
||||
### Service Invocation
|
||||
|
||||
Invoke service with JSON payload
|
||||
|
||||
```shell
|
||||
curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \
|
||||
-H "Content-type: application/json" \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo"
|
||||
```
|
||||
|
||||
Invoke service with plain text message
|
||||
|
||||
```shell
|
||||
curl -d "ping" \
|
||||
-H "Content-type: text/plain;charset=UTF-8" \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo"
|
||||
```
|
||||
|
||||
Invoke service with no content
|
||||
|
||||
```shell
|
||||
curl -X DELETE \
|
||||
"http://localhost:3500/v1.0/invoke/serving/method/echo?k1=v1&k2=v2"
|
||||
```
|
||||
|
||||
### Input Binding
|
||||
|
||||
Uses the [config/cron.yaml](config/cron.yaml) component
|
|
@ -53,8 +53,7 @@ func main() {
|
|||
logger.Println("data deleted")
|
||||
|
||||
// invoke a method called EchoMethod on another dapr enabled service
|
||||
resp, err := client.InvokeServiceWithContent(ctx, "serving", "EchoMethod",
|
||||
"text/plain; charset=UTF-8", data)
|
||||
resp, err := client.InvokeServiceWithContent(ctx, "serving", "echo", "text/plain", data)
|
||||
if err != nil {
|
||||
logger.Panic(err)
|
||||
}
|
||||
|
@ -64,5 +63,6 @@ func main() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logger.Println("binding invoked")
|
||||
logger.Println("output binding invoked")
|
||||
logger.Println("DONE (CTRL+C to Exit)")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: run
|
||||
spec:
|
||||
type: bindings.cron
|
||||
metadata:
|
||||
- name: schedule
|
||||
value: "@every 10s"
|
|
@ -0,0 +1,11 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: messages
|
||||
spec:
|
||||
type: pubsub.redis
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
|
@ -0,0 +1,13 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: statestore
|
||||
spec:
|
||||
type: state.redis
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: actorStateStore
|
||||
value: "true"
|
|
@ -0,0 +1,67 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
|
||||
daprd "github.com/dapr/go-sdk/service/grpc"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// create a Dapr service server
|
||||
s, err := daprd.NewService(":50001")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start the server: %v", err)
|
||||
}
|
||||
|
||||
// add some topic subscriptions
|
||||
err = s.AddTopicEventHandler("messages", eventHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding topic subscription: %v", err)
|
||||
}
|
||||
|
||||
// add a service to service invocation handler
|
||||
err = s.AddServiceInvocationHandler("echo", echoHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding invocation handler: %v", err)
|
||||
}
|
||||
|
||||
// add a binding invocation handler
|
||||
err = s.AddBindingInvocationHandler("run", runHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding binding handler: %v", err)
|
||||
}
|
||||
|
||||
// start the server
|
||||
if err := s.Start(); err != nil {
|
||||
log.Fatalf("server error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func eventHandler(ctx context.Context, e *daprd.TopicEvent) error {
|
||||
log.Printf("event - Topic:%s, ID:%s, Data: %v", e.Topic, e.ID, e.Data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Content, err error) {
|
||||
if in == nil {
|
||||
err = errors.New("nil invocation parameter")
|
||||
return
|
||||
}
|
||||
log.Printf(
|
||||
"echo - ContentType:%s, Verb:%s, QueryString:%s, %+v",
|
||||
in.ContentType, in.Verb, in.QueryString, string(in.Data),
|
||||
)
|
||||
out = &daprd.Content{
|
||||
Data: in.Data,
|
||||
ContentType: in.ContentType,
|
||||
DataTypeURL: in.DataTypeURL,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func runHandler(ctx context.Context, in *daprd.BindingEvent) (out []byte, err error) {
|
||||
log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata)
|
||||
return nil, nil
|
||||
}
|
|
@ -1,89 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/any"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
|
||||
commonv1pb "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// server is our user app
|
||||
type server struct {
|
||||
}
|
||||
|
||||
func main() {
|
||||
// create listener
|
||||
lis, err := net.Listen("tcp", ":50001")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
|
||||
// create grpc server
|
||||
s := grpc.NewServer()
|
||||
pb.RegisterAppCallbackServer(s, &server{})
|
||||
|
||||
fmt.Println("Client starting...")
|
||||
|
||||
// and start...
|
||||
if err := s.Serve(lis); err != nil {
|
||||
log.Fatalf("failed to serve: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// EchoMethod is a simple demo method to invoke
|
||||
func (s *server) EchoMethod() string {
|
||||
return "pong"
|
||||
}
|
||||
|
||||
// This method gets invoked when a remote service has called the app through Dapr
|
||||
// The payload carries a Method to identify the method, a set of metadata properties and an optional payload
|
||||
func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
|
||||
var response string
|
||||
|
||||
switch in.Method {
|
||||
case "EchoMethod":
|
||||
response = s.EchoMethod()
|
||||
}
|
||||
|
||||
return &commonv1pb.InvokeResponse{
|
||||
ContentType: "text/plain; charset=UTF-8",
|
||||
Data: &any.Any{Value: []byte(response)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Dapr will call this method to get the list of topics the app wants to subscribe to. In this example, we are telling Dapr
|
||||
// To subscribe to a topic named TopicA
|
||||
func (s *server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
|
||||
return &pb.ListTopicSubscriptionsResponse{
|
||||
Subscriptions: []*pb.TopicSubscription{
|
||||
{Topic: "TopicA"},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Dapr will call this method to get the list of bindings the app will get invoked by. In this example, we are telling Dapr
|
||||
// To invoke our app with a binding named storage
|
||||
func (s *server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
|
||||
return &pb.ListInputBindingsResponse{
|
||||
Bindings: []string{"storage"},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// This method gets invoked every time a new event is fired from a registerd binding. The message carries the binding name, a payload and optional metadata
|
||||
func (s *server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
|
||||
fmt.Println("Invoked from binding")
|
||||
return &pb.BindingEventResponse{}, nil
|
||||
}
|
||||
|
||||
// This method is fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope.
|
||||
func (s *server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
|
||||
fmt.Println("Topic message arrived")
|
||||
return &pb.TopicEventResponse{}, nil
|
||||
}
|
15
go.mod
15
go.mod
|
@ -1,17 +1,20 @@
|
|||
module github.com/dapr/go-sdk
|
||||
|
||||
go 1.14
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/testify v1.6.1
|
||||
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 // indirect
|
||||
golang.org/x/sys v0.0.0-20200620081246-981b61492c35 // indirect
|
||||
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
|
||||
golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed // indirect
|
||||
golang.org/x/text v0.3.3 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200622133129-d0ee0c36e670 // indirect
|
||||
google.golang.org/grpc v1.29.1
|
||||
google.golang.org/protobuf v1.24.0
|
||||
google.golang.org/genproto v0.0.0-20200808173500-a06252235341 // indirect
|
||||
google.golang.org/grpc v1.31.0
|
||||
google.golang.org/protobuf v1.25.0
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
|
||||
)
|
||||
|
|
33
go.sum
33
go.sum
|
@ -3,6 +3,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
|||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -28,6 +29,14 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
|||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
|
||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
|
@ -37,6 +46,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
|||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
|
@ -45,17 +55,19 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r
|
|||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM=
|
||||
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
|
||||
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200620081246-981b61492c35 h1:wb/9mP8eUAmHfkM8RmpeLq6nUA7c2i5+bQOtcDftjaE=
|
||||
golang.org/x/sys v0.0.0-20200620081246-981b61492c35/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed h1:WBkVNH1zd9jg/dK4HCM4lNANnmd12EHC9z+LmcCG4ns=
|
||||
golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
|
@ -63,6 +75,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
|
|||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -71,14 +84,14 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
|
|||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
|
||||
google.golang.org/genproto v0.0.0-20200622133129-d0ee0c36e670 h1:v/N9fZIfu6jopNImrZwgx48fql5yT3k82CJvpIyGtPA=
|
||||
google.golang.org/genproto v0.0.0-20200622133129-d0ee0c36e670/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
|
||||
google.golang.org/genproto v0.0.0-20200808173500-a06252235341 h1:Kceb+1TNS2X7Cj/A+IUTljNerF/4wOFjlFJ0RGHYKKE=
|
||||
google.golang.org/genproto v0.0.0-20200808173500-a06252235341/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
|
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
|
||||
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
|
||||
google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI=
|
||||
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
@ -89,8 +102,12 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
|
|||
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
|
||||
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
|
||||
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
# Dapr Service SDK for Go
|
||||
|
||||
Start by importing Dapr go `service` package:
|
||||
|
||||
```go
|
||||
daprd "github.com/dapr/go-sdk/service/grpc"
|
||||
```
|
||||
|
||||
## Event Handling
|
||||
|
||||
To handle events from specific topic, first create a Dapr service, add topic event handler, and start the service:
|
||||
|
||||
```go
|
||||
s, err := daprd.NewService(":50001")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start the server: %v", err)
|
||||
}
|
||||
|
||||
err = s.AddTopicEventHandler("messages", eventHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding topic subscription: %v", err)
|
||||
}
|
||||
|
||||
if err := s.Start(); err != nil {
|
||||
log.Fatalf("server error: %v", err)
|
||||
}
|
||||
|
||||
func eventHandler(ctx context.Context, e *daprd.TopicEvent) error {
|
||||
log.Printf("event - Topic:%s, ID:%s, Data: %v", e.Topic, e.ID, e.Data)
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
## Service Invocation Handler
|
||||
|
||||
To handle service invocations, create and start the Dapr service as in the above example. In this case though add the handler for service invocation:
|
||||
|
||||
```go
|
||||
s, err := daprd.NewService(":50001")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start the server: %v", err)
|
||||
}
|
||||
|
||||
err = s.AddServiceInvocationHandler("echo", echoHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding invocation handler: %v", err)
|
||||
}
|
||||
|
||||
if err := s.Start(); err != nil {
|
||||
log.Fatalf("server error: %v", err)
|
||||
}
|
||||
|
||||
func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Content, err error) {
|
||||
if in == nil {
|
||||
err = errors.New("nil invocation parameter")
|
||||
return
|
||||
}
|
||||
log.Printf(
|
||||
"echo - ContentType:%s, Verb:%s, QueryString:%s, %+v",
|
||||
in.ContentType, in.Verb, in.QueryString, string(in.Data),
|
||||
)
|
||||
out = &daprd.Content{
|
||||
Data: in.Data,
|
||||
ContentType: in.ContentType,
|
||||
DataTypeURL: in.DataTypeURL,
|
||||
}
|
||||
return
|
||||
}
|
||||
```
|
||||
|
||||
## Binding Invocation Handler
|
||||
|
||||
To handle binding invocations, create and start the Dapr service as in the above examples. In this case though add the handler for binding invocation:
|
||||
|
||||
```go
|
||||
s, err := daprd.NewService(":50001")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start the server: %v", err)
|
||||
}
|
||||
|
||||
err = s.AddBindingInvocationHandler("run", runHandler)
|
||||
if err != nil {
|
||||
log.Fatalf("error adding binding handler: %v", err)
|
||||
}
|
||||
|
||||
if err := s.Start(); err != nil {
|
||||
log.Fatalf("server error: %v", err)
|
||||
}
|
||||
|
||||
func runHandler(ctx context.Context, in *daprd.BindingEvent) (out []byte, err error) {
|
||||
log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata)
|
||||
return nil, nil
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Contributing to Dapr go client
|
||||
|
||||
See the [Contribution Guide](../CONTRIBUTING.md) to get started with building and developing.
|
|
@ -0,0 +1,55 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
)
|
||||
|
||||
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service
|
||||
func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error {
|
||||
if name == "" {
|
||||
return fmt.Errorf("binding name required")
|
||||
}
|
||||
s.bindingHandlers[name] = fn
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListInputBindings is called by Dapr to get the list of bindings the app will get invoked by. In this example, we are telling Dapr
|
||||
// To invoke our app with a binding named storage
|
||||
func (s *Server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
|
||||
list := make([]string, 0)
|
||||
for k := range s.bindingHandlers {
|
||||
list = append(list, k)
|
||||
}
|
||||
|
||||
return &pb.ListInputBindingsResponse{
|
||||
Bindings: list,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// OnBindingEvent gets invoked every time a new event is fired from a registered binding. The message carries the binding name, a payload and optional metadata
|
||||
func (s *Server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
|
||||
if in == nil {
|
||||
return nil, errors.New("nil binding event request")
|
||||
}
|
||||
if fn, ok := s.bindingHandlers[in.Name]; ok {
|
||||
e := &BindingEvent{
|
||||
Data: in.Data,
|
||||
Metadata: in.Metadata,
|
||||
}
|
||||
data, err := fn(ctx, e)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error executing %s binding", in.Name)
|
||||
}
|
||||
return &pb.BindingEventResponse{
|
||||
Data: data,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("binding not implemented: %s", in.Name)
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func testBindingHandler(ctx context.Context, in *BindingEvent) (out []byte, err error) {
|
||||
if in == nil {
|
||||
return nil, errors.New("nil event")
|
||||
}
|
||||
return in.Data, nil
|
||||
}
|
||||
|
||||
// go test -timeout 30s ./service/grpc -count 1 -run ^TestBinding$
|
||||
func TestBinding(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
methodName := "test"
|
||||
|
||||
server := getTestServer()
|
||||
err := server.AddBindingInvocationHandler(methodName, testBindingHandler)
|
||||
assert.Nil(t, err)
|
||||
startTestServer(server)
|
||||
|
||||
t.Run("binding without event", func(t *testing.T) {
|
||||
_, err := server.OnBindingEvent(ctx, nil)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("binding event for wrong method", func(t *testing.T) {
|
||||
in := &runtime.BindingEventRequest{Name: "invalid"}
|
||||
_, err := server.OnBindingEvent(ctx, in)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("binding event without data", func(t *testing.T) {
|
||||
in := &runtime.BindingEventRequest{Name: methodName}
|
||||
out, err := server.OnBindingEvent(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, out)
|
||||
})
|
||||
|
||||
t.Run("binding event with data", func(t *testing.T) {
|
||||
data := "hello there"
|
||||
in := &runtime.BindingEventRequest{
|
||||
Name: methodName,
|
||||
Data: []byte(data),
|
||||
}
|
||||
out, err := server.OnBindingEvent(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, out)
|
||||
assert.Equal(t, data, string(out.Data))
|
||||
})
|
||||
|
||||
t.Run("binding event with metadata", func(t *testing.T) {
|
||||
in := &runtime.BindingEventRequest{
|
||||
Name: methodName,
|
||||
Metadata: map[string]string{"k1": "v1", "k2": "v2"},
|
||||
}
|
||||
out, err := server.OnBindingEvent(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, out)
|
||||
})
|
||||
|
||||
stopTestServer(t, server)
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/any"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
cpb "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
)
|
||||
|
||||
// AddServiceInvocationHandler appends provided service invocation handler with its method to the service
|
||||
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *InvocationEvent) (our *Content, err error)) error {
|
||||
if method == "" {
|
||||
return fmt.Errorf("servie name required")
|
||||
}
|
||||
s.invokeHandlers[method] = fn
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnInvoke gets invoked when a remote service has called the app through Dapr
|
||||
func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
|
||||
if in == nil {
|
||||
return nil, errors.New("nil invoke request")
|
||||
}
|
||||
if fn, ok := s.invokeHandlers[in.Method]; ok {
|
||||
e := &InvocationEvent{}
|
||||
if in != nil {
|
||||
e.ContentType = in.ContentType
|
||||
|
||||
if in.Data != nil {
|
||||
e.Data = in.Data.Value
|
||||
e.DataTypeURL = in.Data.TypeUrl
|
||||
}
|
||||
|
||||
if in.HttpExtension != nil {
|
||||
e.Verb = in.HttpExtension.Verb.String()
|
||||
e.QueryString = in.HttpExtension.Querystring
|
||||
}
|
||||
}
|
||||
|
||||
ct, er := fn(ctx, e)
|
||||
if er != nil {
|
||||
return nil, errors.Wrap(er, "error executing handler")
|
||||
}
|
||||
|
||||
if ct == nil {
|
||||
return &cpb.InvokeResponse{}, nil
|
||||
}
|
||||
|
||||
return &cpb.InvokeResponse{
|
||||
ContentType: ct.ContentType,
|
||||
Data: &any.Any{
|
||||
Value: ct.Data,
|
||||
TypeUrl: ct.DataTypeURL,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("method not implemented: %s", in.Method)
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
)
|
||||
|
||||
func testInvokeHandler(ctx context.Context, in *InvocationEvent) (out *Content, err error) {
|
||||
if in == nil {
|
||||
return
|
||||
}
|
||||
out = &Content{
|
||||
ContentType: in.ContentType,
|
||||
Data: in.Data,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// go test -timeout 30s ./service/grpc -count 1 -run ^TestInvoke$
|
||||
func TestInvoke(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
methodName := "test"
|
||||
ctx := context.Background()
|
||||
|
||||
server := getTestServer()
|
||||
err := server.AddServiceInvocationHandler(methodName, testInvokeHandler)
|
||||
assert.Nil(t, err)
|
||||
startTestServer(server)
|
||||
|
||||
t.Run("invoke without request", func(t *testing.T) {
|
||||
_, err := server.OnInvoke(ctx, nil)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invoke request with invalid method name", func(t *testing.T) {
|
||||
in := &common.InvokeRequest{Method: "invalid"}
|
||||
_, err := server.OnInvoke(ctx, in)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invoke request without data", func(t *testing.T) {
|
||||
in := &common.InvokeRequest{Method: methodName}
|
||||
_, err := server.OnInvoke(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("invoke request with data", func(t *testing.T) {
|
||||
data := "hello there"
|
||||
dataContentType := "text/plain"
|
||||
in := &common.InvokeRequest{Method: methodName}
|
||||
in.Data = &anypb.Any{Value: []byte(data)}
|
||||
in.ContentType = dataContentType
|
||||
out, err := server.OnInvoke(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, out)
|
||||
assert.Equal(t, dataContentType, out.ContentType)
|
||||
assert.Equal(t, data, string(out.Data.Value))
|
||||
})
|
||||
|
||||
stopTestServer(t, server)
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Service represents Dapr callback service
|
||||
type Service interface {
|
||||
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
|
||||
AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error
|
||||
// AddTopicEventHandler appends provided event handler with it's topic and optional metadata to the service.
|
||||
AddTopicEventHandler(topic string, fn func(ctx context.Context, e *TopicEvent) error) error
|
||||
// AddTopicEventHandlerWithMetadata appends provided event handler with topic name and metadata to the service.
|
||||
AddTopicEventHandlerWithMetadata(topic string, m map[string]string, fn func(ctx context.Context, e *TopicEvent) error) error
|
||||
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
|
||||
AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error
|
||||
// Start starts service.
|
||||
Start() error
|
||||
// Stop stops the previously started service.
|
||||
Stop() error
|
||||
}
|
||||
|
||||
// TopicEvent is the content of the inbound topic message.
|
||||
type TopicEvent struct {
|
||||
// ID identifies the event.
|
||||
ID string
|
||||
// The version of the CloudEvents specification.
|
||||
SpecVersion string
|
||||
// The type of event related to the originating occurrence.
|
||||
Type string
|
||||
// Source identifies the context in which an event happened.
|
||||
Source string
|
||||
// The content type of data value.
|
||||
DataContentType string
|
||||
// The content of the event.
|
||||
Data interface{}
|
||||
// Cloud event subject
|
||||
Subject string
|
||||
// The pubsub topic which publisher sent to.
|
||||
Topic string
|
||||
}
|
||||
|
||||
// InvocationEvent represents the input and output of binding invocation.
|
||||
type InvocationEvent struct {
|
||||
// Data is the payload that the input bindings sent.
|
||||
Data []byte
|
||||
// ContentType of the Data
|
||||
ContentType string
|
||||
// DataTypeURL is the resource URL that uniquely identifies the type of the serialized.
|
||||
DataTypeURL string
|
||||
// Verb is the HTTP verb that was used to invoke this service.
|
||||
Verb string
|
||||
// QueryString is the HTTP query string that was used to invoke this service.
|
||||
QueryString map[string]string
|
||||
}
|
||||
|
||||
// Content is a generic data content.
|
||||
type Content struct {
|
||||
// Data is the payload that the input bindings sent.
|
||||
Data []byte
|
||||
// ContentType of the Data
|
||||
ContentType string
|
||||
// DataTypeURL is the resource URL that uniquely identifies the type of the serialized.
|
||||
DataTypeURL string
|
||||
}
|
||||
|
||||
// BindingEvent represents the binding event handler input.
|
||||
type BindingEvent struct {
|
||||
// Data is the input bindings sent.
|
||||
Data []byte
|
||||
// Metadata is the input binging components.
|
||||
Metadata map[string]string
|
||||
}
|
||||
|
||||
// Subscription represents single topic subscription.
|
||||
type Subscription struct {
|
||||
// Topic is the name of the topic.
|
||||
Topic string
|
||||
// Route is the route of the handler where topic events should be published.
|
||||
Route string
|
||||
}
|
||||
|
||||
// NewService creates new Service.
|
||||
func NewService(address string) (s Service, err error) {
|
||||
if address == "" {
|
||||
return nil, errors.New("nil address")
|
||||
}
|
||||
lis, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
err = errors.Wrapf(err, "failed to TCP listen on: %s", address)
|
||||
return
|
||||
}
|
||||
s = newService(lis)
|
||||
return
|
||||
}
|
||||
|
||||
// NewServiceWithListener creates new Service with specific listener.
|
||||
func NewServiceWithListener(lis net.Listener) Service {
|
||||
return newService(lis)
|
||||
}
|
||||
|
||||
func newService(lis net.Listener) *Server {
|
||||
return &Server{
|
||||
listener: lis,
|
||||
invokeHandlers: make(map[string]func(ctx context.Context, in *InvocationEvent) (out *Content, err error)),
|
||||
topicSubscriptions: make(map[string]*topicEventHandler),
|
||||
bindingHandlers: make(map[string]func(ctx context.Context, in *BindingEvent) (out []byte, err error)),
|
||||
}
|
||||
}
|
||||
|
||||
// Server is the gRPC service implementation for Dapr.
|
||||
type Server struct {
|
||||
listener net.Listener
|
||||
invokeHandlers map[string]func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
|
||||
topicSubscriptions map[string]*topicEventHandler
|
||||
bindingHandlers map[string]func(ctx context.Context, in *BindingEvent) (out []byte, err error)
|
||||
}
|
||||
|
||||
type topicEventHandler struct {
|
||||
topic string
|
||||
fn func(ctx context.Context, e *TopicEvent) error
|
||||
meta map[string]string
|
||||
}
|
||||
|
||||
// Start registers the server and starts it.
|
||||
func (s *Server) Start() error {
|
||||
gs := grpc.NewServer()
|
||||
pb.RegisterAppCallbackServer(gs, s)
|
||||
return gs.Serve(s.listener)
|
||||
}
|
||||
|
||||
// Stop stops the previously started service.
|
||||
func (s *Server) Stop() error {
|
||||
return s.listener.Close()
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
)
|
||||
|
||||
func TestServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
server := getTestServer()
|
||||
startTestServer(server)
|
||||
stopTestServer(t, server)
|
||||
}
|
||||
|
||||
func getTestServer() *Server {
|
||||
return newService(bufconn.Listen(1024 * 1024))
|
||||
}
|
||||
|
||||
func startTestServer(server *Server) {
|
||||
go func() {
|
||||
if err := server.Start(); err != nil && err.Error() != "closed" {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func stopTestServer(t *testing.T, server *Server) {
|
||||
assert.NotNil(t, server)
|
||||
err := server.Stop()
|
||||
assert.Nilf(t, err, "error stopping server")
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
)
|
||||
|
||||
// AddTopicEventHandler appends provided event handler with topic name to the service
|
||||
func (s *Server) AddTopicEventHandler(topic string, fn func(ctx context.Context, e *TopicEvent) error) error {
|
||||
if topic == "" {
|
||||
return fmt.Errorf("topic name required")
|
||||
}
|
||||
s.topicSubscriptions[topic] = &topicEventHandler{
|
||||
topic: topic,
|
||||
fn: fn,
|
||||
meta: map[string]string{},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddTopicEventHandlerWithMetadata appends provided event handler with topic name and metadata to the service
|
||||
func (s *Server) AddTopicEventHandlerWithMetadata(topic string, m map[string]string, fn func(ctx context.Context, e *TopicEvent) error) error {
|
||||
if topic == "" {
|
||||
return fmt.Errorf("topic name required")
|
||||
}
|
||||
s.topicSubscriptions[topic] = &topicEventHandler{
|
||||
topic: topic,
|
||||
fn: fn,
|
||||
meta: m,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListTopicSubscriptions is called by Dapr to get the list of topics the app wants to subscribe to. In this example, we are telling Dapr
|
||||
// To subscribe to a topic named TopicA
|
||||
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
|
||||
subs := make([]*pb.TopicSubscription, 0)
|
||||
for k, v := range s.topicSubscriptions {
|
||||
sub := &pb.TopicSubscription{
|
||||
Topic: k,
|
||||
Metadata: v.meta,
|
||||
}
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
return &pb.ListTopicSubscriptionsResponse{
|
||||
Subscriptions: subs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope.
|
||||
func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
|
||||
if in == nil {
|
||||
return nil, errors.New("nil event request")
|
||||
}
|
||||
if in.Topic == "" {
|
||||
return nil, errors.New("topic event request has no topic name")
|
||||
}
|
||||
if h, ok := s.topicSubscriptions[in.Topic]; ok {
|
||||
e := &TopicEvent{
|
||||
ID: in.Id,
|
||||
Source: in.Source,
|
||||
Type: in.Type,
|
||||
SpecVersion: in.SpecVersion,
|
||||
DataContentType: in.DataContentType,
|
||||
Data: in.Data,
|
||||
Topic: in.Topic,
|
||||
}
|
||||
err := h.fn(ctx, e)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error handling topic event: %s", in.Topic)
|
||||
}
|
||||
return &pb.TopicEventResponse{}, nil
|
||||
}
|
||||
return &pb.TopicEventResponse{}, fmt.Errorf("topic not configured: %s", in.Topic)
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func eventHandler(ctx context.Context, event *TopicEvent) error {
|
||||
if event == nil {
|
||||
return errors.New("nil event")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$
|
||||
func TestTopic(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
topicName := "test"
|
||||
ctx := context.Background()
|
||||
|
||||
server := getTestServer()
|
||||
err := server.AddTopicEventHandler(topicName, eventHandler)
|
||||
assert.Nil(t, err)
|
||||
startTestServer(server)
|
||||
|
||||
t.Run("topic event without request", func(t *testing.T) {
|
||||
_, err := server.OnTopicEvent(ctx, nil)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("topic event for wrong topic", func(t *testing.T) {
|
||||
in := &runtime.TopicEventRequest{
|
||||
Topic: "invlid",
|
||||
}
|
||||
_, err := server.OnTopicEvent(ctx, in)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("topic event for valid topic", func(t *testing.T) {
|
||||
in := &runtime.TopicEventRequest{
|
||||
Id: "a123",
|
||||
Source: "test",
|
||||
Type: "test",
|
||||
SpecVersion: "v0.3",
|
||||
DataContentType: "text/plain",
|
||||
Data: []byte("test"),
|
||||
Topic: topicName,
|
||||
}
|
||||
_, err := server.OnTopicEvent(ctx, in)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
stopTestServer(t, server)
|
||||
}
|
Loading…
Reference in New Issue