diff --git a/.github/workflows/release-on-tag.yaml b/.github/workflows/release-on-tag.yaml index 68be5b3..58e3f5f 100644 --- a/.github/workflows/release-on-tag.yaml +++ b/.github/workflows/release-on-tag.yaml @@ -38,8 +38,9 @@ jobs: run: | echo "RELEASE_VERSION=$(echo ${GITHUB_REF:10})" >> $GITHUB_ENV - - name: Release + - name: Release Main uses: actions/create-release@v1 + if: ${{ !contains(github.ref , 'rc') }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: @@ -49,6 +50,18 @@ jobs: draft: false prerelease: false + - name: Release RC + uses: actions/create-release@v1 + if: ${{ contains(github.ref, 'rc') }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + body: Automatic Go Dapr client release + draft: false + prerelease: true + - name: Notify uses: rjstone/discord-webhook-notify@v1 with: diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index 9015363..e6acf7b 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -38,9 +38,9 @@ jobs: CHECKOUT_REF: ${{ github.ref }} outputs: DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }} - DAPR_CLI_VER: 1.15.0-rc.4 + DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }} DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }} - DAPR_RUNTIME_VER: 1.15.0-rc.9 + DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }} CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }} CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }} DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }} diff --git a/client/conversation.go b/client/conversation.go index 2c35faa..0109426 100644 --- a/client/conversation.go +++ b/client/conversation.go @@ -44,8 +44,8 @@ type conversationRequestOption func(request *conversationRequest) // ConversationInput defines a single input. type ConversationInput struct { - // The string to send to the llm. - Message string + // The content to send to the llm. + Content string // The role of the message. Role *string // Whether to Scrub PII from the input @@ -104,7 +104,7 @@ func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest cinputs := make([]*runtimev1pb.ConversationInput, len(req.inputs)) for i, in := range req.inputs { cinputs[i] = &runtimev1pb.ConversationInput{ - Message: in.Message, + Content: in.Content, Role: in.Role, ScrubPII: in.ScrubPII, } diff --git a/client/subscribe.go b/client/subscribe.go index e59da80..4bfa48d 100644 --- a/client/subscribe.go +++ b/client/subscribe.go @@ -23,6 +23,9 @@ import ( "sync" "sync/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/go-sdk/service/common" ) @@ -37,10 +40,15 @@ type SubscriptionOptions struct { } type Subscription struct { + ctx context.Context stream pb.Dapr_SubscribeTopicEventsAlpha1Client + // lock locks concurrent writes to subscription stream. lock sync.Mutex closed atomic.Bool + + createStream func(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) + opts SubscriptionOptions } type SubscriptionMessage struct { @@ -55,7 +63,10 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (* } s := &Subscription{ - stream: stream, + ctx: ctx, + stream: stream, + createStream: c.subscribeInitialRequest, + opts: opts, } return s, nil @@ -101,52 +112,96 @@ func (s *Subscription) Close() error { } func (s *Subscription) Receive() (*SubscriptionMessage, error) { - resp, err := s.stream.Recv() - if err != nil { - return nil, err - } - event := resp.GetEventMessage() - - data := any(event.GetData()) - if len(event.GetData()) > 0 { - mediaType, _, err := mime.ParseMediaType(event.GetDataContentType()) - if err == nil { - var v interface{} - switch mediaType { - case "application/json": - if err := json.Unmarshal(event.GetData(), &v); err == nil { - data = v - } - case "text/plain": - // Assume UTF-8 encoded string. - data = string(event.GetData()) + for { + resp, err := s.stream.Recv() + if err != nil { + select { + case <-s.ctx.Done(): + return nil, errors.New("subscription context closed") default: - if strings.HasPrefix(mediaType, "application/") && - strings.HasSuffix(mediaType, "+json") { + // proceed to check the gRPC status error + } + + st, ok := status.FromError(err) + if !ok { + // not a grpc status error + return nil, err + } + + switch st.Code() { + case codes.Unavailable, codes.Unknown: + logger.Printf("gRPC error while reading from stream: %s (code=%v)", + st.Message(), st.Code()) + // close the current stream and reconnect + if s.closed.Load() { + return nil, errors.New("subscription is permanently closed; cannot reconnect") + } + if err := s.closeStreamOnly(); err != nil { + logger.Printf("error closing current stream: %v", err) + } + + newStream, nerr := s.createStream(s.ctx, s.opts) + if nerr != nil { + return nil, errors.New("re-subscribe failed") + } + + s.lock.Lock() + s.stream = newStream + s.lock.Unlock() + + // try receiving again + continue + + case codes.Canceled: + return nil, errors.New("stream canceled") + + default: + return nil, errors.New("subscription recv error") + } + } + + event := resp.GetEventMessage() + data := any(event.GetData()) + if len(event.GetData()) > 0 { + mediaType, _, err := mime.ParseMediaType(event.GetDataContentType()) + if err == nil { + var v interface{} + switch mediaType { + case "application/json": if err := json.Unmarshal(event.GetData(), &v); err == nil { data = v } + case "text/plain": + // Assume UTF-8 encoded string. + data = string(event.GetData()) + default: + if strings.HasPrefix(mediaType, "application/") && + strings.HasSuffix(mediaType, "+json") { + if err := json.Unmarshal(event.GetData(), &v); err == nil { + data = v + } + } } } } - } - topicEvent := &common.TopicEvent{ - ID: event.GetId(), - Source: event.GetSource(), - Type: event.GetType(), - SpecVersion: event.GetSpecVersion(), - DataContentType: event.GetDataContentType(), - Data: data, - RawData: event.GetData(), - Topic: event.GetTopic(), - PubsubName: event.GetPubsubName(), - } + topicEvent := &common.TopicEvent{ + ID: event.GetId(), + Source: event.GetSource(), + Type: event.GetType(), + SpecVersion: event.GetSpecVersion(), + DataContentType: event.GetDataContentType(), + Data: data, + RawData: event.GetData(), + Topic: event.GetTopic(), + PubsubName: event.GetPubsubName(), + } - return &SubscriptionMessage{ - sub: s, - TopicEvent: topicEvent, - }, nil + return &SubscriptionMessage{ + sub: s, + TopicEvent: topicEvent, + }, nil + } } func (s *SubscriptionMessage) Success() error { @@ -232,3 +287,13 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript return stream, nil } + +func (s *Subscription) closeStreamOnly() error { + s.lock.Lock() + defer s.lock.Unlock() + + if s.stream != nil { + return s.stream.CloseSend() + } + return nil +} diff --git a/daprdocs/content/en/go-sdk-docs/go-client/_index.md b/daprdocs/content/en/go-sdk-docs/go-client/_index.md index ad1e65a..9561cb2 100644 --- a/daprdocs/content/en/go-sdk-docs/go-client/_index.md +++ b/daprdocs/content/en/go-sdk-docs/go-client/_index.md @@ -88,6 +88,91 @@ resp, err = client.InvokeMethodWithContent(ctx, "app-id", "method-name", "post", For a full guide on service invocation, visit [How-To: Invoke a service]({{< ref howto-invoke-discover-services.md >}}). +### Workflows + +Workflows and their activities can be authored and managed using the Dapr Go SDK like so: + +```go +import ( +... +"github.com/dapr/go-sdk/workflow" +... +) + +func ExampleWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var output string + input := "world" + + if err := ctx.CallActivity(ExampleActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + // Print output - "hello world" + fmt.Println(output) + + return nil, nil +} + +func ExampleActivity(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + + return fmt.Sprintf("hello %s", input), nil +} + +func main() { + // Create a workflow worker + w, err := workflow.NewWorker() + if err != nil { + log.Fatalf("error creating worker: %v", err) + } + + // Register the workflow + w.RegisterWorkflow(ExampleWorkflow) + + // Register the activity + w.RegisterActivity(ExampleActivity) + + // Start workflow runner + if err := w.Start(); err != nil { + log.Fatal(err) + } + + // Create a workflow client + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatal(err) + } + + // Start a new workflow + id, err := wfClient.ScheduleNewWorkflow(context.Background(), "ExampleWorkflow") + if err != nil { + log.Fatal(err) + } + + // Wait for the workflow to complete + metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id) + if err != nil { + log.Fatal(err) + } + + // Print workflow status post-completion + fmt.Println(metadata.RuntimeStatus) + + // Shutdown Worker + w.Shutdown() +} +``` + +- For a more comprehensive guide on workflows visit these How-To guides: + - [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}). + - [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}). +- Visit the Go SDK Examples to jump into complete examples: + - [Workflow Example](https://github.com/dapr/go-sdk/tree/main/examples/workflow) + - [Workflow - Parallelised](https://github.com/dapr/go-sdk/tree/main/examples/workflow-parallel) + ### State Management For simple use-cases, Dapr client provides easy to use `Save`, `Get`, `Delete` methods: diff --git a/examples/conversation/main.go b/examples/conversation/main.go index 00b347e..34bdc81 100644 --- a/examples/conversation/main.go +++ b/examples/conversation/main.go @@ -17,8 +17,9 @@ package main import ( "context" "fmt" - dapr "github.com/dapr/go-sdk/client" "log" + + dapr "github.com/dapr/go-sdk/client" ) func main() { @@ -28,12 +29,12 @@ func main() { } input := dapr.ConversationInput{ - Message: "hello world", + Content: "hello world", // Role: nil, // Optional // ScrubPII: nil, // Optional } - fmt.Printf("conversation input: %s\n", input.Message) + fmt.Printf("conversation input: %s\n", input.Content) var conversationComponent = "echo" diff --git a/examples/go.mod b/examples/go.mod index aa0d45b..29ec1ea 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -1,6 +1,6 @@ module github.com/dapr/go-sdk/examples -go 1.23.5 +go 1.23.6 replace github.com/dapr/go-sdk => ../ @@ -18,9 +18,9 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dapr/dapr v1.15.0-rc.9 // indirect - github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee // indirect - github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0 // indirect + github.com/dapr/dapr v1.15.0-rc.17 // indirect + github.com/dapr/durabletask-go v0.6.3 // indirect + github.com/dapr/kit v0.15.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/examples/go.sum b/examples/go.sum index 3dad7d9..39285c3 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -6,12 +6,12 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/dapr/dapr v1.15.0-rc.9 h1:70JmZiTimPGe5jDcwaUJJ0Mg2jE9P+5RwalGaISJN6I= -github.com/dapr/dapr v1.15.0-rc.9/go.mod h1:AIX1bK+nEsIq3qWGXz3gVp/OLyUTyGBMLhSzvU9jook= -github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee h1:uRleodP/a0X360mVtSau97wygHtIdQY5DZ9oMe1s0DY= -github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA= -github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0 h1:NDEXog+/PDZoHquVQKj495UIbfrtejH+R+NlHQoAc9U= -github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= +github.com/dapr/dapr v1.15.0-rc.17 h1:bR0rd4FH81IteuOHTWVNyl58ZuQTDp3DYaTtXnpZ6JA= +github.com/dapr/dapr v1.15.0-rc.17/go.mod h1:SD0AXom2XpX7pr8eYlbJ+gHfNREsflsrzCR19AZJ7/Q= +github.com/dapr/durabletask-go v0.6.3 h1:WHhSAw1YL4xneK3Jo5nGfmMaJxfFodIIF5q1rpkDDfs= +github.com/dapr/durabletask-go v0.6.3/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA= +github.com/dapr/kit v0.15.0 h1:446jrEOQV/0rt6FwmoKrifP3vav5+Uh/u38DqU8q+JM= +github.com/dapr/kit v0.15.0/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/examples/service/README.md b/examples/service/README.md index 7a536b6..7fe90ef 100644 --- a/examples/service/README.md +++ b/examples/service/README.md @@ -91,7 +91,7 @@ expected_stdout_lines: ```bash dapr run --app-id custom-grpc-client \ -d ./config \ - --dapr-http-max-request-size 41 \ + --max-body-size 41Mi \ --log-level debug \ go run ./custom-grpc-client/main.go ``` diff --git a/go.mod b/go.mod index be54c4e..5aa38d8 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/dapr/go-sdk -go 1.23.5 +go 1.23.6 require ( - github.com/dapr/dapr v1.15.0-rc.9 - github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee + github.com/dapr/dapr v1.15.0-rc.17 + github.com/dapr/durabletask-go v0.6.3 github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 @@ -16,7 +16,7 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0 // indirect + github.com/dapr/kit v0.15.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index de7d751..a477f78 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/dapr/dapr v1.15.0-rc.9 h1:70JmZiTimPGe5jDcwaUJJ0Mg2jE9P+5RwalGaISJN6I= -github.com/dapr/dapr v1.15.0-rc.9/go.mod h1:AIX1bK+nEsIq3qWGXz3gVp/OLyUTyGBMLhSzvU9jook= -github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee h1:uRleodP/a0X360mVtSau97wygHtIdQY5DZ9oMe1s0DY= -github.com/dapr/durabletask-go v0.5.1-0.20250124181508-a1b42ae65aee/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA= -github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0 h1:NDEXog+/PDZoHquVQKj495UIbfrtejH+R+NlHQoAc9U= -github.com/dapr/kit v0.13.1-0.20250129050741-c46009f360b0/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= +github.com/dapr/dapr v1.15.0-rc.17 h1:bR0rd4FH81IteuOHTWVNyl58ZuQTDp3DYaTtXnpZ6JA= +github.com/dapr/dapr v1.15.0-rc.17/go.mod h1:SD0AXom2XpX7pr8eYlbJ+gHfNREsflsrzCR19AZJ7/Q= +github.com/dapr/durabletask-go v0.6.3 h1:WHhSAw1YL4xneK3Jo5nGfmMaJxfFodIIF5q1rpkDDfs= +github.com/dapr/durabletask-go v0.6.3/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA= +github.com/dapr/kit v0.15.0 h1:446jrEOQV/0rt6FwmoKrifP3vav5+Uh/u38DqU8q+JM= +github.com/dapr/kit v0.15.0/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/service/common/type.go b/service/common/type.go index 250e0de..fda3307 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -107,6 +107,8 @@ type Subscription struct { Priority int `json:"priority"` // DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic. DisableTopicValidation bool `json:"disableTopicValidation"` + // DeadLetterTopic is the name of the deadletter topic. + DeadLetterTopic string `json:"deadLetterTopic"` } type SubscriptionResponseStatus string diff --git a/service/grpc/topic.go b/service/grpc/topic.go index 14f3ee0..b3ef4dc 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -53,10 +53,11 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty) for _, v := range s.topicRegistrar { s := v.Subscription sub := &runtimev1pb.TopicSubscription{ - PubsubName: s.PubsubName, - Topic: s.Topic, - Metadata: s.Metadata, - Routes: convertRoutes(s.Routes), + PubsubName: s.PubsubName, + Topic: s.Topic, + Metadata: s.Metadata, + Routes: convertRoutes(s.Routes), + DeadLetterTopic: s.DeadLetterTopic, } subs = append(subs, sub) } diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index 1ff8262..67af946 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -39,7 +39,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi ts, ok := m[key] if !ok { ts = &TopicRegistration{ - Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), + Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic, sub.DeadLetterTopic), RouteHandlers: make(map[string]common.TopicEventSubscriber), DefaultHandler: nil, } diff --git a/service/internal/topicsubscription.go b/service/internal/topicsubscription.go index ca62899..aeb10d3 100644 --- a/service/internal/topicsubscription.go +++ b/service/internal/topicsubscription.go @@ -18,6 +18,8 @@ type TopicSubscription struct { Routes *TopicRoutes `json:"routes,omitempty"` // Metadata is the subscription metadata. Metadata map[string]string `json:"metadata,omitempty"` + // DeadLetterTopic is the name of the deadletter topic. + DeadLetterTopic string `json:"deadLetterTopic"` } // TopicRoutes encapsulates the default route and multiple routing rules. @@ -42,10 +44,11 @@ type TopicRule struct { } // NewTopicSubscription creates a new `TopicSubscription`. -func NewTopicSubscription(pubsubName, topic string) *TopicSubscription { +func NewTopicSubscription(pubsubName, topic, deadLetterTopic string) *TopicSubscription { return &TopicSubscription{ //nolint:exhaustivestruct - PubsubName: pubsubName, - Topic: topic, + PubsubName: pubsubName, + Topic: topic, + DeadLetterTopic: deadLetterTopic, } } diff --git a/service/internal/topicsubscription_test.go b/service/internal/topicsubscription_test.go index 3762b36..6aa3e9f 100644 --- a/service/internal/topicsubscription_test.go +++ b/service/internal/topicsubscription_test.go @@ -12,7 +12,7 @@ import ( func TestTopicSubscripiton(t *testing.T) { t.Run("duplicate metadata", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.SetMetadata(map[string]string{ "test": "test", })) @@ -22,7 +22,7 @@ func TestTopicSubscripiton(t *testing.T) { }) t.Run("duplicate route", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.SetDefaultRoute("/test")) assert.Equal(t, "/test", sub.Route) require.EqualError(t, sub.SetDefaultRoute("/test"), @@ -30,7 +30,7 @@ func TestTopicSubscripiton(t *testing.T) { }) t.Run("duplicate route after routing rule", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0)) require.NoError(t, sub.SetDefaultRoute("/test")) require.EqualError(t, sub.SetDefaultRoute("/test"), @@ -38,7 +38,7 @@ func TestTopicSubscripiton(t *testing.T) { }) t.Run("default route after routing rule", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.SetDefaultRoute("/test")) assert.Equal(t, "/test", sub.Route) require.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0)) @@ -49,14 +49,14 @@ func TestTopicSubscripiton(t *testing.T) { }) t.Run("duplicate routing rule priority", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.AddRoutingRule("/other", `event.type == "other"`, 1)) require.EqualError(t, sub.AddRoutingRule("/test", `event.type == "test"`, 1), "subscription for topic mytopic on pubsub test already has a routing rule with priority 1") }) t.Run("priority ordering", func(t *testing.T) { - sub := internal.NewTopicSubscription("test", "mytopic") + sub := internal.NewTopicSubscription("test", "mytopic", "") require.NoError(t, sub.AddRoutingRule("/100", `event.type == "100"`, 100)) require.NoError(t, sub.AddRoutingRule("/1", `event.type == "1"`, 1)) require.NoError(t, sub.AddRoutingRule("/50", `event.type == "50"`, 50)) diff --git a/version/sdk-version b/version/sdk-version index 32da86c..13a15bd 100644 --- a/version/sdk-version +++ b/version/sdk-version @@ -1 +1 @@ -v1.12.0-rc.2 +v1.12.0 \ No newline at end of file diff --git a/workflow/worker.go b/workflow/worker.go index dc9e571..a37e543 100644 --- a/workflow/worker.go +++ b/workflow/worker.go @@ -125,7 +125,13 @@ func wrapActivity(a Activity) task.Activity { return func(ctx task.ActivityContext) (any, error) { aCtx := ActivityContext{ctx: ctx} - return a(aCtx) + result, err := a(aCtx) + if err != nil { + activityName, _ := getFunctionName(a) // Get the activity name for context + return nil, fmt.Errorf("activity %s failed: %w", activityName, err) + } + + return result, nil } }