update config subscribe method (#389)

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
This commit is contained in:
Shivam Kumar 2023-04-28 03:37:16 +05:30 committed by GitHub
parent 78a1fe63ea
commit 99dc3e31d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 52 deletions

View File

@ -132,7 +132,7 @@ type Client interface {
GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) (map[string]*ConfigurationItem, error)
// SubscribeConfigurationItems can subscribe the change of configuration items by storeName and keys, and return subscription id
SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error
SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) (string, error)
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error

View File

@ -446,6 +446,14 @@ func (s *testDaprServer) SubscribeConfigurationAlpha1(in *pb.SubscribeConfigurat
s.configurationSubscriptionIDMapLoc.Lock()
s.configurationSubscriptionID[id.String()] = stopCh
s.configurationSubscriptionIDMapLoc.Unlock()
// Send subscription ID in the first response.
if err := server.Send(&pb.SubscribeConfigurationResponse{
Id: id.String(),
}); err != nil {
return err
}
for i := 0; i < 5; i++ {
select {
case <-stopCh:

View File

@ -62,34 +62,31 @@ func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string
type ConfigurationHandleFunction func(string, map[string]*ConfigurationItem)
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error {
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) (string, error) {
metadata := make(map[string]string)
for _, opt := range opts {
opt(metadata)
}
client, err := c.protoClient.SubscribeConfigurationAlpha1(ctx, &pb.SubscribeConfigurationRequest{
StoreName: storeName,
Keys: keys,
Metadata: metadata,
})
if err != nil {
return fmt.Errorf("subscribe configuration failed with error = %w", err)
return "", fmt.Errorf("subscribe configuration failed with error = %w", err)
}
var subscribeID string
stopCh := make(chan struct{})
subscribeIDChan := make(chan string, 1)
go func() {
isFirst := true
for {
rsp, err := client.Recv()
if errors.Is(err, io.EOF) || rsp == nil {
// receive goroutine would close if unsubscribe is called.
fmt.Println("dapr configuration subscribe finished.")
close(stopCh)
break
}
subscribeID = rsp.Id
configurationItems := make(map[string]*ConfigurationItem)
for k, v := range rsp.Items {
configurationItems[k] = &ConfigurationItem{
Value: v.Value,
@ -97,15 +94,20 @@ func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName
Metadata: v.Metadata,
}
}
handler(rsp.Id, configurationItems)
// Get the subscription ID from the first response.
if isFirst {
subscribeIDChan <- rsp.Id
isFirst = false
}
// Do not invoke handler in case there are no items.
if len(configurationItems) > 0 {
handler(rsp.Id, configurationItems)
}
}
}()
select {
case <-ctx.Done():
return c.UnsubscribeConfigurationItems(context.Background(), storeName, subscribeID)
case <-stopCh:
return nil
}
subscribeID := <-subscribeIDChan
close(subscribeIDChan)
return subscribeID, nil
}
func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error {

View File

@ -44,23 +44,24 @@ func TestGetConfigurationItems(t *testing.T) {
func TestSubscribeConfigurationItems(t *testing.T) {
ctx := context.Background()
counter := 0
totalCounter := 0
var counter, totalCounter uint32
counter = 0
totalCounter = 0
keys := []string{"mykey1", "mykey2", "mykey3"}
t.Run("Test subscribe configuration items", func(t *testing.T) {
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
_, err := testClient.SubscribeConfigurationItems(ctx, "example-config",
keys, func(s string, items map[string]*ConfigurationItem) {
counter++
atomic.AddUint32(&counter, 1)
for _, k := range keys {
assert.Equal(t, k+valueSuffix, items[k].Value)
totalCounter++
atomic.AddUint32(&totalCounter, 1)
}
})
assert.Nil(t, err)
})
time.Sleep(time.Second*5 + time.Millisecond*500)
assert.Equal(t, 5, counter)
assert.Equal(t, 15, totalCounter)
assert.Equal(t, uint32(5), atomic.LoadUint32(&counter))
assert.Equal(t, uint32(15), atomic.LoadUint32(&totalCounter))
}
func TestUnSubscribeConfigurationItems(t *testing.T) {
@ -68,27 +69,19 @@ func TestUnSubscribeConfigurationItems(t *testing.T) {
var counter, totalCounter uint32
t.Run("Test unsubscribe configuration items", func(t *testing.T) {
subscribeIDChan := make(chan string)
go func() {
keys := []string{"mykey1", "mykey2", "mykey3"}
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
keys, func(id string, items map[string]*ConfigurationItem) {
atomic.AddUint32(&counter, 1)
for _, k := range keys {
assert.Equal(t, k+valueSuffix, items[k].Value)
atomic.AddUint32(&totalCounter, 1)
}
select {
case subscribeIDChan <- id:
default:
}
})
assert.Nil(t, err)
}()
subscribeID := <-subscribeIDChan
keys := []string{"mykey1", "mykey2", "mykey3"}
subscribeID, err := testClient.SubscribeConfigurationItems(ctx, "example-config",
keys, func(id string, items map[string]*ConfigurationItem) {
atomic.AddUint32(&counter, 1)
for _, k := range keys {
assert.Equal(t, k+valueSuffix, items[k].Value)
atomic.AddUint32(&totalCounter, 1)
}
})
assert.Nil(t, err)
time.Sleep(time.Second * 2)
time.Sleep(time.Millisecond * 500)
err := testClient.UnsubscribeConfigurationItems(ctx, "example-config", subscribeID)
err = testClient.UnsubscribeConfigurationItems(ctx, "example-config", subscribeID)
assert.Nil(t, err)
})
time.Sleep(time.Second * 5)

View File

@ -48,17 +48,14 @@ func main() {
md := metadata.Pairs("dapr-app-id", "configuration-api")
ctx = metadata.NewOutgoingContext(ctx, md)
defer f()
var subscribeID string
go func() {
if err := client.SubscribeConfigurationItems(ctx, "example-config", []string{"mySubscribeKey1", "mySubscribeKey2", "mySubscribeKey3"}, func(id string, items map[string]*dapr.ConfigurationItem) {
for k, v := range items {
fmt.Printf("get updated config key = %s, value = %s \n", k, v.Value)
}
subscribeID = id
}); err != nil {
panic(err)
subscribeID, err := client.SubscribeConfigurationItems(ctx, "example-config", []string{"mySubscribeKey1", "mySubscribeKey2", "mySubscribeKey3"}, func(id string, items map[string]*dapr.ConfigurationItem) {
for k, v := range items {
fmt.Printf("get updated config key = %s, value = %s \n", k, v.Value)
}
}()
})
if err != nil {
panic(err)
}
time.Sleep(time.Second*3 + time.Millisecond*500)
// dapr configuration unsubscribe called.