go-sdk/client/actor.go

467 lines
13 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"google.golang.org/protobuf/types/known/anypb"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/codec"
"github.com/dapr/go-sdk/actor/config"
)
// metadataKeyTTLInSeconds is the metadata key under which the time-to-live of
// a transactional actor state operation, expressed in seconds, is sent.
const metadataKeyTTLInSeconds = "ttlInSeconds"
// InvokeActorRequest describes a single actor method invocation: the actor to
// address, the method to call on it and the raw payload to send.
type InvokeActorRequest struct {
	// ActorType, ActorID and Method are all required by InvokeActor.
	ActorType, ActorID, Method string
	// Data is the request payload; it is forwarded unchanged.
	Data []byte
}
// InvokeActorResponse holds the raw payload returned by an actor method
// invocation.
type InvokeActorResponse struct{ Data []byte }
// InvokeActor calls the method named in.Method on the actor identified by
// in.ActorType and in.ActorID, sending in.Data as the request payload.
//
// The request is rejected before any RPC is issued when in is nil or when
// Method, ActorType or ActorID is empty. On success the returned response
// carries the payload produced by the call; an RPC failure is returned wrapped
// with the actor type and ID.
func (c *GRPCClient) InvokeActor(ctx context.Context, in *InvokeActorRequest) (out *InvokeActorResponse, err error) {
	if in == nil {
		return nil, errors.New("actor invocation required")
	}
	if in.Method == "" {
		return nil, errors.New("actor invocation method required")
	}
	if in.ActorType == "" {
		return nil, errors.New("actor invocation actorType required")
	}
	if in.ActorID == "" {
		return nil, errors.New("actor invocation actorID required")
	}

	req := &pb.InvokeActorRequest{
		ActorType: in.ActorType,
		ActorId:   in.ActorID,
		Method:    in.Method,
		Data:      in.Data,
	}

	resp, err := c.protoClient.InvokeActor(ctx, req)
	if err != nil {
		// This is an actor call, not a binding call; say so in the error.
		return nil, fmt.Errorf("error invoking actor %s/%s: %w", in.ActorType, in.ActorID, err)
	}

	// Always hand back a non-nil response, even if the RPC returned none.
	out = &InvokeActorResponse{}
	if resp != nil {
		out.Data = resp.GetData()
	}
	return out, nil
}
// ImplActorClientStub fills in the function fields of actorClientStub so that
// calling one of them invokes the actor method of the same name through this
// client. The serializer used for arguments and results is selected through
// opt; when the configured serializer type is not supported, an error is
// printed and the stub is left untouched.
//
// An example client stub:
//
//	type ClientStub struct {
//		// User defined functions
//		GetUser    func(context.Context, *User) (*User, error)
//		Invoke     func(context.Context, string) (string, error)
//		Get        func(context.Context) (string, error)
//		Post       func(context.Context, string) error
//		StartTimer func(context.Context, *TimerRequest) error
//		StopTimer  func(context.Context, *TimerRequest) error
//		...
//	}
//
//	// Type defines the target actor type, which should be compatible with
//	// the server side actor.
//	func (a *ClientStub) Type() string {
//		return "testActorType"
//	}
//
//	// ID defines the ID of the actor to be invoked.
//	func (a *ClientStub) ID() string {
//		return "ActorImplID123456"
//	}
func (c *GRPCClient) ImplActorClientStub(actorClientStub actor.Client, opt ...config.Option) {
	cfg := config.GetConfigFromOptions(opt...)

	stubCodec, lookupErr := codec.GetActorCodec(cfg.SerializerType)
	if lookupErr != nil {
		fmt.Printf("[Actor] ERROR: serializer type %s unsupported\n", cfg.SerializerType)
		return
	}

	c.implActor(actorClientStub, stubCodec)
}
// RegisterActorReminderRequest names the reminder to register and the actor it
// belongs to, together with optional scheduling settings and a payload.
type RegisterActorReminderRequest struct {
	// ActorType, ActorID and Name are required by RegisterActorReminder.
	ActorType, ActorID, Name string
	// DueTime, Period and TTL are optional scheduling parameters.
	DueTime, Period, TTL string
	// Data is sent along with the registration unchanged.
	Data []byte
}
// RegisterActorReminder registers the reminder described by in on the target
// actor. Once created, the reminder invokes the actor's ReminderCall function
// if the server side actor implements it (it is then asserted to
// actor.ReminderCallee), using the period and state data given in in.
//
// ActorType, ActorID and Name are mandatory and checked before any RPC is
// made; the scheduling parameters 'DueTime', 'Period', and 'TTL' are optional.
// An RPC failure is returned wrapped with the actor type and ID.
func (c *GRPCClient) RegisterActorReminder(ctx context.Context, in *RegisterActorReminderRequest) (err error) {
	// Cases are evaluated top to bottom, so the nil check guards the rest.
	switch {
	case in == nil:
		return errors.New("actor register reminder invocation request param required")
	case in.ActorType == "":
		return errors.New("actor register reminder invocation actorType required")
	case in.ActorID == "":
		return errors.New("actor register reminder invocation actorID required")
	case in.Name == "":
		return errors.New("actor register reminder invocation name required")
	}

	_, rpcErr := c.protoClient.RegisterActorReminder(ctx, &pb.RegisterActorReminderRequest{
		ActorType: in.ActorType,
		ActorId:   in.ActorID,
		Name:      in.Name,
		DueTime:   in.DueTime,
		Period:    in.Period,
		Ttl:       in.TTL,
		Data:      in.Data,
	})
	if rpcErr != nil {
		return fmt.Errorf("error invoking register actor reminder %s/%s: %w", in.ActorType, in.ActorID, rpcErr)
	}
	return nil
}
// UnregisterActorReminderRequest identifies the reminder to remove by actor
// type, actor ID and reminder name; UnregisterActorReminder requires all three.
type UnregisterActorReminderRequest struct {
	ActorType, ActorID, Name string
}
// UnregisterActorReminder removes the reminder called in.Name from the actor
// identified by in.ActorType and in.ActorID.
//
// A nil request or an empty ActorType, ActorID or Name is rejected before any
// RPC is made; an RPC failure is returned wrapped with the actor type and ID.
func (c *GRPCClient) UnregisterActorReminder(ctx context.Context, in *UnregisterActorReminderRequest) error {
	// Cases are evaluated top to bottom, so the nil check guards the rest.
	switch {
	case in == nil:
		return errors.New("actor unregister reminder invocation request param required")
	case in.ActorType == "":
		return errors.New("actor unregister reminder invocation actorType required")
	case in.ActorID == "":
		return errors.New("actor unregister reminder invocation actorID required")
	case in.Name == "":
		return errors.New("actor unregister reminder invocation name required")
	}

	_, rpcErr := c.protoClient.UnregisterActorReminder(ctx, &pb.UnregisterActorReminderRequest{
		ActorType: in.ActorType,
		ActorId:   in.ActorID,
		Name:      in.Name,
	})
	if rpcErr != nil {
		return fmt.Errorf("error invoking unregister actor reminder %s/%s: %w", in.ActorType, in.ActorID, rpcErr)
	}
	return nil
}
// RegisterActorTimerRequest names the timer to register and the actor it
// belongs to, together with optional scheduling settings, a payload and the
// callback to use.
type RegisterActorTimerRequest struct {
	// ActorType, ActorID and Name are required by RegisterActorTimer.
	ActorType, ActorID, Name string
	// DueTime, Period and TTL are optional scheduling parameters.
	DueTime, Period, TTL string
	// Data is sent along with the registration unchanged.
	Data []byte
	// CallBack is the callback function name; RegisterActorTimer requires it.
	CallBack string
}
// RegisterActorTimer registers the timer described by in on the target actor.
//
// ActorType, ActorID, Name and CallBack are mandatory and checked before any
// RPC is made; the scheduling parameters 'DueTime', 'Period', and 'TTL' are
// optional. An RPC failure is returned wrapped with the actor type and ID.
func (c *GRPCClient) RegisterActorTimer(ctx context.Context, in *RegisterActorTimerRequest) (err error) {
	// Cases are evaluated top to bottom, so the nil check guards the rest.
	switch {
	case in == nil:
		return errors.New("actor register timer invocation request param required")
	case in.ActorType == "":
		return errors.New("actor register timer invocation actorType required")
	case in.ActorID == "":
		return errors.New("actor register timer invocation actorID required")
	case in.Name == "":
		return errors.New("actor register timer invocation name required")
	case in.CallBack == "":
		return errors.New("actor register timer invocation callback function required")
	}

	_, rpcErr := c.protoClient.RegisterActorTimer(ctx, &pb.RegisterActorTimerRequest{
		ActorType: in.ActorType,
		ActorId:   in.ActorID,
		Name:      in.Name,
		DueTime:   in.DueTime,
		Period:    in.Period,
		Ttl:       in.TTL,
		Data:      in.Data,
		Callback:  in.CallBack,
	})
	if rpcErr != nil {
		return fmt.Errorf("error invoking actor register timer %s/%s: %w", in.ActorType, in.ActorID, rpcErr)
	}
	return nil
}
// UnregisterActorTimerRequest identifies the timer to remove by actor type,
// actor ID and timer name; UnregisterActorTimer requires all three.
type UnregisterActorTimerRequest struct {
	ActorType, ActorID, Name string
}
// UnregisterActorTimer removes the timer called in.Name from the actor
// identified by in.ActorType and in.ActorID.
//
// A nil request or an empty ActorType, ActorID or Name is rejected before any
// RPC is made; an RPC failure is returned wrapped with the actor type and ID.
func (c *GRPCClient) UnregisterActorTimer(ctx context.Context, in *UnregisterActorTimerRequest) error {
	if in == nil {
		return errors.New("actor unregister timer invocation request param required")
	}
	if in.ActorType == "" {
		return errors.New("actor unregister timer invocation actorType required")
	}
	if in.ActorID == "" {
		return errors.New("actor unregister timer invocation actorID required")
	}
	if in.Name == "" {
		return errors.New("actor unregister timer invocation name required")
	}

	req := &pb.UnregisterActorTimerRequest{
		ActorType: in.ActorType,
		ActorId:   in.ActorID,
		Name:      in.Name,
	}

	if _, err := c.protoClient.UnregisterActorTimer(ctx, req); err != nil {
		// Name the operation that actually failed; this is not a binding call.
		return fmt.Errorf("error invoking actor unregister timer %s/%s: %w", in.ActorType, in.ActorID, err)
	}
	return nil
}
// implActor replaces every settable func-typed field of the stub struct that
// actor points to with a proxy which forwards the call to the remote actor
// method named after the field, using serializer for the payloads.
//
// actor must be a non-nil pointer to a struct; anything else is reported on
// stdout and ignored rather than panicking inside reflect. A func field is
// skipped (with a message) unless it returns either `error` or `(T, error)`.
func (c *GRPCClient) implActor(actor actor.Client, serializer codec.Codec) {
	actorValue := reflect.ValueOf(actor)
	// Value.Elem panics on non-pointer kinds and yields an invalid Value for a
	// nil pointer, so validate before dereferencing.
	if actorValue.Kind() != reflect.Ptr || actorValue.IsNil() {
		fmt.Println("[Actor] ERROR: impl actor client stub failed, incoming interface is not a non-nil pointer")
		return
	}
	valueOfActor := actorValue.Elem()
	typeOfActor := valueOfActor.Type()

	// check incoming interface, the incoming interface's elem must be a struct.
	if typeOfActor.Kind() != reflect.Struct {
		fmt.Println("[Actor] ERROR: impl actor client stub failed, incoming interface is not struct")
		return
	}

	errorType := reflect.TypeOf((*error)(nil)).Elem()

	for i := range valueOfActor.NumField() {
		t := typeOfActor.Field(i)
		methodName := t.Name
		if methodName == "Type" {
			continue
		}

		f := valueOfActor.Field(i)
		// Only exported (settable) function fields can be given a proxy.
		if f.Kind() != reflect.Func || !f.CanSet() {
			continue
		}

		outNum := t.Type.NumOut()
		if outNum != 1 && outNum != 2 {
			fmt.Printf("[Actor] ERROR: method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2\n",
				t.Name, t.Type.String(), outNum)
			continue
		}

		// The latest return type of the method must be error.
		if returnType := t.Type.Out(outNum - 1); returnType != errorType {
			fmt.Printf("[Actor] ERROR: the latest return type %s of method %q is not error\n", returnType, t.Name)
			continue
		}

		funcOuts := make([]reflect.Type, outNum)
		for j := range outNum {
			funcOuts[j] = t.Type.Out(j)
		}

		f.Set(reflect.MakeFunc(f.Type(), c.makeCallProxyFunction(actor, methodName, funcOuts, serializer)))
	}
}
// makeCallProxyFunction builds the body handed to reflect.MakeFunc for one
// stub method. The returned function:
//
//   - uses a leading context.Context argument (if present and non-nil) as the
//     invocation context, falling back to context.Background();
//   - marshals the single remaining argument, if any, with serializer;
//   - invokes methodName on the actor given by actor.Type() and actor.ID();
//   - for two-result methods, unmarshals the response payload into a fresh
//     value of the first result type.
//
// outs lists the result types of the stub method and is expected to have
// length 1 (`error`) or 2 (`T, error`). Every failure, including an
// unsupported argument count or a marshal error, is reported through the
// method's error result so that the value slice returned to reflect always
// matches the method signature.
func (c *GRPCClient) makeCallProxyFunction(actor actor.Client, methodName string, outs []reflect.Type, serializer codec.Codec) func(in []reflect.Value) []reflect.Value {
	contextType := reflect.TypeOf((*context.Context)(nil)).Elem()

	// results shapes (reply, err) into exactly the values the stub method
	// declares. reply is only read when the method has two results.
	results := func(reply reflect.Value, err error) []reflect.Value {
		// Going through *error yields a Value of interface type error, which
		// reflect requires even when err is nil.
		errValue := reflect.ValueOf(&err).Elem()
		switch {
		case len(outs) == 1:
			return []reflect.Value{errValue}
		case outs[0].Kind() != reflect.Ptr:
			// reply is a *T while the method returns T.
			return []reflect.Value{reply.Elem(), errValue}
		default:
			return []reflect.Value{reply, errValue}
		}
	}

	return func(in []reflect.Value) []reflect.Value {
		// reply is always a pointer so the serializer has something to fill.
		var reply reflect.Value
		if len(outs) == 2 {
			if outs[0].Kind() == reflect.Ptr {
				reply = reflect.New(outs[0].Elem())
			} else {
				reply = reflect.New(outs[0])
			}
		}

		invCtx := context.Background()
		args := in
		if len(args) > 0 && args[0].Type() == contextType {
			if !args[0].IsNil() {
				invCtx = args[0].Interface().(context.Context)
			}
			args = args[1:]
		}

		if len(args) > 1 {
			fmt.Println("[Actor] ERROR: param nums is zero or one is allowed by actor")
			// Returning nil here would make reflect panic on a result-count
			// mismatch; surface the problem as the method's error instead.
			return results(reply, fmt.Errorf("actor method %s: at most one argument besides context.Context is allowed, got %d", methodName, len(args)))
		}

		var data []byte
		if len(args) == 1 {
			var err error
			data, err = serializer.Marshal(args[0].Interface())
			if err != nil {
				return results(reply, fmt.Errorf("actor method %s: marshal argument: %w", methodName, err))
			}
		}

		rsp, err := c.InvokeActor(invCtx, &InvokeActorRequest{
			ActorType: actor.Type(),
			ActorID:   actor.ID(),
			Method:    methodName,
			Data:      data,
		})
		if len(outs) == 1 {
			return results(reply, err)
		}

		if rsp != nil {
			if err = serializer.Unmarshal(rsp.Data, reply.Interface()); err != nil {
				fmt.Printf("[Actor] ERROR: unmarshal response err = %v\n", err)
			}
		}
		return results(reply, err)
	}
}
// GetActorStateRequest identifies one state entry of an actor by actor type,
// actor ID and key name; GetActorState requires all three.
type GetActorStateRequest struct {
	ActorType, ActorID, KeyName string
}
// GetActorStateResponse holds the raw value returned for an actor state key.
type GetActorStateResponse struct{ Data []byte }
// GetActorState fetches the value stored under in.KeyName for the actor
// identified by in.ActorType and in.ActorID.
//
// A nil request or an empty ActorType, ActorID or KeyName is rejected before
// any RPC is made; an RPC failure is returned wrapped with the actor type and
// ID. On success the response carries the data returned by the RPC.
func (c *GRPCClient) GetActorState(ctx context.Context, in *GetActorStateRequest) (*GetActorStateResponse, error) {
	// Cases are evaluated top to bottom, so the nil check guards the rest.
	switch {
	case in == nil:
		return nil, errors.New("actor get state invocation request param required")
	case in.ActorType == "":
		return nil, errors.New("actor get state invocation actorType required")
	case in.ActorID == "":
		return nil, errors.New("actor get state invocation actorID required")
	case in.KeyName == "":
		return nil, errors.New("actor get state invocation keyName required")
	}

	req := &pb.GetActorStateRequest{
		ActorId:   in.ActorID,
		ActorType: in.ActorType,
		Key:       in.KeyName,
	}
	rsp, rpcErr := c.protoClient.GetActorState(ctx, req)
	if rpcErr != nil {
		return nil, fmt.Errorf("error invoking actor get state %s/%s: %w", in.ActorType, in.ActorID, rpcErr)
	}

	out := &GetActorStateResponse{}
	out.Data = rsp.GetData()
	return out, nil
}
// ActorStateOperation is one step of a transactional actor state update as
// consumed by SaveStateTransactionally.
type ActorStateOperation struct {
	// OperationType and Key are forwarded unchanged.
	OperationType, Key string
	// Value is the raw state value for the operation.
	Value []byte
	// TTLInSeconds, when non-nil, is sent as the "ttlInSeconds" metadata entry.
	TTLInSeconds *int64
}
// SaveStateTransactionally submits operations as a single state transaction
// for the actor identified by actorType and actorID.
//
// An empty operations slice, an empty actorType or actorID, or a nil entry in
// operations is rejected before any RPC is made. For an operation with a
// non-nil TTLInSeconds the value is sent as the "ttlInSeconds" metadata entry;
// otherwise no metadata is attached. An RPC failure is returned wrapped with
// the actor type and ID.
func (c *GRPCClient) SaveStateTransactionally(ctx context.Context, actorType, actorID string, operations []*ActorStateOperation) error {
	if len(operations) == 0 {
		return errors.New("actor save state transactionally invocation request param operations is empty")
	}
	if actorType == "" {
		return errors.New("actor save state transactionally invocation actorType required")
	}
	if actorID == "" {
		return errors.New("actor save state transactionally invocation actorID required")
	}

	grpcOperations := make([]*pb.TransactionalActorStateOperation, 0, len(operations))
	for i, op := range operations {
		// A nil entry would otherwise be dereferenced below.
		if op == nil {
			return fmt.Errorf("actor save state transactionally invocation operation %d is nil", i)
		}

		var metadata map[string]string
		if op.TTLInSeconds != nil {
			metadata = map[string]string{
				metadataKeyTTLInSeconds: strconv.FormatInt(*op.TTLInSeconds, 10),
			}
		}

		grpcOperations = append(grpcOperations, &pb.TransactionalActorStateOperation{
			OperationType: op.OperationType,
			Key:           op.Key,
			Value: &anypb.Any{
				Value: op.Value,
			},
			Metadata: metadata,
		})
	}

	_, err := c.protoClient.ExecuteActorStateTransaction(ctx, &pb.ExecuteActorStateTransactionRequest{
		ActorType:  actorType,
		ActorId:    actorID,
		Operations: grpcOperations,
	})
	if err != nil {
		return fmt.Errorf("error invoking actor save state transactionally %s/%s: %w", actorType, actorID, err)
	}
	return nil
}