add deadlettertopic support to non-streaming subscriptions

Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
yaron2 2025-02-05 14:54:30 -08:00
parent 3833d13591
commit 459e1f243f
4 changed files with 14 additions and 8 deletions

View File

@ -107,6 +107,8 @@ type Subscription struct {
Priority int `json:"priority"`
// DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic.
DisableTopicValidation bool `json:"disableTopicValidation"`
// DeadLetterTopic is the name of the deadletter topic.
DeadLetterTopic string `json:"deadLetterTopic"`
}
type SubscriptionResponseStatus string

View File

@ -53,10 +53,11 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty)
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
DeadLetterTopic: s.DeadLetterTopic,
}
subs = append(subs, sub)
}

View File

@ -39,7 +39,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
ts, ok := m[key]
if !ok {
ts = &TopicRegistration{
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic, sub.DeadLetterTopic),
RouteHandlers: make(map[string]common.TopicEventSubscriber),
DefaultHandler: nil,
}

View File

@ -18,6 +18,8 @@ type TopicSubscription struct {
Routes *TopicRoutes `json:"routes,omitempty"`
// Metadata is the subscription metadata.
Metadata map[string]string `json:"metadata,omitempty"`
// DeadLetterTopic is the name of the deadletter topic.
DeadLetterTopic string `json:"deadLetterTopic"`
}
// TopicRoutes encapsulates the default route and multiple routing rules.
@ -42,10 +44,11 @@ type TopicRule struct {
}
// NewTopicSubscription creates a new `TopicSubscription`.
func NewTopicSubscription(pubsubName, topic string) *TopicSubscription {
func NewTopicSubscription(pubsubName, topic, deadLetterTopic string) *TopicSubscription {
return &TopicSubscription{ //nolint:exhaustivestruct
PubsubName: pubsubName,
Topic: topic,
PubsubName: pubsubName,
Topic: topic,
DeadLetterTopic: deadLetterTopic,
}
}