From 459e1f243f54d496290e50349209b809e12e4819 Mon Sep 17 00:00:00 2001 From: yaron2 Date: Wed, 5 Feb 2025 14:54:30 -0800 Subject: [PATCH] add deadlettertopic support to non-streaming subscriptions Signed-off-by: yaron2 --- service/common/type.go | 2 ++ service/grpc/topic.go | 9 +++++---- service/internal/topicregistrar.go | 2 +- service/internal/topicsubscription.go | 9 ++++++--- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/service/common/type.go b/service/common/type.go index 250e0de..fda3307 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -107,6 +107,8 @@ type Subscription struct { Priority int `json:"priority"` // DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic. DisableTopicValidation bool `json:"disableTopicValidation"` + // DeadLetterTopic is the name of the deadletter topic. + DeadLetterTopic string `json:"deadLetterTopic"` } type SubscriptionResponseStatus string diff --git a/service/grpc/topic.go b/service/grpc/topic.go index 14f3ee0..b3ef4dc 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -53,10 +53,11 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty) for _, v := range s.topicRegistrar { s := v.Subscription sub := &runtimev1pb.TopicSubscription{ - PubsubName: s.PubsubName, - Topic: s.Topic, - Metadata: s.Metadata, - Routes: convertRoutes(s.Routes), + PubsubName: s.PubsubName, + Topic: s.Topic, + Metadata: s.Metadata, + Routes: convertRoutes(s.Routes), + DeadLetterTopic: s.DeadLetterTopic, } subs = append(subs, sub) } diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index 1ff8262..67af946 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -39,7 +39,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi ts, ok := m[key] if !ok { ts = &TopicRegistration{ - Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), + Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic, sub.DeadLetterTopic), RouteHandlers: make(map[string]common.TopicEventSubscriber), DefaultHandler: nil, } diff --git a/service/internal/topicsubscription.go b/service/internal/topicsubscription.go index ca62899..aeb10d3 100644 --- a/service/internal/topicsubscription.go +++ b/service/internal/topicsubscription.go @@ -18,6 +18,8 @@ type TopicSubscription struct { Routes *TopicRoutes `json:"routes,omitempty"` // Metadata is the subscription metadata. Metadata map[string]string `json:"metadata,omitempty"` + // DeadLetterTopic is the name of the deadletter topic. + DeadLetterTopic string `json:"deadLetterTopic"` } // TopicRoutes encapsulates the default route and multiple routing rules. @@ -42,10 +44,11 @@ type TopicRule struct { } // NewTopicSubscription creates a new `TopicSubscription`. -func NewTopicSubscription(pubsubName, topic string) *TopicSubscription { +func NewTopicSubscription(pubsubName, topic, deadLetterTopic string) *TopicSubscription { return &TopicSubscription{ //nolint:exhaustivestruct - PubsubName: pubsubName, - Topic: topic, + PubsubName: pubsubName, + Topic: topic, + DeadLetterTopic: deadLetterTopic, } }