mirror of https://github.com/dapr/go-sdk.git
151 lines
5.2 KiB
Go
151 lines
5.2 KiB
Go
/*
|
|
Copyright 2024 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
package workflow
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/types/known/wrapperspb"
|
|
|
|
"github.com/dapr/durabletask-go/api/protos"
|
|
"github.com/dapr/durabletask-go/task"
|
|
)
|
|
|
|
type Metadata struct {
|
|
InstanceID string `json:"id"`
|
|
Name string `json:"name"`
|
|
RuntimeStatus Status `json:"status"`
|
|
CreatedAt time.Time `json:"createdAt"`
|
|
LastUpdatedAt time.Time `json:"lastUpdatedAt"`
|
|
SerializedInput string `json:"serializedInput"`
|
|
SerializedOutput string `json:"serializedOutput"`
|
|
SerializedCustomStatus string `json:"serializedCustomStatus"`
|
|
FailureDetails *FailureDetails `json:"failureDetails"`
|
|
}
|
|
|
|
type FailureDetails struct {
|
|
Type string `json:"type"`
|
|
Message string `json:"message"`
|
|
StackTrace string `json:"stackTrace"`
|
|
InnerFailure *FailureDetails `json:"innerFailure"`
|
|
IsNonRetriable bool `json:"IsNonRetriable"`
|
|
}
|
|
|
|
func convertMetadata(orchestrationMetadata *protos.OrchestrationMetadata) *Metadata {
|
|
metadata := Metadata{
|
|
InstanceID: orchestrationMetadata.GetInstanceId(),
|
|
Name: orchestrationMetadata.GetName(),
|
|
RuntimeStatus: Status(orchestrationMetadata.GetRuntimeStatus().Number()),
|
|
CreatedAt: orchestrationMetadata.GetCreatedAt().AsTime(),
|
|
LastUpdatedAt: orchestrationMetadata.GetLastUpdatedAt().AsTime(),
|
|
SerializedInput: orchestrationMetadata.GetInput().GetValue(),
|
|
SerializedOutput: orchestrationMetadata.GetOutput().GetValue(),
|
|
SerializedCustomStatus: orchestrationMetadata.GetCustomStatus().GetValue(),
|
|
}
|
|
if orchestrationMetadata.GetFailureDetails() != nil {
|
|
metadata.FailureDetails = &FailureDetails{
|
|
Type: orchestrationMetadata.GetFailureDetails().GetErrorType(),
|
|
Message: orchestrationMetadata.GetFailureDetails().GetErrorMessage(),
|
|
StackTrace: orchestrationMetadata.GetFailureDetails().GetStackTrace().GetValue(),
|
|
IsNonRetriable: orchestrationMetadata.GetFailureDetails().GetIsNonRetriable(),
|
|
}
|
|
|
|
if orchestrationMetadata.GetFailureDetails().GetInnerFailure() != nil {
|
|
var root *FailureDetails
|
|
current := root
|
|
failure := orchestrationMetadata.GetFailureDetails().GetInnerFailure()
|
|
for {
|
|
current.Type = failure.GetErrorType()
|
|
current.Message = failure.GetErrorMessage()
|
|
if failure.GetStackTrace() != nil {
|
|
current.StackTrace = failure.GetStackTrace().GetValue()
|
|
}
|
|
if failure.GetInnerFailure() == nil {
|
|
break
|
|
}
|
|
failure = failure.GetInnerFailure()
|
|
var inner *FailureDetails
|
|
current.InnerFailure = inner
|
|
current = inner
|
|
}
|
|
metadata.FailureDetails.InnerFailure = root
|
|
}
|
|
}
|
|
return &metadata
|
|
}
|
|
|
|
type callChildWorkflowOptions struct {
|
|
instanceID string
|
|
rawInput *wrapperspb.StringValue
|
|
retryPolicy *RetryPolicy
|
|
}
|
|
|
|
type callChildWorkflowOption func(*callChildWorkflowOptions) error
|
|
|
|
// ChildWorkflowInput is an option to provide a JSON-serializable input when calling a child workflow.
|
|
func ChildWorkflowInput(input any) callChildWorkflowOption {
|
|
return func(opts *callChildWorkflowOptions) error {
|
|
bytes, err := marshalData(input)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal input data to JSON: %v", err)
|
|
}
|
|
opts.rawInput = wrapperspb.String(string(bytes))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ChildWorkflowRawInput is an option to provide a byte slice input when calling a child workflow.
|
|
func ChildWorkflowRawInput(input string) callChildWorkflowOption {
|
|
return func(opts *callChildWorkflowOptions) error {
|
|
opts.rawInput = wrapperspb.String(input)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ChildWorkflowInstanceID is an option to provide an instance id when calling a child workflow.
|
|
func ChildWorkflowInstanceID(instanceID string) callChildWorkflowOption {
|
|
return func(opts *callChildWorkflowOptions) error {
|
|
opts.instanceID = instanceID
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func ChildWorkflowRetryPolicy(policy RetryPolicy) callChildWorkflowOption {
|
|
return func(opts *callChildWorkflowOptions) error {
|
|
opts.retryPolicy = &policy
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (opts *callChildWorkflowOptions) getRetryPolicy() *task.RetryPolicy {
|
|
if opts.retryPolicy == nil {
|
|
return nil
|
|
}
|
|
return &task.RetryPolicy{
|
|
MaxAttempts: opts.retryPolicy.MaxAttempts,
|
|
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
|
|
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
|
|
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
|
|
RetryTimeout: opts.retryPolicy.RetryTimeout,
|
|
}
|
|
}
|
|
|
|
// NewTaskSlice returns a slice of tasks which can be executed in parallel
|
|
func NewTaskSlice(length int) []task.Task {
|
|
taskSlice := make([]task.Task, length)
|
|
return taskSlice
|
|
}
|