go-sdk/client/client.go

531 lines
21 KiB
Go

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/config"
"github.com/dapr/go-sdk/client/internal"
"github.com/dapr/go-sdk/version"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
// used to import codec implements.
_ "github.com/dapr/go-sdk/actor/codec/impl"
)
const (
daprPortDefault = "50001"
daprPortEnvVarName = "DAPR_GRPC_PORT" /* #nosec */
daprGRPCEndpointEnvVarName = "DAPR_GRPC_ENDPOINT"
traceparentKey = "traceparent"
apiTokenKey = "dapr-api-token" /* #nosec */
apiTokenEnvVarName = "DAPR_API_TOKEN" /* #nosec */
clientDefaultTimeoutSeconds = 5
clientTimeoutSecondsEnvVarName = "DAPR_CLIENT_TIMEOUT_SECONDS"
)
var (
logger = log.New(os.Stdout, "", 0)
lock = &sync.Mutex{}
_ Client = (*GRPCClient)(nil)
defaultClient Client
)
// SetLogger sets the global logger for the Dapr client.
// The default logger has a destination of os.Stdout, SetLogger allows you to
// optionally specify a custom logger (with a custom destination).
// To disable client logging entirely, use a nil argument e.g.: client.SetLogger(nil)
func SetLogger(l *log.Logger) {
if l == nil {
l = log.New(io.Discard, "", 0)
}
logger = l
}
// Client is the interface for Dapr client implementation.
//
//nolint:interfacebloat
type Client interface {
// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
InvokeBinding(ctx context.Context, in *InvokeBindingRequest) (out *BindingEvent, err error)
// InvokeOutputBinding invokes configured Dapr binding with data.InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
InvokeOutputBinding(ctx context.Context, in *InvokeBindingRequest) error
// InvokeMethod invokes service without raw data
InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)
// InvokeMethodWithContent invokes service with content
InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)
// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
// GetMetadata returns metadata from the sidecar.
GetMetadata(ctx context.Context) (metadata *GetMetadataResponse, err error)
// SetMetadata sets a key-value pair in the sidecar.
SetMetadata(ctx context.Context, key, value string) error
// PublishEvent publishes data onto topic in specific pubsub component.
PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error
// PublishEventfromCustomContent serializes an struct and publishes its contents as data (JSON) onto topic in specific pubsub component.
// Deprecated: This method is deprecated and will be removed in a future version of the SDK. Please use `PublishEvent` instead.
PublishEventfromCustomContent(ctx context.Context, pubsubName, topicName string, data interface{}) error
// PublishEvents publishes multiple events onto topic in specific pubsub component.
// If all events are successfully published, response Error will be nil.
// The FailedEvents field will contain all events that failed to publish.
PublishEvents(ctx context.Context, pubsubName, topicName string, events []interface{}, opts ...PublishEventsOption) PublishEventsResponse
// GetSecret retrieves preconfigured secret from specified store using key.
GetSecret(ctx context.Context, storeName, key string, meta map[string]string) (data map[string]string, err error)
// GetBulkSecret retrieves all preconfigured secrets for this application.
GetBulkSecret(ctx context.Context, storeName string, meta map[string]string) (data map[string]map[string]string, err error)
// SaveState saves the raw data into store using default state options.
SaveState(ctx context.Context, storeName, key string, data []byte, meta map[string]string, so ...StateOption) error
// SaveState saves the raw data into store using provided state options and etag.
SaveStateWithETag(ctx context.Context, storeName, key string, data []byte, etag string, meta map[string]string, so ...StateOption) error
// SaveBulkState saves multiple state item to store with specified options.
SaveBulkState(ctx context.Context, storeName string, items ...*SetStateItem) error
// GetState retrieves state from specific store using default consistency option.
GetState(ctx context.Context, storeName, key string, meta map[string]string) (item *StateItem, err error)
// GetStateWithConsistency retrieves state from specific store using provided state consistency.
GetStateWithConsistency(ctx context.Context, storeName, key string, meta map[string]string, sc StateConsistency) (item *StateItem, err error)
// GetBulkState retrieves state for multiple keys from specific store.
GetBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string, parallelism int32) ([]*BulkStateItem, error)
// QueryStateAlpha1 runs a query against state store.
QueryStateAlpha1(ctx context.Context, storeName, query string, meta map[string]string) (*QueryResponse, error)
// DeleteState deletes content from store using default state options.
DeleteState(ctx context.Context, storeName, key string, meta map[string]string) error
// DeleteStateWithETag deletes content from store using provided state options and etag.
DeleteStateWithETag(ctx context.Context, storeName, key string, etag *ETag, meta map[string]string, opts *StateOptions) error
// ExecuteStateTransaction provides way to execute multiple operations on a specified store.
ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error
// GetConfigurationItem can get target configuration item by storeName and key
GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error)
// GetConfigurationItems can get a list of configuration item by storeName and keys
GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) (map[string]*ConfigurationItem, error)
// SubscribeConfigurationItems can subscribe the change of configuration items by storeName and keys, and return subscription id
SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) (string, error)
// UnsubscribeConfigurationItems stops the subscription with target store's and ID.
// Deprecated: Closing the `SubscribeConfigurationItems` stream (closing the given context) will unsubscribe the client and should be used in favor of `UnsubscribeConfigurationItems`.
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error
// Subscribe subscribes to a pubsub topic and streams messages to the returned Subscription.
// Subscription must be closed after finishing with subscribing.
Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error)
// SubscribeWithHandler subscribes to a pubsub topic and calls the given handler on topic events.
// The returned cancel function must be called after finishing with subscribing.
SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error)
// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error
// DeleteBulkStateItems deletes content for multiple items from store.
DeleteBulkStateItems(ctx context.Context, storeName string, items []*DeleteStateItem) error
// TryLockAlpha1 attempts to grab a lock from a lock store.
TryLockAlpha1(ctx context.Context, storeName string, request *LockRequest) (*LockResponse, error)
// UnlockAlpha1 deletes unlocks a lock from a lock store.
UnlockAlpha1(ctx context.Context, storeName string, request *UnlockRequest) (*UnlockResponse, error)
// Encrypt data read from a stream, returning a readable stream that receives the encrypted data.
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
Encrypt(ctx context.Context, in io.Reader, opts EncryptOptions) (io.Reader, error)
// Decrypt data read from a stream, returning a readable stream that receives the decrypted data.
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
Decrypt(ctx context.Context, in io.Reader, opts DecryptOptions) (io.Reader, error)
// Shutdown the sidecar.
Shutdown(ctx context.Context) error
// Wait for a sidecar to become available for at most `timeout` seconds. Returns errWaitTimedOut if timeout is reached.
Wait(ctx context.Context, timeout time.Duration) error
// WithTraceID adds existing trace ID to the outgoing context.
WithTraceID(ctx context.Context, id string) context.Context
// WithAuthToken sets Dapr API token on the instantiated client.
WithAuthToken(token string)
// Close cleans up all resources created by the client.
Close()
// RegisterActorTimer registers an actor timer.
RegisterActorTimer(ctx context.Context, req *RegisterActorTimerRequest) error
// UnregisterActorTimer unregisters an actor timer.
UnregisterActorTimer(ctx context.Context, req *UnregisterActorTimerRequest) error
// RegisterActorReminder registers an actor reminder.
RegisterActorReminder(ctx context.Context, req *RegisterActorReminderRequest) error
// UnregisterActorReminder unregisters an actor reminder.
UnregisterActorReminder(ctx context.Context, req *UnregisterActorReminderRequest) error
// InvokeActor calls a method on an actor.
InvokeActor(ctx context.Context, req *InvokeActorRequest) (*InvokeActorResponse, error)
// GetActorState get actor state
GetActorState(ctx context.Context, req *GetActorStateRequest) (data *GetActorStateResponse, err error)
// SaveStateTransactionally save actor state
SaveStateTransactionally(ctx context.Context, actorType, actorID string, operations []*ActorStateOperation) error
// ImplActorClientStub is to impl user defined actor client stub
ImplActorClientStub(actorClientStub actor.Client, opt ...config.Option)
// StartWorkflowBeta1 starts a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
StartWorkflowBeta1(ctx context.Context, req *StartWorkflowRequest) (*StartWorkflowResponse, error)
// GetWorkflowBeta1 gets a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
GetWorkflowBeta1(ctx context.Context, req *GetWorkflowRequest) (*GetWorkflowResponse, error)
// PurgeWorkflowBeta1 purges a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
PurgeWorkflowBeta1(ctx context.Context, req *PurgeWorkflowRequest) error
// TerminateWorkflowBeta1 terminates a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
TerminateWorkflowBeta1(ctx context.Context, req *TerminateWorkflowRequest) error
// PauseWorkflowBeta1 pauses a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
PauseWorkflowBeta1(ctx context.Context, req *PauseWorkflowRequest) error
// ResumeWorkflowBeta1 resumes a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
ResumeWorkflowBeta1(ctx context.Context, req *ResumeWorkflowRequest) error
// RaiseEventWorkflowBeta1 raises an event for a workflow.
// Deprecated: Please use the workflow client (github.com/dapr/go-sdk/workflow).
// These methods for managing workflows are no longer supported and will be removed in the 1.16 release.
RaiseEventWorkflowBeta1(ctx context.Context, req *RaiseEventWorkflowRequest) error
// ScheduleJobAlpha1 creates and schedules a job.
ScheduleJobAlpha1(ctx context.Context, req *Job) error
// GetJobAlpha1 returns a scheduled job.
GetJobAlpha1(ctx context.Context, name string) (*Job, error)
// DeleteJobAlpha1 deletes a scheduled job.
DeleteJobAlpha1(ctx context.Context, name string) error
// ConverseAlpha1 interacts with a conversational AI model.
ConverseAlpha1(ctx context.Context, request conversationRequest, options ...conversationRequestOption) (*ConversationResponse, error)
// GrpcClient returns the base grpc client if grpc is used and nil otherwise
GrpcClient() pb.DaprClient
GrpcClientConn() *grpc.ClientConn
}
// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.
// Note, this default factory function creates Dapr client only once. All subsequent invocations
// will return the already created instance. To create multiple instances of the Dapr client,
// use one of the parameterized factory functions:
//
// NewClientWithPort(port string) (client Client, err error)
// NewClientWithAddress(address string) (client Client, err error)
// NewClientWithConnection(conn *grpc.ClientConn) Client
// NewClientWithSocket(socket string) (client Client, err error)
func NewClient() (client Client, err error) {
lock.Lock()
defer lock.Unlock()
if defaultClient != nil {
return defaultClient, nil
}
addr, ok := os.LookupEnv(daprGRPCEndpointEnvVarName)
if ok {
client, err = NewClientWithAddress(addr)
if err != nil {
return nil, fmt.Errorf("error creating %q client: %w", daprGRPCEndpointEnvVarName, err)
}
defaultClient = client
return defaultClient, nil
}
port, ok := os.LookupEnv(daprPortEnvVarName)
if !ok {
port = daprPortDefault
}
c, err := NewClientWithPort(port)
if err != nil {
return nil, fmt.Errorf("error creating default client: %w", err)
}
defaultClient = c
return defaultClient, nil
}
// NewClientWithPort instantiates Dapr using specific gRPC port.
func NewClientWithPort(port string) (client Client, err error) {
if port == "" {
return nil, errors.New("nil port")
}
return NewClientWithAddress(net.JoinHostPort("127.0.0.1", port))
}
// NewClientWithAddress instantiates Dapr using specific address (including port).
// Deprecated: use NewClientWithAddressContext instead.
func NewClientWithAddress(address string) (client Client, err error) {
return NewClientWithAddressContext(context.Background(), address)
}
// NewClientWithAddressContext instantiates Dapr using specific address (including port).
// Uses the provided context to create the connection.
func NewClientWithAddressContext(ctx context.Context, address string) (client Client, err error) {
if address == "" {
return nil, errors.New("empty address")
}
logger.Printf("dapr client initializing for: %s", address)
timeoutSeconds, err := getClientTimeoutSeconds()
if err != nil {
return nil, err
}
parsedAddress, err := internal.ParseGRPCEndpoint(address)
if err != nil {
return nil, fmt.Errorf("error parsing address '%s': %w", address, err)
}
at := &authToken{}
opts := []grpc.DialOption{
grpc.WithUserAgent(userAgent()),
grpc.WithBlock(), //nolint:staticcheck
authTokenUnaryInterceptor(at),
authTokenStreamInterceptor(at),
}
if parsedAddress.TLS {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(new(tls.Config))))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
conn, err := grpc.DialContext( //nolint:staticcheck
ctx,
parsedAddress.Target,
opts...,
)
cancel()
if err != nil {
return nil, fmt.Errorf("error creating connection to '%s': %w", address, err)
}
return newClientWithConnection(conn, at), nil
}
func getClientTimeoutSeconds() (int, error) {
timeoutStr := os.Getenv(clientTimeoutSecondsEnvVarName)
if len(timeoutStr) == 0 {
return clientDefaultTimeoutSeconds, nil
}
timeoutVar, err := strconv.Atoi(timeoutStr)
if err != nil {
return 0, err
}
if timeoutVar <= 0 {
return 0, errors.New("incorrect value")
}
return timeoutVar, nil
}
// NewClientWithSocket instantiates Dapr using specific socket.
func NewClientWithSocket(socket string) (client Client, err error) {
if socket == "" {
return nil, errors.New("nil socket")
}
at := &authToken{}
logger.Printf("dapr client initializing for: %s", socket)
addr := "unix://" + socket
conn, err := grpc.Dial( //nolint:staticcheck
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUserAgent(userAgent()),
authTokenUnaryInterceptor(at),
authTokenStreamInterceptor(at),
)
if err != nil {
return nil, fmt.Errorf("error creating connection to '%s': %w", addr, err)
}
return newClientWithConnection(conn, at), nil
}
func newClientWithConnection(conn *grpc.ClientConn, authToken *authToken) Client {
apiToken := os.Getenv(apiTokenEnvVarName)
if apiToken != "" {
logger.Println("client uses API token")
authToken.set(apiToken)
}
return &GRPCClient{
connection: conn,
protoClient: pb.NewDaprClient(conn),
authToken: authToken,
}
}
// NewClientWithConnection instantiates Dapr client using specific connection.
func NewClientWithConnection(conn *grpc.ClientConn) Client {
return newClientWithConnection(conn, &authToken{})
}
type authToken struct {
mu sync.RWMutex
authToken string
}
func (a *authToken) get() string {
a.mu.RLock()
defer a.mu.RUnlock()
return a.authToken
}
func (a *authToken) set(token string) {
a.mu.Lock()
defer a.mu.Unlock()
a.authToken = token
}
func authTokenUnaryInterceptor(authToken *authToken) grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
token := authToken.get()
if token != "" {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(apiTokenKey, token))
}
return invoker(ctx, method, req, reply, cc, opts...)
})
}
func authTokenStreamInterceptor(authToken *authToken) grpc.DialOption {
return grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
token := authToken.get()
if token != "" {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(apiTokenKey, token))
}
return streamer(ctx, desc, cc, method, opts...)
})
}
// GRPCClient is the gRPC implementation of Dapr client.
type GRPCClient struct {
connection *grpc.ClientConn
protoClient pb.DaprClient
authToken *authToken
}
// Close cleans up all resources created by the client.
func (c *GRPCClient) Close() {
if c.connection != nil {
c.connection.Close()
c.connection = nil
}
}
// WithAuthToken sets Dapr API token on the instantiated client.
// Allows empty string to reset token on existing client.
func (c *GRPCClient) WithAuthToken(token string) {
c.authToken.set(token)
}
// WithTraceID adds existing trace ID to the outgoing context.
func (c *GRPCClient) WithTraceID(ctx context.Context, id string) context.Context {
if id == "" {
return ctx
}
logger.Printf("using trace parent ID: %s", id)
md := metadata.Pairs(traceparentKey, id)
return metadata.NewOutgoingContext(ctx, md)
}
// Shutdown the sidecar.
func (c *GRPCClient) Shutdown(ctx context.Context) error {
_, err := c.protoClient.Shutdown(ctx, &pb.ShutdownRequest{})
if err != nil {
return fmt.Errorf("error shutting down the sidecar: %w", err)
}
return nil
}
// GrpcClient returns the base grpc client.
func (c *GRPCClient) GrpcClient() pb.DaprClient {
return c.protoClient
}
// GrpcClientConn returns the grpc.ClientConn object used by this client.
func (c *GRPCClient) GrpcClientConn() *grpc.ClientConn {
return c.connection
}
func userAgent() string {
return "dapr-sdk-go/" + strings.TrimSpace(version.SDKVersion)
}