mirror of https://github.com/dapr/go-sdk.git
467 lines
13 KiB
Go
467 lines
13 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
|
|
"github.com/dapr/go-sdk/actor"
|
|
"github.com/dapr/go-sdk/actor/codec"
|
|
"github.com/dapr/go-sdk/actor/config"
|
|
)
|
|
|
|
const (
|
|
metadataKeyTTLInSeconds = "ttlInSeconds"
|
|
)
|
|
|
|
type InvokeActorRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
Method string
|
|
Data []byte
|
|
}
|
|
|
|
type InvokeActorResponse struct {
|
|
Data []byte
|
|
}
|
|
|
|
// InvokeActor invokes specific operation on the configured Dapr binding.
|
|
// This method covers input, output, and bi-directional bindings.
|
|
func (c *GRPCClient) InvokeActor(ctx context.Context, in *InvokeActorRequest) (out *InvokeActorResponse, err error) {
|
|
if in == nil {
|
|
return nil, errors.New("actor invocation required")
|
|
}
|
|
if in.Method == "" {
|
|
return nil, errors.New("actor invocation method required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return nil, errors.New("actor invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return nil, errors.New("actor invocation actorID required")
|
|
}
|
|
|
|
req := &pb.InvokeActorRequest{
|
|
ActorType: in.ActorType,
|
|
ActorId: in.ActorID,
|
|
Method: in.Method,
|
|
Data: in.Data,
|
|
}
|
|
|
|
resp, err := c.protoClient.InvokeActor(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error invoking binding %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
|
|
out = &InvokeActorResponse{}
|
|
|
|
if resp != nil {
|
|
out.Data = resp.GetData()
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// ImplActorClientStub impls the given client stub @actorClientStub, an example of client stub is as followed
|
|
/*
|
|
type ClientStub struct {
|
|
// User defined function
|
|
GetUser func(context.Context, *User) (*User, error)
|
|
Invoke func(context.Context, string) (string, error)
|
|
Get func(context.Context) (string, error)
|
|
Post func(context.Context, string) error
|
|
StartTimer func(context.Context, *TimerRequest) error
|
|
StopTimer func(context.Context, *TimerRequest) error
|
|
...
|
|
}
|
|
|
|
// Type defined the target type, which should be compatible with server side actor
|
|
func (a *ClientStub) Type() string {
|
|
return "testActorType"
|
|
}
|
|
|
|
// ID defined actor ID to be invoked
|
|
func (a *ClientStub) ID() string {
|
|
return "ActorImplID123456"
|
|
}.
|
|
*/
|
|
func (c *GRPCClient) ImplActorClientStub(actorClientStub actor.Client, opt ...config.Option) {
|
|
serializerType := config.GetConfigFromOptions(opt...).SerializerType
|
|
serializer, err := codec.GetActorCodec(serializerType)
|
|
if err != nil {
|
|
fmt.Printf("[Actor] ERROR: serializer type %s unsupported\n", serializerType)
|
|
return
|
|
}
|
|
|
|
c.implActor(actorClientStub, serializer)
|
|
}
|
|
|
|
type RegisterActorReminderRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
Name string
|
|
DueTime string
|
|
Period string
|
|
TTL string
|
|
Data []byte
|
|
}
|
|
|
|
// RegisterActorReminder registers a new reminder to target actor. Then, a reminder would be created and
|
|
// invoke actor's ReminderCall function if implemented.
|
|
// If server side actor impls this function, it's asserted to actor.ReminderCallee and can be invoked with call period
|
|
// and state data as param @in defined.
|
|
// Scheduling parameters 'DueTime', 'Period', and 'TTL' are optional.
|
|
func (c *GRPCClient) RegisterActorReminder(ctx context.Context, in *RegisterActorReminderRequest) (err error) {
|
|
if in == nil {
|
|
return errors.New("actor register reminder invocation request param required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return errors.New("actor register reminder invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return errors.New("actor register reminder invocation actorID required")
|
|
}
|
|
if in.Name == "" {
|
|
return errors.New("actor register reminder invocation name required")
|
|
}
|
|
|
|
req := &pb.RegisterActorReminderRequest{
|
|
ActorType: in.ActorType,
|
|
ActorId: in.ActorID,
|
|
Name: in.Name,
|
|
DueTime: in.DueTime,
|
|
Period: in.Period,
|
|
Ttl: in.TTL,
|
|
Data: in.Data,
|
|
}
|
|
|
|
_, err = c.protoClient.RegisterActorReminder(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("error invoking register actor reminder %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type UnregisterActorReminderRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
Name string
|
|
}
|
|
|
|
// UnregisterActorReminder would unregister the actor reminder.
|
|
func (c *GRPCClient) UnregisterActorReminder(ctx context.Context, in *UnregisterActorReminderRequest) error {
|
|
if in == nil {
|
|
return errors.New("actor unregister reminder invocation request param required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return errors.New("actor unregister reminder invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return errors.New("actor unregister reminder invocation actorID required")
|
|
}
|
|
if in.Name == "" {
|
|
return errors.New("actor unregister reminder invocation name required")
|
|
}
|
|
|
|
req := &pb.UnregisterActorReminderRequest{
|
|
ActorType: in.ActorType,
|
|
ActorId: in.ActorID,
|
|
Name: in.Name,
|
|
}
|
|
|
|
_, err := c.protoClient.UnregisterActorReminder(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("error invoking unregister actor reminder %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type RegisterActorTimerRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
Name string
|
|
DueTime string
|
|
Period string
|
|
TTL string
|
|
Data []byte
|
|
CallBack string
|
|
}
|
|
|
|
// RegisterActorTimer register actor timer as given param @in defined.
|
|
// Scheduling parameters 'DueTime', 'Period', and 'TTL' are optional.
|
|
func (c *GRPCClient) RegisterActorTimer(ctx context.Context, in *RegisterActorTimerRequest) (err error) {
|
|
if in == nil {
|
|
return errors.New("actor register timer invocation request param required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return errors.New("actor register timer invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return errors.New("actor register timer invocation actorID required")
|
|
}
|
|
if in.Name == "" {
|
|
return errors.New("actor register timer invocation name required")
|
|
}
|
|
if in.CallBack == "" {
|
|
return errors.New("actor register timer invocation callback function required")
|
|
}
|
|
|
|
req := &pb.RegisterActorTimerRequest{
|
|
ActorType: in.ActorType,
|
|
ActorId: in.ActorID,
|
|
Name: in.Name,
|
|
DueTime: in.DueTime,
|
|
Period: in.Period,
|
|
Ttl: in.TTL,
|
|
Data: in.Data,
|
|
Callback: in.CallBack,
|
|
}
|
|
|
|
_, err = c.protoClient.RegisterActorTimer(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("error invoking actor register timer %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type UnregisterActorTimerRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
Name string
|
|
}
|
|
|
|
// UnregisterActorTimer unregisters actor timer.
|
|
func (c *GRPCClient) UnregisterActorTimer(ctx context.Context, in *UnregisterActorTimerRequest) error {
|
|
if in == nil {
|
|
return errors.New("actor unregister timer invocation request param required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return errors.New("actor unregister timer invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return errors.New("actor unregister timer invocation actorID required")
|
|
}
|
|
if in.Name == "" {
|
|
return errors.New("actor unregister timer invocation name required")
|
|
}
|
|
req := &pb.UnregisterActorTimerRequest{
|
|
ActorType: in.ActorType,
|
|
ActorId: in.ActorID,
|
|
Name: in.Name,
|
|
}
|
|
|
|
_, err := c.protoClient.UnregisterActorTimer(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("error invoking binding %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *GRPCClient) implActor(actor actor.Client, serializer codec.Codec) {
|
|
actorValue := reflect.ValueOf(actor)
|
|
valueOfActor := actorValue.Elem()
|
|
typeOfActor := valueOfActor.Type()
|
|
|
|
// check incoming interface, the incoming interface's elem must be a struct.
|
|
if typeOfActor.Kind() != reflect.Struct {
|
|
fmt.Println("[Actor] ERROR: impl actor client stub failed, incoming interface is not struct")
|
|
return
|
|
}
|
|
|
|
numField := valueOfActor.NumField()
|
|
for i := range numField {
|
|
t := typeOfActor.Field(i)
|
|
methodName := t.Name
|
|
if methodName == "Type" {
|
|
continue
|
|
}
|
|
f := valueOfActor.Field(i)
|
|
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() {
|
|
outNum := t.Type.NumOut()
|
|
|
|
if outNum != 1 && outNum != 2 {
|
|
fmt.Printf("[Actor] ERROR: method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2\n",
|
|
t.Name, t.Type.String(), outNum)
|
|
continue
|
|
}
|
|
|
|
// The latest return type of the method must be error.
|
|
if returnType := t.Type.Out(outNum - 1); returnType != reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() {
|
|
fmt.Printf("[Actor] ERROR: the latest return type %s of method %q is not error\n", returnType, t.Name)
|
|
continue
|
|
}
|
|
|
|
funcOuts := make([]reflect.Type, outNum)
|
|
for i := range outNum {
|
|
funcOuts[i] = t.Type.Out(i)
|
|
}
|
|
|
|
f.Set(reflect.MakeFunc(f.Type(), c.makeCallProxyFunction(actor, methodName, funcOuts, serializer)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *GRPCClient) makeCallProxyFunction(actor actor.Client, methodName string, outs []reflect.Type, serializer codec.Codec) func(in []reflect.Value) []reflect.Value {
|
|
return func(in []reflect.Value) []reflect.Value {
|
|
var (
|
|
err error
|
|
inIArr []interface{}
|
|
reply reflect.Value
|
|
)
|
|
|
|
if len(outs) == 2 {
|
|
if outs[0].Kind() == reflect.Ptr {
|
|
reply = reflect.New(outs[0].Elem())
|
|
} else {
|
|
reply = reflect.New(outs[0])
|
|
}
|
|
}
|
|
|
|
start := 0
|
|
end := len(in)
|
|
invCtx := context.Background()
|
|
if end > 0 {
|
|
if in[0].Type().String() == "context.Context" {
|
|
if !in[0].IsNil() {
|
|
invCtx = in[0].Interface().(context.Context)
|
|
}
|
|
start++
|
|
}
|
|
}
|
|
|
|
if end-start <= 0 {
|
|
inIArr = []interface{}{}
|
|
} else if end-start == 1 {
|
|
inIArr = []interface{}{in[start].Interface()}
|
|
} else {
|
|
fmt.Println("[Actor] ERROR: param nums is zero or one is allowed by actor")
|
|
return nil
|
|
}
|
|
|
|
var data []byte
|
|
if len(inIArr) > 0 {
|
|
data, err = serializer.Marshal(inIArr[0])
|
|
}
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
rsp, err := c.InvokeActor(invCtx, &InvokeActorRequest{
|
|
ActorType: actor.Type(),
|
|
ActorID: actor.ID(),
|
|
Method: methodName,
|
|
Data: data,
|
|
})
|
|
|
|
if len(outs) == 1 {
|
|
return []reflect.Value{reflect.ValueOf(&err).Elem()}
|
|
}
|
|
|
|
response := reply.Interface()
|
|
if rsp != nil {
|
|
if err = serializer.Unmarshal(rsp.Data, response); err != nil {
|
|
fmt.Printf("[Actor] ERROR: unmarshal response err = %v\n", err)
|
|
}
|
|
}
|
|
if len(outs) == 2 && outs[0].Kind() != reflect.Ptr {
|
|
return []reflect.Value{reply.Elem(), reflect.ValueOf(&err).Elem()}
|
|
}
|
|
return []reflect.Value{reply, reflect.ValueOf(&err).Elem()}
|
|
}
|
|
}
|
|
|
|
type GetActorStateRequest struct {
|
|
ActorType string
|
|
ActorID string
|
|
KeyName string
|
|
}
|
|
|
|
type GetActorStateResponse struct {
|
|
Data []byte
|
|
}
|
|
|
|
func (c *GRPCClient) GetActorState(ctx context.Context, in *GetActorStateRequest) (*GetActorStateResponse, error) {
|
|
if in == nil {
|
|
return nil, errors.New("actor get state invocation request param required")
|
|
}
|
|
if in.ActorType == "" {
|
|
return nil, errors.New("actor get state invocation actorType required")
|
|
}
|
|
if in.ActorID == "" {
|
|
return nil, errors.New("actor get state invocation actorID required")
|
|
}
|
|
if in.KeyName == "" {
|
|
return nil, errors.New("actor get state invocation keyName required")
|
|
}
|
|
rsp, err := c.protoClient.GetActorState(ctx, &pb.GetActorStateRequest{
|
|
ActorId: in.ActorID,
|
|
ActorType: in.ActorType,
|
|
Key: in.KeyName,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error invoking actor get state %s/%s: %w", in.ActorType, in.ActorID, err)
|
|
}
|
|
return &GetActorStateResponse{Data: rsp.GetData()}, nil
|
|
}
|
|
|
|
type ActorStateOperation struct {
|
|
OperationType string
|
|
Key string
|
|
Value []byte
|
|
TTLInSeconds *int64
|
|
}
|
|
|
|
func (c *GRPCClient) SaveStateTransactionally(ctx context.Context, actorType, actorID string, operations []*ActorStateOperation) error {
|
|
if len(operations) == 0 {
|
|
return errors.New("actor save state transactionally invocation request param operations is empty")
|
|
}
|
|
if actorType == "" {
|
|
return errors.New("actor save state transactionally invocation actorType required")
|
|
}
|
|
if actorID == "" {
|
|
return errors.New("actor save state transactionally invocation actorID required")
|
|
}
|
|
grpcOperations := make([]*pb.TransactionalActorStateOperation, 0)
|
|
for _, op := range operations {
|
|
var metadata map[string]string
|
|
if op.TTLInSeconds != nil {
|
|
metadata = make(map[string]string)
|
|
metadata[metadataKeyTTLInSeconds] = strconv.FormatInt(*op.TTLInSeconds, 10)
|
|
}
|
|
grpcOperations = append(grpcOperations, &pb.TransactionalActorStateOperation{
|
|
OperationType: op.OperationType,
|
|
Key: op.Key,
|
|
Value: &anypb.Any{
|
|
Value: op.Value,
|
|
},
|
|
Metadata: metadata,
|
|
})
|
|
}
|
|
_, err := c.protoClient.ExecuteActorStateTransaction(ctx, &pb.ExecuteActorStateTransactionRequest{
|
|
ActorType: actorType,
|
|
ActorId: actorID,
|
|
Operations: grpcOperations,
|
|
})
|
|
return err
|
|
}
|