mirror of https://github.com/dapr/go-sdk.git
300 lines
7.7 KiB
Go
300 lines
7.7 KiB
Go
/*
|
|
Copyright 2024 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"mime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
|
|
"github.com/dapr/go-sdk/service/common"
|
|
)
|
|
|
|
type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus
|
|
|
|
type SubscriptionOptions struct {
|
|
PubsubName string
|
|
Topic string
|
|
DeadLetterTopic *string
|
|
Metadata map[string]string
|
|
}
|
|
|
|
type Subscription struct {
|
|
ctx context.Context
|
|
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
|
|
|
|
// lock locks concurrent writes to subscription stream.
|
|
lock sync.Mutex
|
|
closed atomic.Bool
|
|
|
|
createStream func(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error)
|
|
opts SubscriptionOptions
|
|
}
|
|
|
|
type SubscriptionMessage struct {
|
|
*common.TopicEvent
|
|
sub *Subscription
|
|
}
|
|
|
|
func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) {
|
|
stream, err := c.subscribeInitialRequest(ctx, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := &Subscription{
|
|
ctx: ctx,
|
|
stream: stream,
|
|
createStream: c.subscribeInitialRequest,
|
|
opts: opts,
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
|
|
s, err := c.Subscribe(ctx, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go func() {
|
|
defer s.Close()
|
|
|
|
for {
|
|
msg, err := s.Receive()
|
|
if err != nil {
|
|
if !s.closed.Load() {
|
|
logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s",
|
|
opts.PubsubName, opts.Topic, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil {
|
|
logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s",
|
|
opts.PubsubName, opts.Topic, msg.ID, err)
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
|
|
return s.Close, nil
|
|
}
|
|
|
|
func (s *Subscription) Close() error {
|
|
if !s.closed.CompareAndSwap(false, true) {
|
|
return errors.New("subscription already closed")
|
|
}
|
|
|
|
return s.stream.CloseSend()
|
|
}
|
|
|
|
func (s *Subscription) Receive() (*SubscriptionMessage, error) {
|
|
for {
|
|
resp, err := s.stream.Recv()
|
|
if err != nil {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return nil, errors.New("subscription context closed")
|
|
default:
|
|
// proceed to check the gRPC status error
|
|
}
|
|
|
|
st, ok := status.FromError(err)
|
|
if !ok {
|
|
// not a grpc status error
|
|
return nil, err
|
|
}
|
|
|
|
switch st.Code() {
|
|
case codes.Unavailable, codes.Unknown:
|
|
logger.Printf("gRPC error while reading from stream: %s (code=%v)",
|
|
st.Message(), st.Code())
|
|
// close the current stream and reconnect
|
|
if s.closed.Load() {
|
|
return nil, errors.New("subscription is permanently closed; cannot reconnect")
|
|
}
|
|
if err := s.closeStreamOnly(); err != nil {
|
|
logger.Printf("error closing current stream: %v", err)
|
|
}
|
|
|
|
newStream, nerr := s.createStream(s.ctx, s.opts)
|
|
if nerr != nil {
|
|
return nil, errors.New("re-subscribe failed")
|
|
}
|
|
|
|
s.lock.Lock()
|
|
s.stream = newStream
|
|
s.lock.Unlock()
|
|
|
|
// try receiving again
|
|
continue
|
|
|
|
case codes.Canceled:
|
|
return nil, errors.New("stream canceled")
|
|
|
|
default:
|
|
return nil, errors.New("subscription recv error")
|
|
}
|
|
}
|
|
|
|
event := resp.GetEventMessage()
|
|
data := any(event.GetData())
|
|
if len(event.GetData()) > 0 {
|
|
mediaType, _, err := mime.ParseMediaType(event.GetDataContentType())
|
|
if err == nil {
|
|
var v interface{}
|
|
switch mediaType {
|
|
case "application/json":
|
|
if err := json.Unmarshal(event.GetData(), &v); err == nil {
|
|
data = v
|
|
}
|
|
case "text/plain":
|
|
// Assume UTF-8 encoded string.
|
|
data = string(event.GetData())
|
|
default:
|
|
if strings.HasPrefix(mediaType, "application/") &&
|
|
strings.HasSuffix(mediaType, "+json") {
|
|
if err := json.Unmarshal(event.GetData(), &v); err == nil {
|
|
data = v
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
topicEvent := &common.TopicEvent{
|
|
ID: event.GetId(),
|
|
Source: event.GetSource(),
|
|
Type: event.GetType(),
|
|
SpecVersion: event.GetSpecVersion(),
|
|
DataContentType: event.GetDataContentType(),
|
|
Data: data,
|
|
RawData: event.GetData(),
|
|
Topic: event.GetTopic(),
|
|
PubsubName: event.GetPubsubName(),
|
|
}
|
|
|
|
return &SubscriptionMessage{
|
|
sub: s,
|
|
TopicEvent: topicEvent,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
func (s *SubscriptionMessage) Success() error {
|
|
return s.respond(pb.TopicEventResponse_SUCCESS)
|
|
}
|
|
|
|
func (s *SubscriptionMessage) Retry() error {
|
|
return s.respond(pb.TopicEventResponse_RETRY)
|
|
}
|
|
|
|
func (s *SubscriptionMessage) Drop() error {
|
|
return s.respond(pb.TopicEventResponse_DROP)
|
|
}
|
|
|
|
func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error {
|
|
var statuspb pb.TopicEventResponse_TopicEventResponseStatus
|
|
switch status {
|
|
case common.SubscriptionResponseStatusSuccess:
|
|
statuspb = pb.TopicEventResponse_SUCCESS
|
|
case common.SubscriptionResponseStatusRetry:
|
|
statuspb = pb.TopicEventResponse_RETRY
|
|
case common.SubscriptionResponseStatusDrop:
|
|
statuspb = pb.TopicEventResponse_DROP
|
|
default:
|
|
return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s",
|
|
common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry,
|
|
common.SubscriptionResponseStatusDrop, status)
|
|
}
|
|
|
|
return s.respond(statuspb)
|
|
}
|
|
|
|
func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error {
|
|
s.sub.lock.Lock()
|
|
defer s.sub.lock.Unlock()
|
|
|
|
return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
|
|
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{
|
|
EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{
|
|
Id: s.ID,
|
|
Status: &pb.TopicEventResponse{Status: status},
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) {
|
|
if len(opts.PubsubName) == 0 {
|
|
return nil, errors.New("pubsub name required")
|
|
}
|
|
|
|
if len(opts.Topic) == 0 {
|
|
return nil, errors.New("topic required")
|
|
}
|
|
|
|
stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
|
|
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
|
|
InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{
|
|
PubsubName: opts.PubsubName, Topic: opts.Topic,
|
|
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Join(err, stream.CloseSend())
|
|
}
|
|
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
return nil, errors.Join(err, stream.CloseSend())
|
|
}
|
|
|
|
switch resp.GetSubscribeTopicEventsResponseType().(type) {
|
|
case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse:
|
|
default:
|
|
return nil, fmt.Errorf("unexpected initial response from server : %v", resp)
|
|
}
|
|
|
|
return stream, nil
|
|
}
|
|
|
|
func (s *Subscription) closeStreamOnly() error {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
if s.stream != nil {
|
|
return s.stream.CloseSend()
|
|
}
|
|
return nil
|
|
}
|