package client import ( "context" "log" "net" "os" "sync" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/metadata" pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" ) const ( daprPortDefault = "50001" daprPortEnvVarName = "DAPR_GRPC_PORT" traceparentKey = "traceparent" apiTokenKey = "dapr-api-token" apiTokenEnvVarName = "DAPR_API_TOKEN" ) var ( logger = log.New(os.Stdout, "", 0) _ Client = (*GRPCClient)(nil) defaultClient Client doOnce sync.Once ) // Client is the interface for Dapr client implementation. type Client interface { // InvokeBinding invokes specific operation on the configured Dapr binding. // This method covers input, output, and bi-directional bindings. InvokeBinding(ctx context.Context, in *InvokeBindingRequest) (out *BindingEvent, err error) // InvokeOutputBinding invokes configured Dapr binding with data.InvokeOutputBinding // This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method. InvokeOutputBinding(ctx context.Context, in *InvokeBindingRequest) error // InvokeMethod invokes service without raw data InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error) // InvokeMethodWithContent invokes service with content InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error) // InvokeMethodWithCustomContent invokes app with custom content (struct + content type). InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error) // PublishEvent publishes data onto topic in specific pubsub component. PublishEvent(ctx context.Context, pubsubName, topicName string, data []byte) error // PublishEventfromCustomContent serializes an struct and publishes its contents as data (JSON) onto topic in specific pubsub component. PublishEventfromCustomContent(ctx context.Context, pubsubName, topicName string, data interface{}) error // GetSecret retrieves preconfigured secret from specified store using key. GetSecret(ctx context.Context, storeName, key string, meta map[string]string) (data map[string]string, err error) // GetBulkSecret retrieves all preconfigured secrets for this application. GetBulkSecret(ctx context.Context, storeName string, meta map[string]string) (data map[string]string, err error) // SaveState saves the raw data into store using default state options. SaveState(ctx context.Context, storeName, key string, data []byte) error // SaveBulkState saves multiple state item to store with specified options. SaveBulkState(ctx context.Context, storeName string, items ...*SetStateItem) error // GetState retrieves state from specific store using default consistency option. GetState(ctx context.Context, storeName, key string) (item *StateItem, err error) // GetStateWithConsistency retrieves state from specific store using provided state consistency. GetStateWithConsistency(ctx context.Context, storeName, key string, meta map[string]string, sc StateConsistency) (item *StateItem, err error) // GetBulkState retrieves state for multiple keys from specific store. GetBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string, parallelism int32) ([]*BulkStateItem, error) // DeleteState deletes content from store using default state options. DeleteState(ctx context.Context, storeName, key string) error // DeleteStateWithETag deletes content from store using provided state options and etag. DeleteStateWithETag(ctx context.Context, storeName, key, etag string, meta map[string]string, opts *StateOptions) error // ExecuteStateTransaction provides way to execute multiple operations on a specified store. ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error // WithTraceID adds existing trace ID to the outgoing context. WithTraceID(ctx context.Context, id string) context.Context // WithAuthToken sets Dapr API token on the instantiated client. WithAuthToken(token string) // Close cleans up all resources created by the client. Close() } // NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port. // Note, this default factory function creates Dapr client only once. All subsequent invocations // will return the already created instance. To create multiple instances of the Dapr client, // use one of the parameterized factory functions: // NewClientWithPort(port string) (client Client, err error) // NewClientWithAddress(address string) (client Client, err error) // NewClientWithConnection(conn *grpc.ClientConn) Client func NewClient() (client Client, err error) { port := os.Getenv(daprPortEnvVarName) if port == "" { port = daprPortDefault } var onceErr error doOnce.Do(func() { c, err := NewClientWithPort(port) onceErr = errors.Wrap(err, "error creating default client") defaultClient = c }) return defaultClient, onceErr } // NewClientWithPort instantiates Dapr using specific port. func NewClientWithPort(port string) (client Client, err error) { if port == "" { return nil, errors.New("nil port") } return NewClientWithAddress(net.JoinHostPort("127.0.0.1", port)) } // NewClientWithAddress instantiates Dapr using specific address (including port). func NewClientWithAddress(address string) (client Client, err error) { if address == "" { return nil, errors.New("nil address") } logger.Printf("dapr client initializing for: %s", address) conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { return nil, errors.Wrapf(err, "error creating connection to '%s': %v", address, err) } if hasToken := os.Getenv(apiTokenEnvVarName); hasToken != "" { logger.Println("client uses API token") } return NewClientWithConnection(conn), nil } // NewClientWithConnection instantiates Dapr client using specific connection. func NewClientWithConnection(conn *grpc.ClientConn) Client { return &GRPCClient{ connection: conn, protoClient: pb.NewDaprClient(conn), authToken: os.Getenv(apiTokenEnvVarName), } } // GRPCClient is the gRPC implementation of Dapr client. type GRPCClient struct { connection *grpc.ClientConn protoClient pb.DaprClient authToken string mux sync.Mutex } // Close cleans up all resources created by the client. func (c *GRPCClient) Close() { if c.connection != nil { c.connection.Close() } } // WithAuthToken sets Dapr API token on the instantiated client. // Allows empty string to reset token on existing client func (c *GRPCClient) WithAuthToken(token string) { c.mux.Lock() c.authToken = token c.mux.Unlock() } // WithTraceID adds existing trace ID to the outgoing context func (c *GRPCClient) WithTraceID(ctx context.Context, id string) context.Context { if id == "" { return ctx } logger.Printf("using trace parent ID: %s", id) md := metadata.Pairs(traceparentKey, id) return metadata.NewOutgoingContext(ctx, md) } func (c *GRPCClient) withAuthToken(ctx context.Context) context.Context { if c.authToken == "" { return ctx } return metadata.NewOutgoingContext(ctx, metadata.Pairs(apiTokenKey, string(c.authToken))) }