Cosmos - Nested PartitionKey Support (#296)
* Nested partition key in cosmosdb bindings * fix lint * Lookup values by iterating * fix lint Co-authored-by: Aman Bhardwaj <amanbha@users.noreply.github.com>
This commit is contained in:
parent
75fcd7accf
commit
fd531b006f
|
@ -8,6 +8,7 @@ package cosmosdb
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/a8m/documentdb"
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
|
@ -103,13 +104,54 @@ func (c *CosmosDB) Write(req *bindings.WriteRequest) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if val, ok := obj.(map[string]interface{})[c.partitionKey]; ok && val != "" {
|
||||
_, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
val, err := c.getPartitionKeyValue(c.partitionKey, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("missing partitionKey field %s from request body", c.partitionKey)
|
||||
|
||||
_, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
|
||||
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("missing partitionKey field %s from request body - %s", c.partitionKey, err)
|
||||
}
|
||||
|
||||
if val == "" {
|
||||
return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
|
||||
}
|
||||
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{}, err error) {
|
||||
var ok bool
|
||||
|
||||
if len(ks) == 0 {
|
||||
return nil, fmt.Errorf("needs at least one key")
|
||||
}
|
||||
|
||||
c.logger.Infof("%s, %s", ks[0], m[ks[0]])
|
||||
|
||||
if val, ok = m[ks[0]]; !ok {
|
||||
return nil, fmt.Errorf("key not found %v", ks[0])
|
||||
}
|
||||
|
||||
// Last Key
|
||||
if len(ks) == 1 {
|
||||
return val, nil
|
||||
}
|
||||
|
||||
// Convert val to map to iterate again
|
||||
if m, ok = val.(map[string]interface{}); !ok {
|
||||
return nil, fmt.Errorf("invalid structure at %#v", val)
|
||||
}
|
||||
|
||||
return c.lookup(m, ks[1:])
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package cosmosdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
|
@ -25,3 +26,43 @@ func TestParseMetadata(t *testing.T) {
|
|||
assert.Equal(t, "a", meta.PartitionKey)
|
||||
assert.Equal(t, "a", meta.URL)
|
||||
}
|
||||
|
||||
func TestPartitionKeyValue(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = map[string]string{"Collection": "a", "Database": "a", "MasterKey": "a", "PartitionKey": "a", "URL": "a"}
|
||||
cosmosDB := CosmosDB{logger: logger.NewLogger("test")}
|
||||
var obj interface{}
|
||||
jsonStr := `{"name": "name", "empty" : "", "address": { "planet" : { "name": "earth" }, "zip" : "zipcode" }}`
|
||||
json.Unmarshal([]byte(jsonStr), &obj)
|
||||
|
||||
// Valid single partition key
|
||||
val, err := cosmosDB.getPartitionKeyValue("name", obj)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "name", val)
|
||||
|
||||
// Not existing key
|
||||
_, err = cosmosDB.getPartitionKeyValue("notexists", obj)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// // Empty value for the key
|
||||
_, err = cosmosDB.getPartitionKeyValue("empty", obj)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// Valid nested partition key
|
||||
val, err = cosmosDB.getPartitionKeyValue("address.zip", obj)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "zipcode", val)
|
||||
|
||||
// Valid nested three level partition key
|
||||
val, err = cosmosDB.getPartitionKeyValue("address.planet.name", obj)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "earth", val)
|
||||
|
||||
//Invalid nested partition key
|
||||
_, err = cosmosDB.getPartitionKeyValue("address.notexists", obj)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// Empty key is passed
|
||||
_, err = cosmosDB.getPartitionKeyValue("", obj)
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue