components-contrib/bindings/alicloud/tablestore/tablestore.go

356 lines
8.9 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tablestore
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
"github.com/dapr/components-contrib/bindings"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
kitmd "github.com/dapr/kit/metadata"
)
const (
tableName = "tableName"
columnToGet = "columnToGet"
primaryKeys = "primaryKeys"
invokeStartTimeKey = "start-time"
invokeEndTimeKey = "end-time"
invokeDurationKey = "duration"
)
type tablestoreMetadata struct {
Endpoint string `json:"endpoint" mapstructure:"endpoint"`
AccessKeyID string `json:"accessKeyID" mapstructure:"accessKeyID"`
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
InstanceName string `json:"instanceName" mapstructure:"instanceName"`
TableName string `json:"tableName" mapstructure:"tableName"`
}
type AliCloudTableStore struct {
logger logger.Logger
client *tablestore.TableStoreClient
metadata tablestoreMetadata
}
func NewAliCloudTableStore(log logger.Logger) bindings.OutputBinding {
return &AliCloudTableStore{
logger: log,
client: nil,
}
}
func (s *AliCloudTableStore) Init(_ context.Context, metadata bindings.Metadata) error {
m, err := s.parseMetadata(metadata)
if err != nil {
return err
}
s.metadata = *m
s.client = tablestore.NewClient(m.Endpoint, m.InstanceName, m.AccessKeyID, m.AccessKey)
return nil
}
func (s *AliCloudTableStore) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
if req == nil {
return nil, errors.New("invoke request required")
}
startTime := time.Now()
resp := &bindings.InvokeResponse{
Metadata: map[string]string{
invokeStartTimeKey: startTime.Format(time.RFC3339Nano),
},
}
switch req.Operation {
case bindings.GetOperation:
err := s.get(req, resp)
if err != nil {
return nil, err
}
case bindings.ListOperation:
err := s.list(req, resp)
if err != nil {
return nil, err
}
case bindings.CreateOperation:
err := s.create(req, resp)
if err != nil {
return nil, err
}
case bindings.DeleteOperation:
err := s.delete(req, resp)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("invalid operation type: %s. Expected %s, %s, %s, or %s",
req.Operation, bindings.GetOperation, bindings.ListOperation, bindings.CreateOperation, bindings.DeleteOperation)
}
endTime := time.Now()
resp.Metadata[invokeEndTimeKey] = endTime.Format(time.RFC3339Nano)
resp.Metadata[invokeDurationKey] = endTime.Sub(startTime).String()
return resp, nil
}
func (s *AliCloudTableStore) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation, bindings.DeleteOperation, bindings.GetOperation, bindings.ListOperation}
}
func (s *AliCloudTableStore) parseMetadata(metadata bindings.Metadata) (*tablestoreMetadata, error) {
m := tablestoreMetadata{}
err := kitmd.DecodeMetadata(metadata.Properties, &m)
if err != nil {
return nil, err
}
return &m, nil
}
func (s *AliCloudTableStore) get(req *bindings.InvokeRequest, resp *bindings.InvokeResponse) error {
columns := strings.Split(req.Metadata[columnToGet], ",")
pkNames := strings.Split(req.Metadata[primaryKeys], ",")
pks := make([]*tablestore.PrimaryKeyColumn, len(pkNames))
data := make(map[string]interface{})
err := json.Unmarshal(req.Data, &data)
if err != nil {
return err
}
for idx, pkName := range pkNames {
pks[idx] = &tablestore.PrimaryKeyColumn{
ColumnName: pkName,
Value: data[pkName],
}
}
criteria := &tablestore.SingleRowQueryCriteria{
TableName: s.getTableName(req.Metadata),
PrimaryKey: &tablestore.PrimaryKey{PrimaryKeys: pks},
ColumnsToGet: columns,
MaxVersion: 1,
}
getRowReq := &tablestore.GetRowRequest{
SingleRowQueryCriteria: criteria,
}
getRowResp, err := s.client.GetRow(getRowReq)
if err != nil {
return err
}
ret, err := s.unmarshal(getRowResp.PrimaryKey.PrimaryKeys, getRowResp.Columns)
if err != nil {
return err
}
if ret == nil {
resp.Data = nil
return nil
}
resp.Data, err = json.Marshal(ret)
return err
}
func (s *AliCloudTableStore) list(req *bindings.InvokeRequest, resp *bindings.InvokeResponse) error {
columns := strings.Split(req.Metadata[columnToGet], ",")
pkNames := strings.Split(req.Metadata[primaryKeys], ",")
var data []map[string]interface{}
err := json.Unmarshal(req.Data, &data)
if err != nil {
return err
}
criteria := &tablestore.MultiRowQueryCriteria{
TableName: s.getTableName(req.Metadata),
ColumnsToGet: columns,
MaxVersion: 1,
}
for _, item := range data {
pk := &tablestore.PrimaryKey{}
for _, pkName := range pkNames {
pk.AddPrimaryKeyColumn(pkName, item[pkName])
}
criteria.AddRow(pk)
}
getRowRequest := &tablestore.BatchGetRowRequest{}
getRowRequest.MultiRowQueryCriteria = append(getRowRequest.MultiRowQueryCriteria, criteria)
getRowResp, err := s.client.BatchGetRow(getRowRequest)
if err != nil {
return err
}
var ret []interface{}
for _, criteria := range getRowRequest.MultiRowQueryCriteria {
for _, row := range getRowResp.TableToRowsResult[criteria.TableName] {
rowData, rowErr := s.unmarshal(row.PrimaryKey.PrimaryKeys, row.Columns)
if rowErr != nil {
return rowErr
}
ret = append(ret, rowData)
}
}
resp.Data, err = json.Marshal(ret)
return err
}
func (s *AliCloudTableStore) create(req *bindings.InvokeRequest, resp *bindings.InvokeResponse) error {
data := make(map[string]interface{})
err := json.Unmarshal(req.Data, &data)
if err != nil {
return err
}
pkNames := strings.Split(req.Metadata[primaryKeys], ",")
pks := make([]*tablestore.PrimaryKeyColumn, len(pkNames))
columns := make([]tablestore.AttributeColumn, len(data)-len(pkNames))
for idx, pk := range pkNames {
pks[idx] = &tablestore.PrimaryKeyColumn{
ColumnName: pk,
Value: data[pk],
}
}
idx := 0
for key, val := range data {
if !contains(pkNames, key) {
columns[idx] = tablestore.AttributeColumn{
ColumnName: key,
Value: val,
}
idx++
}
}
change := tablestore.PutRowChange{
TableName: s.getTableName(req.Metadata),
PrimaryKey: &tablestore.PrimaryKey{PrimaryKeys: pks},
Columns: columns,
ReturnType: tablestore.ReturnType_RT_NONE, //nolint:nosnakecase
TransactionId: nil,
}
change.SetCondition(tablestore.RowExistenceExpectation_IGNORE) //nolint:nosnakecase
putRequest := &tablestore.PutRowRequest{
PutRowChange: &change,
}
_, err = s.client.PutRow(putRequest)
if err != nil {
return err
}
return nil
}
func (s *AliCloudTableStore) delete(req *bindings.InvokeRequest, resp *bindings.InvokeResponse) error {
pkNams := strings.Split(req.Metadata[primaryKeys], ",")
pks := make([]*tablestore.PrimaryKeyColumn, len(pkNams))
data := make(map[string]interface{})
err := json.Unmarshal(req.Data, &data)
if err != nil {
return err
}
for idx, pkName := range pkNams {
pks[idx] = &tablestore.PrimaryKeyColumn{
ColumnName: pkName,
Value: data[pkName],
}
}
change := &tablestore.DeleteRowChange{
TableName: s.getTableName(req.Metadata),
PrimaryKey: &tablestore.PrimaryKey{PrimaryKeys: pks},
}
change.SetCondition(tablestore.RowExistenceExpectation_IGNORE) //nolint:nosnakecase
deleteReq := &tablestore.DeleteRowRequest{DeleteRowChange: change}
_, err = s.client.DeleteRow(deleteReq)
if err != nil {
return err
}
return nil
}
func (s *AliCloudTableStore) unmarshal(pks []*tablestore.PrimaryKeyColumn, columns []*tablestore.AttributeColumn) (map[string]interface{}, error) {
if pks == nil && columns == nil {
return nil, nil
}
data := make(map[string]interface{})
for _, pk := range pks {
data[pk.ColumnName] = pk.Value
}
for _, column := range columns {
data[column.ColumnName] = column.Value
}
return data, nil
}
func (s *AliCloudTableStore) getTableName(metadata map[string]string) string {
name := metadata[tableName]
if name == "" {
name = s.metadata.TableName
}
return name
}
func contains(arr []string, str string) bool {
for _, a := range arr {
if a == str {
return true
}
}
return false
}
// GetComponentMetadata returns the metadata of the component.
func (s *AliCloudTableStore) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
metadataStruct := tablestoreMetadata{}
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType)
return
}