This commit is contained in:
majkio 2025-04-08 08:05:29 +00:00 committed by GitHub
commit 7ad32caf45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 73 additions and 0 deletions

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
@ -195,6 +196,8 @@ func (s *Subscription) Receive() (*SubscriptionMessage, error) {
RawData: event.GetData(),
Topic: event.GetTopic(),
PubsubName: event.GetPubsubName(),
TraceID: getStringValueFromExtension(event.GetExtensions(), "traceid"),
TraceParent: getStringValueFromExtension(event.GetExtensions(), "traceparent"),
}
return &SubscriptionMessage{
@ -297,3 +300,18 @@ func (s *Subscription) closeStreamOnly() error {
}
return nil
}
func getStringValueFromExtension(extension *structpb.Struct, key string) string {
if extension == nil {
return ""
}
value, ok := extension.GetFields()[key]
if !ok {
return ""
}
typed, ok := value.GetKind().(*structpb.Value_StringValue)
if !ok {
return ""
}
return typed.StringValue
}

View File

@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/structpb"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
@ -139,6 +140,8 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
Topic: in.GetTopic(),
PubsubName: in.GetPubsubName(),
Metadata: getCustomMetadataFromContext(ctx),
TraceID: getStringValueFromExtension(in.GetExtensions(), "traceid"),
TraceParent: getStringValueFromExtension(in.GetExtensions(), "traceparent"),
}
h := sub.DefaultHandler
if in.GetPath() != "" {
@ -183,3 +186,18 @@ func getCustomMetadataFromContext(ctx context.Context) map[string]string {
func (s *Server) OnBulkTopicEventAlpha1(ctx context.Context, in *runtimev1pb.TopicEventBulkRequest) (*runtimev1pb.TopicEventBulkResponse, error) {
panic("This API callback is not supported.")
}
func getStringValueFromExtension(extension *structpb.Struct, key string) string {
if extension == nil {
return ""
}
value, ok := extension.GetFields()[key]
if !ok {
return ""
}
typed, ok := value.GetKind().(*structpb.Value_StringValue)
if !ok {
return ""
}
return typed.StringValue
}

View File

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/structpb"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/emptypb"
@ -163,6 +164,42 @@ func TestTopic(t *testing.T) {
require.NoError(t, err)
})
t.Run("topic event with traceid and traceparent", func(t *testing.T) {
sub2 := &common.Subscription{
PubsubName: "messages",
Topic: "test2",
}
err := server.AddTopicEventHandler(sub2,
func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
assert.Equal(t, "expected-traceid-value", e.TraceID)
assert.Equal(t, "expected-traceparent-value", e.TraceParent)
return false, nil
})
require.NoError(t, err)
extensions, err := structpb.NewStruct(map[string]any{
"traceid": "expected-traceid-value",
"traceparent": "expected-traceparent-value",
})
require.NoError(t, err)
in := &runtime.TopicEventRequest{
Id: "a123",
Source: "test",
Type: "test",
SpecVersion: "v1.0",
DataContentType: "text/plain",
Data: []byte("test"),
Topic: sub2.Topic,
PubsubName: sub2.PubsubName,
Extensions: extensions,
}
ctx := context.Background()
_, err = server.OnTopicEvent(ctx, in)
require.NoError(t, err)
})
stopTestServer(t, server)
}