addresses previous PR feedback

This commit is contained in:
Mark Chmarny 2020-06-13 05:17:46 -07:00
parent 886ceb135e
commit 81ea43e1dd
7 changed files with 234 additions and 81 deletions

View File

@ -9,31 +9,47 @@ import (
)
// InvokeBinding invokes specific operation on the configured Dapr binding
func (c *Client) InvokeBinding(ctx context.Context, name, op string, in []byte, meta map[string]string) error {
func (c *Client) InvokeBinding(ctx context.Context, name, op string, in []byte, inm map[string]string) (out []byte, outm map[string]string, err error) {
if name == "" {
return errors.New("nil topic")
return nil, nil, errors.New("nil topic")
}
envelop := &pb.InvokeBindingRequest{
req := &pb.InvokeBindingRequest{
Name: name,
Operation: op,
Data: in,
Metadata: meta,
Metadata: inm,
}
_, err := c.protoClient.InvokeBinding(authContext(ctx), envelop)
resp, err := c.protoClient.InvokeBinding(authContext(ctx), req)
if err != nil {
return errors.Wrapf(err, "error invoking binding %s", name)
return nil, nil, errors.Wrapf(err, "error invoking binding %s", name)
}
return nil
if resp != nil {
return resp.Data, resp.Metadata, nil
}
return nil, nil, nil
}
// InvokeBindingJSON invokes configured Dapr binding with an instance
func (c *Client) InvokeBindingJSON(ctx context.Context, name, operation string, in interface{}) error {
func (c *Client) InvokeBindingJSON(ctx context.Context, name, operation string, in interface{}) (out []byte, outm map[string]string, err error) {
if in == nil {
return nil, nil, errors.New("nil in")
}
b, err := json.Marshal(in)
if err != nil {
return errors.Wrap(err, "error marshaling content")
return nil, nil, errors.Wrap(err, "error marshaling content")
}
return c.InvokeBinding(ctx, name, operation, b, nil)
}
// InvokeOutputBinding invokes configured Dapr binding with data (allows nil)
func (c *Client) InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error {
_, _, err := c.InvokeBinding(ctx, name, operation, data, nil)
if err != nil {
return errors.Wrap(err, "error invoking output binding")
}
return nil
}

View File

@ -14,7 +14,7 @@ import (
)
const (
daprPortDefault = "4000"
daprPortDefault = "50001"
daprPortEnvVarName = "DAPR_GRPC_PORT"
)
@ -23,7 +23,6 @@ var (
)
// NewClient instantiates dapr client locally using port from DAPR_GRPC_PORT env var
// When DAPR_GRPC_PORT client defaults to 4000
func NewClient() (client *Client, err error) {
port := os.Getenv(daprPortEnvVarName)
if port == "" {
@ -34,8 +33,7 @@ func NewClient() (client *Client, err error) {
// NewClientWithPort instantiates dapr client locally for the specific port
func NewClientWithPort(port string) (client *Client, err error) {
address := net.JoinHostPort("127.0.0.1", port)
return NewClientWithAddress(address)
return NewClientWithAddress(net.JoinHostPort("127.0.0.1", port))
}
// NewClientWithAddress instantiates dapr client configured for the specific address

View File

@ -6,39 +6,90 @@ import (
v1 "github.com/dapr/go-sdk/dapr/proto/common/v1"
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
"github.com/golang/protobuf/ptypes/any"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/pkg/errors"
)
// InvokeService represents the request message for Service invocation
func (c *Client) InvokeService(ctx context.Context, serviceID, method string, in []byte) (out []byte, err error) {
// InvokeServiceWithRequest invokes service with input request
func (c *Client) InvokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
if req == nil {
return nil, errors.New("nil request")
}
resp, err := c.protoClient.InvokeService(authContext(ctx), req)
if err != nil {
return nil, errors.Wrap(err, "error invoking service")
}
// allow for service to not return any value
if resp != nil && resp.GetData() != nil {
out = resp.GetData().Value
return
}
out = nil
return
}
// InvokeService invokes service without data
func (c *Client) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("nil serviceID")
}
if method == "" {
return nil, errors.New("nil method")
}
resp, err := c.protoClient.InvokeService(authContext(ctx), &pb.InvokeServiceRequest{
req := &pb.InvokeServiceRequest{
Id: serviceID,
Message: &v1.InvokeRequest{
Method: method,
Data: &any.Any{Value: in},
},
})
if err != nil {
return nil, errors.Wrapf(err, "error invoking service (%s)", serviceID)
}
return c.InvokeServiceWithRequest(ctx, req)
}
// InvokeServiceWithContent invokes service without content
func (c *Client) InvokeServiceWithContent(ctx context.Context, serviceID, method, contentTpe string, data []byte) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("nil serviceID")
}
if method == "" {
return nil, errors.New("nil method")
}
if contentTpe == "" {
return nil, errors.New("nil contentTpe")
}
out = resp.GetData().Value
return
req := &pb.InvokeServiceRequest{
Id: serviceID,
Message: &v1.InvokeRequest{
Method: method,
Data: &anypb.Any{Value: data},
ContentType: contentTpe,
},
}
return c.InvokeServiceWithRequest(ctx, req)
}
// InvokeServiceJSON represents the request message for Service invocation with identity parameter
func (c *Client) InvokeServiceJSON(ctx context.Context, serviceID, method string, in interface{}) (out []byte, err error) {
if in == nil {
return c.InvokeService(ctx, serviceID, method)
}
b, err := json.Marshal(in)
if err != nil {
return nil, errors.Wrap(err, "error marshaling content")
return nil, errors.Wrap(err, "error marshaling in parameter")
}
return c.InvokeService(ctx, serviceID, method, b)
req := &pb.InvokeServiceRequest{
Id: serviceID,
Message: &v1.InvokeRequest{
Method: method,
Data: &anypb.Any{Value: b},
ContentType: "application/json",
},
}
return c.InvokeServiceWithRequest(ctx, req)
}

View File

@ -13,6 +13,9 @@ func (c *Client) PublishEvent(ctx context.Context, topic string, in []byte) erro
if topic == "" {
return errors.New("nil topic")
}
if in == nil {
return errors.New("nil in")
}
envelop := &pb.PublishEventRequest{
Topic: topic,
@ -29,6 +32,9 @@ func (c *Client) PublishEvent(ctx context.Context, topic string, in []byte) erro
// PublishEventJSON is the message to publish event data to pubsub topic with identity
func (c *Client) PublishEventJSON(ctx context.Context, topic string, in interface{}) error {
if in == nil {
return errors.New("nil in")
}
b, err := json.Marshal(in)
if err != nil {
return errors.Wrap(err, "error marshaling content")

View File

@ -9,8 +9,61 @@ import (
"github.com/pkg/errors"
)
// SaveState is the message to save multiple states into state store
func (c *Client) SaveState(ctx context.Context, store, key string, in []byte) error {
var (
// StateOptionConsistencyDefault is strong
StateOptionConsistencyDefault = v1.StateOptions_CONSISTENCY_STRONG
// StateOptionConcurrencyDefault is last write
StateOptionConcurrencyDefault = v1.StateOptions_CONCURRENCY_LAST_WRITE
// StateOptionRetryPolicyDefault is threshold 3
StateOptionRetryPolicyDefault = &v1.StateRetryPolicy{
Threshold: 3,
}
// StateOptionDefault is the optimistic state option (last write concurency and strong consistency)
StateOptionDefault = &v1.StateOptions{
Concurrency: StateOptionConcurrencyDefault,
Consistency: StateOptionConsistencyDefault,
RetryPolicy: StateOptionRetryPolicyDefault,
}
)
// *** Save State ***
// SaveState saves the fully loaded save state request
func (c *Client) SaveState(ctx context.Context, req *pb.SaveStateRequest) error {
if req == nil {
return errors.New("nil request")
}
_, err := c.protoClient.SaveState(authContext(ctx), req)
if err != nil {
return errors.Wrap(err, "error saving state")
}
return nil
}
// SaveStateItem saves a single state item
func (c *Client) SaveStateItem(ctx context.Context, store string, item *v1.StateItem) error {
if store == "" {
return errors.New("nil store")
}
if item == nil {
return errors.New("nil item")
}
req := &pb.SaveStateRequest{
StoreName: store,
States: []*v1.StateItem{item},
}
return c.SaveState(ctx, req)
}
// SaveStateWithData saves the data into store using default state options
func (c *Client) SaveStateWithData(ctx context.Context, store, key string, data []byte) error {
if store == "" {
return errors.New("nil store")
}
@ -18,34 +71,49 @@ func (c *Client) SaveState(ctx context.Context, store, key string, in []byte) er
return errors.New("nil key")
}
envelop := &pb.SaveStateRequest{
req := &pb.SaveStateRequest{
StoreName: store,
States: []*v1.StateItem{
{
Key: key,
Value: in,
Key: key,
Value: data,
Options: StateOptionDefault,
},
},
}
_, err := c.protoClient.SaveState(authContext(ctx), envelop)
if err != nil {
return errors.Wrapf(err, "error saving state into %s", store)
}
return nil
return c.SaveState(ctx, req)
}
// SaveStateJSON is the message to save multiple states into state store with identity
// SaveStateJSON saves the JSON serialized in into store using default state options
func (c *Client) SaveStateJSON(ctx context.Context, store, key string, in interface{}) error {
if in == nil {
return errors.New("nil data to save")
}
b, err := json.Marshal(in)
if err != nil {
return errors.Wrap(err, "error marshaling content")
}
return c.SaveState(authContext(ctx), store, key, b)
return c.SaveStateWithData(ctx, store, key, b)
}
// GetState is the message to get key-value states from specific state store
// *** Get State ***
// GetStateWithRequest retreaves state from specific store using provided request
func (c *Client) GetStateWithRequest(ctx context.Context, req *pb.GetStateRequest) (out []byte, err error) {
if req == nil {
return nil, errors.New("nil request")
}
result, err := c.protoClient.GetState(authContext(ctx), req)
if err != nil {
return nil, errors.Wrap(err, "error getting state")
}
return result.Data, nil
}
// GetState retreaves state from specific store using default consistency option
func (c *Client) GetState(ctx context.Context, store, key string) (out []byte, err error) {
if store == "" {
return nil, errors.New("nil store")
@ -53,20 +121,32 @@ func (c *Client) GetState(ctx context.Context, store, key string) (out []byte, e
if key == "" {
return nil, errors.New("nil key")
}
envelop := &pb.GetStateRequest{
StoreName: store,
Key: key,
req := &pb.GetStateRequest{
StoreName: store,
Key: key,
Consistency: StateOptionConsistencyDefault,
}
result, err := c.protoClient.GetState(authContext(ctx), envelop)
if err != nil {
return nil, errors.Wrapf(err, "error getting state from %s for %s key", store, key)
}
return result.Data, nil
return c.GetStateWithRequest(ctx, req)
}
// DeleteState is the message to delete key-value states from specific state store
// *** Delete State ***
// DeleteStateWithRequest deletes content from store using provided request
func (c *Client) DeleteStateWithRequest(ctx context.Context, req *pb.DeleteStateRequest) error {
if req == nil {
return errors.New("nil request")
}
_, err := c.protoClient.DeleteState(authContext(ctx), req)
if err != nil {
return errors.Wrap(err, "error deleting state")
}
return nil
}
// DeleteState deletes content from store using default state options
func (c *Client) DeleteState(ctx context.Context, store, key string) error {
if store == "" {
return errors.New("nil store")
@ -74,15 +154,11 @@ func (c *Client) DeleteState(ctx context.Context, store, key string) error {
if key == "" {
return errors.New("nil key")
}
envelop := &pb.DeleteStateRequest{
req := &pb.DeleteStateRequest{
StoreName: store,
Key: key,
Options: StateOptionDefault,
}
_, err := c.protoClient.DeleteState(authContext(ctx), envelop)
if err != nil {
return errors.Wrapf(err, "error deleting state from %s for %s key", store, key)
}
return nil
return c.DeleteStateWithRequest(ctx, req)
}

View File

@ -2,11 +2,16 @@ package main
import (
"context"
"fmt"
"log"
"os"
dapr "github.com/dapr/go-sdk/client"
)
var (
logger = log.New(os.Stdout, "", 0)
)
func main() {
// just for this demo
ctx := context.Background()
@ -15,48 +20,49 @@ func main() {
// create the client
client, err := dapr.NewClient()
if err != nil {
panic(err)
logger.Panic(err)
}
defer client.Close(ctx)
// publish a message to the topic messagebus
err = client.PublishEvent(ctx, "messagebus", data)
if err != nil {
panic(err)
logger.Panic(err)
}
fmt.Println("data published")
logger.Println("data published")
// save state with the key key1
err = client.SaveState(ctx, "statestore", "key1", data)
err = client.SaveStateWithData(ctx, "statestore", "key1", data)
if err != nil {
panic(err)
logger.Panic(err)
}
fmt.Println("data saved")
logger.Println("data saved")
// get state for key key1
dataOut, err := client.GetState(ctx, "statestore", "key1")
if err != nil {
panic(err)
logger.Panic(err)
}
fmt.Println(string(dataOut))
logger.Printf("data out: %s", string(dataOut))
// delete state for key key1
err = client.DeleteState(ctx, "statestore", "key1")
if err != nil {
panic(err)
logger.Panic(err)
}
fmt.Println("data deleted")
logger.Println("data deleted")
// invoke a method called MyMethod on another dapr enabled service with id client
resp, err := client.InvokeService(ctx, "serving", "MyMethod", data)
// invoke a method called MyMethod on another dapr enabled service
resp, err := client.InvokeServiceWithContent(ctx, "serving", "MyMethod",
"text/plain; charset=UTF-8", data)
if err != nil {
logger.Panic(err)
}
logger.Printf("service method invoked, response: %s", string(resp))
err = client.InvokeOutputBinding(ctx, "example-http-binding", "create", nil)
if err != nil {
panic(err)
}
fmt.Println(string(resp))
err = client.InvokeBinding(ctx, "example-http-binding", "create", data, nil)
if err != nil {
panic(err)
}
fmt.Println("binding invoked")
logger.Println("binding invoked")
}

View File

@ -37,8 +37,8 @@ func main() {
}
}
// Sample method to invoke
func (s *server) MyMethod() string {
// EchoMethod is a simple demo method to invoke
func (s *server) EchoMethod() string {
return "pong"
}
@ -48,8 +48,8 @@ func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*c
var response string
switch in.Method {
case "MyMethod":
response = s.MyMethod()
case "EchoMethod":
response = s.EchoMethod()
}
return &commonv1pb.InvokeResponse{