go-sdk/client/subscribe.go

300 lines
7.7 KiB
Go

/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"mime"
"strings"
"sync"
"sync/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
)
type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus
type SubscriptionOptions struct {
PubsubName string
Topic string
DeadLetterTopic *string
Metadata map[string]string
}
type Subscription struct {
ctx context.Context
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
// lock locks concurrent writes to subscription stream.
lock sync.Mutex
closed atomic.Bool
createStream func(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error)
opts SubscriptionOptions
}
type SubscriptionMessage struct {
*common.TopicEvent
sub *Subscription
}
func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) {
stream, err := c.subscribeInitialRequest(ctx, opts)
if err != nil {
return nil, err
}
s := &Subscription{
ctx: ctx,
stream: stream,
createStream: c.subscribeInitialRequest,
opts: opts,
}
return s, nil
}
func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
s, err := c.Subscribe(ctx, opts)
if err != nil {
return nil, err
}
go func() {
defer s.Close()
for {
msg, err := s.Receive()
if err != nil {
if !s.closed.Load() {
logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s",
opts.PubsubName, opts.Topic, err)
}
return
}
go func() {
if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil {
logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s",
opts.PubsubName, opts.Topic, msg.ID, err)
}
}()
}
}()
return s.Close, nil
}
func (s *Subscription) Close() error {
if !s.closed.CompareAndSwap(false, true) {
return errors.New("subscription already closed")
}
return s.stream.CloseSend()
}
func (s *Subscription) Receive() (*SubscriptionMessage, error) {
for {
resp, err := s.stream.Recv()
if err != nil {
select {
case <-s.ctx.Done():
return nil, errors.New("subscription context closed")
default:
// proceed to check the gRPC status error
}
st, ok := status.FromError(err)
if !ok {
// not a grpc status error
return nil, err
}
switch st.Code() {
case codes.Unavailable, codes.Unknown:
logger.Printf("gRPC error while reading from stream: %s (code=%v)",
st.Message(), st.Code())
// close the current stream and reconnect
if s.closed.Load() {
return nil, errors.New("subscription is permanently closed; cannot reconnect")
}
if err := s.closeStreamOnly(); err != nil {
logger.Printf("error closing current stream: %v", err)
}
newStream, nerr := s.createStream(s.ctx, s.opts)
if nerr != nil {
return nil, errors.New("re-subscribe failed")
}
s.lock.Lock()
s.stream = newStream
s.lock.Unlock()
// try receiving again
continue
case codes.Canceled:
return nil, errors.New("stream canceled")
default:
return nil, errors.New("subscription recv error")
}
}
event := resp.GetEventMessage()
data := any(event.GetData())
if len(event.GetData()) > 0 {
mediaType, _, err := mime.ParseMediaType(event.GetDataContentType())
if err == nil {
var v interface{}
switch mediaType {
case "application/json":
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
case "text/plain":
// Assume UTF-8 encoded string.
data = string(event.GetData())
default:
if strings.HasPrefix(mediaType, "application/") &&
strings.HasSuffix(mediaType, "+json") {
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
}
}
}
}
topicEvent := &common.TopicEvent{
ID: event.GetId(),
Source: event.GetSource(),
Type: event.GetType(),
SpecVersion: event.GetSpecVersion(),
DataContentType: event.GetDataContentType(),
Data: data,
RawData: event.GetData(),
Topic: event.GetTopic(),
PubsubName: event.GetPubsubName(),
}
return &SubscriptionMessage{
sub: s,
TopicEvent: topicEvent,
}, nil
}
}
func (s *SubscriptionMessage) Success() error {
return s.respond(pb.TopicEventResponse_SUCCESS)
}
func (s *SubscriptionMessage) Retry() error {
return s.respond(pb.TopicEventResponse_RETRY)
}
func (s *SubscriptionMessage) Drop() error {
return s.respond(pb.TopicEventResponse_DROP)
}
func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error {
var statuspb pb.TopicEventResponse_TopicEventResponseStatus
switch status {
case common.SubscriptionResponseStatusSuccess:
statuspb = pb.TopicEventResponse_SUCCESS
case common.SubscriptionResponseStatusRetry:
statuspb = pb.TopicEventResponse_RETRY
case common.SubscriptionResponseStatusDrop:
statuspb = pb.TopicEventResponse_DROP
default:
return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s",
common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry,
common.SubscriptionResponseStatusDrop, status)
}
return s.respond(statuspb)
}
func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error {
s.sub.lock.Lock()
defer s.sub.lock.Unlock()
return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{
EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
},
})
}
func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) {
if len(opts.PubsubName) == 0 {
return nil, errors.New("pubsub name required")
}
if len(opts.Topic) == 0 {
return nil, errors.New("topic required")
}
stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx)
if err != nil {
return nil, err
}
err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
},
})
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}
resp, err := stream.Recv()
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}
switch resp.GetSubscribeTopicEventsResponseType().(type) {
case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse:
default:
return nil, fmt.Errorf("unexpected initial response from server : %v", resp)
}
return stream, nil
}
func (s *Subscription) closeStreamOnly() error {
s.lock.Lock()
defer s.lock.Unlock()
if s.stream != nil {
return s.stream.CloseSend()
}
return nil
}