go-sdk/client/configuration.go

127 lines
3.3 KiB
Go

package client
import (
"context"
"errors"
"fmt"
"io"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)
// ConfigurationItem is a single configuration entry as returned to SDK
// callers. Its fields are copied from the items of a configuration response.
type ConfigurationItem struct {
// Value is the value reported for the item.
Value string
// Version is the version reported for the item.
Version string
// Metadata is the metadata map reported for the item.
Metadata map[string]string
}
// ConfigurationOpt is a functional option that edits the metadata map sent
// along with a configuration request.
type ConfigurationOpt func(map[string]string)

// WithConfigurationMetadata returns an option that stores value under key in
// the request metadata, overwriting any entry already present for that key.
// The map handed to the option must be non-nil.
func WithConfigurationMetadata(key, value string) ConfigurationOpt {
	setEntry := func(metadata map[string]string) {
		metadata[key] = value
	}
	return setEntry
}
// GetConfigurationItem fetches a single configuration item by key from the
// named store. It delegates to GetConfigurationItems with a one-element key
// list.
//
// It returns (nil, nil) when the lookup yields no items at all. When items are
// returned but none is stored under key, the returned item is nil as well.
func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error) {
	found, err := c.GetConfigurationItems(ctx, storeName, []string{key}, opts...)
	switch {
	case err != nil:
		return nil, err
	case len(found) == 0:
		return nil, nil
	default:
		return found[key], nil
	}
}
// GetConfigurationItems fetches the configuration items for keys from the
// named store. Every option in opts is applied to a fresh metadata map that is
// sent with the request.
//
// On success it returns a non-nil map from item key to ConfigurationItem; an
// error from the underlying client call is returned unchanged.
func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) (map[string]*ConfigurationItem, error) {
	reqMetadata := map[string]string{}
	for i := range opts {
		opts[i](reqMetadata)
	}

	req := &pb.GetConfigurationRequest{
		StoreName: storeName,
		Keys:      keys,
		Metadata:  reqMetadata,
	}
	resp, err := c.protoClient.GetConfiguration(ctx, req)
	if err != nil {
		return nil, err
	}

	// Copy the response items into the SDK's own item type.
	result := map[string]*ConfigurationItem{}
	for name, raw := range resp.GetItems() {
		item := new(ConfigurationItem)
		item.Value = raw.GetValue()
		item.Version = raw.GetVersion()
		item.Metadata = raw.GetMetadata()
		result[name] = item
	}
	return result, nil
}
// ConfigurationHandleFunction is the callback used by
// SubscribeConfigurationItems. It is called with the ID carried by a
// subscription response and the items contained in that response.
type ConfigurationHandleFunction func(string, map[string]*ConfigurationItem)
// SubscribeConfigurationItems opens a configuration subscription for keys in
// the named store and starts a goroutine that receives updates from the
// stream. Every response that contains at least one item is passed to handler
// together with the ID carried by that response.
//
// The call blocks until the first response arrives and returns the
// subscription ID from that response. If the stream ends (EOF, receive error
// or nil response) before any response was received, an error is returned
// instead of blocking forever.
//
// The receive goroutine keeps running after this function returns and exits
// once the stream yields io.EOF or a nil response.
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) (string, error) {
	metadata := make(map[string]string)
	for _, opt := range opts {
		opt(metadata)
	}

	client, err := c.protoClient.SubscribeConfiguration(ctx, &pb.SubscribeConfigurationRequest{
		StoreName: storeName,
		Keys:      keys,
		Metadata:  metadata,
	})
	if err != nil {
		return "", fmt.Errorf("subscribe configuration failed with error = %w", err)
	}

	// Buffered so the receive goroutine can hand over the ID without waiting
	// for this function to reach its receive.
	subscribeIDChan := make(chan string, 1)

	// recvErr is written by the goroutine before it closes subscribeIDChan and
	// is read below only after that close has been observed, so the channel
	// close orders the two accesses.
	var recvErr error

	go func() {
		// The goroutine is the only sender, so it owns closing the channel.
		// Closing on exit unblocks the caller when the stream terminates
		// before a first response was delivered.
		defer close(subscribeIDChan)

		isFirst := true
		for {
			rsp, err := client.Recv()
			if errors.Is(err, io.EOF) || rsp == nil {
				// receive goroutine would close if unsubscribe is called.
				recvErr = err
				fmt.Println("dapr configuration subscribe finished.")
				return
			}

			configurationItems := make(map[string]*ConfigurationItem)
			for k, v := range rsp.GetItems() {
				configurationItems[k] = &ConfigurationItem{
					Value:    v.GetValue(),
					Version:  v.GetVersion(),
					Metadata: v.GetMetadata(),
				}
			}

			// Get the subscription ID from the first response.
			if isFirst {
				subscribeIDChan <- rsp.GetId()
				isFirst = false
			}

			// Do not invoke handler in case there are no items.
			if len(configurationItems) > 0 {
				handler(rsp.GetId(), configurationItems)
			}
		}
	}()

	subscribeID, ok := <-subscribeIDChan
	if !ok {
		// The stream finished without ever producing a response.
		if recvErr != nil {
			return "", fmt.Errorf("subscribe configuration stream ended before a subscription ID was received: %w", recvErr)
		}
		return "", errors.New("subscribe configuration stream ended before a subscription ID was received")
	}
	return subscribeID, nil
}
// UnsubscribeConfigurationItems cancels the configuration subscription
// identified by id on the named store.
//
// It returns a wrapped error when the underlying client call fails, and an
// error carrying the response message when the response does not report
// success. The opts parameter is accepted but not used by this implementation.
func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error {
	req := &pb.UnsubscribeConfigurationRequest{
		StoreName: storeName,
		Id:        id,
	}

	result, err := c.protoClient.UnsubscribeConfiguration(ctx, req)
	if err != nil {
		return fmt.Errorf("unsubscribe failed with error = %w", err)
	}

	if result.GetOk() {
		return nil
	}
	return fmt.Errorf("unsubscribe error message = %s", result.GetMessage())
}