mirror of https://github.com/dapr/go-sdk.git
127 lines
3.3 KiB
Go
127 lines
3.3 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
|
|
)
|
|
|
|
type ConfigurationItem struct {
|
|
Value string
|
|
Version string
|
|
Metadata map[string]string
|
|
}
|
|
|
|
type ConfigurationOpt func(map[string]string)
|
|
|
|
func WithConfigurationMetadata(key, value string) ConfigurationOpt {
|
|
return func(m map[string]string) {
|
|
m[key] = value
|
|
}
|
|
}
|
|
|
|
func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error) {
|
|
items, err := c.GetConfigurationItems(ctx, storeName, []string{key}, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(items) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
return items[key], nil
|
|
}
|
|
|
|
func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) (map[string]*ConfigurationItem, error) {
|
|
metadata := make(map[string]string)
|
|
for _, opt := range opts {
|
|
opt(metadata)
|
|
}
|
|
rsp, err := c.protoClient.GetConfiguration(ctx, &pb.GetConfigurationRequest{
|
|
StoreName: storeName,
|
|
Keys: keys,
|
|
Metadata: metadata,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
configItems := make(map[string]*ConfigurationItem)
|
|
for k, v := range rsp.GetItems() {
|
|
configItems[k] = &ConfigurationItem{
|
|
Value: v.GetValue(),
|
|
Version: v.GetVersion(),
|
|
Metadata: v.GetMetadata(),
|
|
}
|
|
}
|
|
return configItems, nil
|
|
}
|
|
|
|
type ConfigurationHandleFunction func(string, map[string]*ConfigurationItem)
|
|
|
|
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) (string, error) {
|
|
metadata := make(map[string]string)
|
|
for _, opt := range opts {
|
|
opt(metadata)
|
|
}
|
|
|
|
client, err := c.protoClient.SubscribeConfiguration(ctx, &pb.SubscribeConfigurationRequest{
|
|
StoreName: storeName,
|
|
Keys: keys,
|
|
Metadata: metadata,
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("subscribe configuration failed with error = %w", err)
|
|
}
|
|
subscribeIDChan := make(chan string, 1)
|
|
go func() {
|
|
isFirst := true
|
|
for {
|
|
rsp, err := client.Recv()
|
|
if errors.Is(err, io.EOF) || rsp == nil {
|
|
// receive goroutine would close if unsubscribe is called.
|
|
fmt.Println("dapr configuration subscribe finished.")
|
|
break
|
|
}
|
|
configurationItems := make(map[string]*ConfigurationItem)
|
|
|
|
for k, v := range rsp.GetItems() {
|
|
configurationItems[k] = &ConfigurationItem{
|
|
Value: v.GetValue(),
|
|
Version: v.GetVersion(),
|
|
Metadata: v.GetMetadata(),
|
|
}
|
|
}
|
|
// Get the subscription ID from the first response.
|
|
if isFirst {
|
|
subscribeIDChan <- rsp.GetId()
|
|
isFirst = false
|
|
}
|
|
// Do not invoke handler in case there are no items.
|
|
if len(configurationItems) > 0 {
|
|
handler(rsp.GetId(), configurationItems)
|
|
}
|
|
}
|
|
}()
|
|
subscribeID := <-subscribeIDChan
|
|
close(subscribeIDChan)
|
|
return subscribeID, nil
|
|
}
|
|
|
|
func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error {
|
|
resp, err := c.protoClient.UnsubscribeConfiguration(ctx, &pb.UnsubscribeConfigurationRequest{
|
|
StoreName: storeName,
|
|
Id: id,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unsubscribe failed with error = %w", err)
|
|
}
|
|
if !resp.GetOk() {
|
|
return fmt.Errorf("unsubscribe error message = %s", resp.GetMessage())
|
|
}
|
|
return nil
|
|
}
|