Invoke binding tracing fix (#7916)

* fix traceparent header for output binding and write int tests

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* refactor courtesy of @JoshVanL

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* PR feedback

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

---------

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
This commit is contained in:
Cassie Coyle 2024-07-19 13:38:12 -06:00 committed by GitHub
parent b106e8ccb2
commit 9c69f64aa8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 458 additions and 127 deletions

View File

@ -501,6 +501,21 @@ func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRe
req.Metadata[key] = val
}
// this is for the http binding, so dont need grpc-trace-bin
span := diagUtils.SpanFromContext(ctx)
sc := span.SpanContext()
tp := diag.SpanContextToW3CString(sc)
if span != nil {
if _, ok := req.Metadata[diag.TraceparentHeader]; !ok {
req.Metadata[diag.TraceparentHeader] = tp
}
if _, ok := req.Metadata[diag.TracestateHeader]; !ok {
if sc.TraceState().Len() > 0 {
req.Metadata[diag.TracestateHeader] = diag.TraceStateToW3CString(sc)
}
}
}
// Allow for distributed tracing by passing context metadata.
if incomingMD, ok := metadata.FromIncomingContext(ctx); ok {
for key, val := range incomingMD {

View File

@ -21,6 +21,7 @@ import (
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
)
// Option is a function that configures the process.
@ -50,10 +51,12 @@ func New(t *testing.T, fopts ...Option) *App {
listInputBindFn: opts.listInputBindFn,
onBindingEventFn: opts.onBindingEventFn,
healthCheckFn: opts.healthCheckFn,
pingFn: opts.pingFn,
}
rtv1.RegisterAppCallbackServer(s, srv)
rtv1.RegisterAppCallbackAlphaServer(s, srv)
rtv1.RegisterAppCallbackHealthCheckServer(s, srv)
testpb.RegisterTestServiceServer(s, srv)
if opts.withRegister != nil {
opts.withRegister(s)
}

View File

@ -19,6 +19,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc"
@ -37,6 +39,7 @@ type options struct {
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
pingFn func(context.Context, *testpb.PingRequest) (*testpb.PingResponse, error)
}
func WithGRPCOptions(opts ...procgrpc.Option) func(*options) {
@ -93,6 +96,12 @@ func WithHealthCheckFn(fn func(context.Context, *emptypb.Empty) (*rtv1.HealthChe
}
}
func WithPingFn(fn func(context.Context, *testpb.PingRequest) (*testpb.PingResponse, error)) func(*options) {
return func(opts *options) {
opts.pingFn = fn
}
}
func WithRegister(fn func(s *grpc.Server)) func(*options) {
return func(opts *options) {
opts.withRegister = fn

View File

@ -4,13 +4,14 @@
// protoc v4.24.4
// source: test.v1.proto
package __
package proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (

View File

@ -4,10 +4,11 @@
// - protoc v4.24.4
// source: test.v1.proto
package __
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"

View File

@ -20,9 +20,12 @@ import (
commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
)
type server struct {
testpb.UnsafeTestServiceServer
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
onJobEventFn func(context.Context, *rtv1.JobEventRequest) (*rtv1.JobEventResponse, error)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
@ -31,6 +34,7 @@ type server struct {
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
pingFn func(context.Context, *testpb.PingRequest) (*testpb.PingResponse, error)
}
func (s *server) OnInvoke(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
@ -88,3 +92,10 @@ func (s *server) HealthCheck(ctx context.Context, e *emptypb.Empty) (*rtv1.Healt
}
return s.healthCheckFn(ctx, e)
}
func (s *server) Ping(ctx context.Context, req *testpb.PingRequest) (*testpb.PingResponse, error) {
if s.pingFn != nil {
return s.pingFn(ctx, req)
}
return new(testpb.PingResponse), nil
}

View File

@ -34,5 +34,6 @@ import (
_ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown"
_ "github.com/dapr/dapr/tests/integration/suite/daprd/state"
_ "github.com/dapr/dapr/tests/integration/suite/daprd/subscriptions"
_ "github.com/dapr/dapr/tests/integration/suite/daprd/tracing"
_ "github.com/dapr/dapr/tests/integration/suite/daprd/workflow"
)

View File

@ -28,8 +28,8 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -43,16 +43,20 @@ type remotebothtokens struct {
}
func (b *remotebothtokens) Setup(t *testing.T) []framework.Option {
fn, ch := newServer()
b.ch = ch
b.ch = make(chan metadata.MD, 1)
app := app.New(t,
app.WithRegister(fn),
app.WithOnInvokeFn(func(ctx context.Context, _ *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
require.True(t, ok)
b.ch <- md
return new(commonv1.InvokeResponse), nil
}),
app.WithPingFn(func(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
b.ch <- md
return new(testpb.PingResponse), nil
}),
)
b.daprd1 = daprd.New(t,

View File

@ -28,8 +28,8 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -43,16 +43,19 @@ type remotereceiverhastoken struct {
}
func (r *remotereceiverhastoken) Setup(t *testing.T) []framework.Option {
fn, ch := newServer()
r.ch = ch
r.ch = make(chan metadata.MD, 1)
app := app.New(t,
app.WithRegister(fn),
app.WithOnInvokeFn(func(ctx context.Context, _ *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
require.True(t, ok)
r.ch <- md
return new(commonv1.InvokeResponse), nil
}),
app.WithPingFn(func(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
r.ch <- md
return new(testpb.PingResponse), nil
}),
)
r.daprd1 = daprd.New(t)

View File

@ -28,8 +28,8 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -43,16 +43,19 @@ type remotereceivernotoken struct {
}
func (n *remotereceivernotoken) Setup(t *testing.T) []framework.Option {
fn, ch := newServer()
n.ch = ch
n.ch = make(chan metadata.MD, 1)
app := app.New(t,
app.WithRegister(fn),
app.WithOnInvokeFn(func(ctx context.Context, _ *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
require.True(t, ok)
n.ch <- md
return new(commonv1.InvokeResponse), nil
}),
app.WithPingFn(func(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
n.ch <- md
return new(testpb.PingResponse), nil
}),
)
n.daprd1 = daprd.New(t,

View File

@ -28,8 +28,8 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -42,16 +42,19 @@ type selfnotoken struct {
}
func (n *selfnotoken) Setup(t *testing.T) []framework.Option {
fn, ch := newServer()
n.ch = ch
n.ch = make(chan metadata.MD, 1)
app := app.New(t,
app.WithRegister(fn),
app.WithOnInvokeFn(func(ctx context.Context, _ *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
require.True(t, ok)
n.ch <- md
return new(commonv1.InvokeResponse), nil
}),
app.WithPingFn(func(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
n.ch <- md
return new(testpb.PingResponse), nil
}),
)
n.daprd = daprd.New(t,

View File

@ -28,8 +28,8 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -42,16 +42,19 @@ type selfwithtoken struct {
}
func (s *selfwithtoken) Setup(t *testing.T) []framework.Option {
fn, ch := newServer()
s.ch = ch
s.ch = make(chan metadata.MD, 1)
app := app.New(t,
app.WithRegister(fn),
app.WithOnInvokeFn(func(ctx context.Context, _ *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
require.True(t, ok)
s.ch <- md
return new(commonv1.InvokeResponse), nil
}),
app.WithPingFn(func(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
s.ch <- md
return new(testpb.PingResponse), nil
}),
)
s.daprd = daprd.New(t,

View File

@ -1,43 +0,0 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/
package appapitoken
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
type pingServer struct {
testpb.UnsafeTestServiceServer
ch chan metadata.MD
}
func newServer() (func(*grpc.Server), chan metadata.MD) {
ch := make(chan metadata.MD, 1)
return func(s *grpc.Server) {
testpb.RegisterTestServiceServer(s, &pingServer{
ch: ch,
})
}, ch
}
func (p *pingServer) Ping(ctx context.Context, _ *testpb.PingRequest) (*testpb.PingResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
p.ch <- md
return new(testpb.PingResponse), nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/parallel"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
"github.com/dapr/dapr/tests/integration/suite"
)
@ -94,8 +95,8 @@ func (b *basic) Setup(t *testing.T) []framework.Option {
}
}
srv1 := newGRPCServer(t, onInvoke)
srv2 := newGRPCServer(t, onInvoke)
srv1 := app.New(t, app.WithOnInvokeFn(onInvoke))
srv2 := app.New(t, app.WithOnInvokeFn(onInvoke))
b.daprd1 = procdaprd.New(t,
procdaprd.WithAppProtocol("grpc"),
procdaprd.WithAppPort(srv1.Port(t)),

View File

@ -32,6 +32,7 @@ import (
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/parallel"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
"github.com/dapr/dapr/tests/integration/suite"
)
@ -57,7 +58,7 @@ func (f *fuzzgrpc) Setup(t *testing.T) []framework.Option {
}, nil
}
srv := newGRPCServer(t, onInvoke)
srv := app.New(t, app.WithOnInvokeFn(onInvoke))
f.daprd1 = procdaprd.New(t, procdaprd.WithAppProtocol("grpc"), procdaprd.WithAppPort(srv.Port(t)))
f.daprd2 = procdaprd.New(t)

View File

@ -1,48 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"context"
"testing"
"google.golang.org/grpc"
commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
type (
invokeFn func(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
)
type pingserver struct {
testpb.UnsafeTestServiceServer
}
func newGRPCServer(t *testing.T, onInvoke invokeFn, opts ...procgrpc.Option) *app.App {
return app.New(t,
app.WithGRPCOptions(opts...),
app.WithOnInvokeFn(onInvoke),
app.WithRegister(func(s *grpc.Server) {
testpb.RegisterTestServiceServer(s, new(pingserver))
}),
)
}
func (p *pingserver) Ping(context.Context, *testpb.PingRequest) (*testpb.PingResponse, error) {
return new(testpb.PingResponse), nil
}

View File

@ -32,9 +32,9 @@ import (
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
testpb "github.com/dapr/dapr/tests/integration/framework/process/grpc/app/proto"
"github.com/dapr/dapr/tests/integration/framework/process/ports"
"github.com/dapr/dapr/tests/integration/suite"
testpb "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation/grpc/proto"
)
func init() {
@ -64,14 +64,16 @@ func (s *slowappstartup) Setup(t *testing.T) []framework.Option {
fp := ports.Reserve(t, 1)
port := fp.Port(t)
s.app = newGRPCServer(t, onInvoke, procgrpc.WithListener(func() (net.Listener, error) {
// Simulate a slow startup by not opening the listener until 2 seconds after
// the process starts. This sleep value must be more than the health probe
// interval.
time.Sleep(time.Second * 2)
return net.Listen("tcp", "localhost:"+strconv.Itoa(port))
}))
s.app = app.New(t, app.WithOnInvokeFn(onInvoke),
app.WithGRPCOptions(
procgrpc.WithListener(func() (net.Listener, error) {
// Simulate a slow startup by not opening the listener until 2 seconds after
// the process starts. This sleep value must be more than the health probe
// interval.
time.Sleep(time.Second * 2)
return net.Listen("tcp", "localhost:"+strconv.Itoa(port))
})),
)
s.daprd = procdaprd.New(t,
procdaprd.WithAppProtocol("grpc"),
procdaprd.WithAppPort(port),

View File

@ -0,0 +1,138 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package binding
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
grpcMetadata "google.golang.org/grpc/metadata"
"github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/client"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/suite"
)
func init() {
suite.Register(new(output))
}
type output struct {
httpapp *prochttp.HTTP
daprd *daprd.Daprd
traceparent atomic.Bool
}
func (b *output) Setup(t *testing.T) []framework.Option {
handler := http.NewServeMux()
handler.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
if tp := r.Header.Get("traceparent"); tp != "" {
b.traceparent.Store(true)
} else {
b.traceparent.Store(false)
}
w.Write([]byte(`OK`))
})
b.httpapp = prochttp.New(t, prochttp.WithHandler(handler))
b.daprd = daprd.New(t,
daprd.WithAppPort(b.httpapp.Port()),
daprd.WithResourceFiles(fmt.Sprintf(`apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: http-binding-traceparent
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: http://127.0.0.1:%d/test
`, b.httpapp.Port())))
return []framework.Option{
framework.WithProcesses(b.httpapp, b.daprd),
}
}
func (b *output) Run(t *testing.T, ctx context.Context) {
b.daprd.WaitUntilRunning(t, ctx)
httpClient := client.HTTP(t)
client := b.daprd.GRPCClient(t, ctx)
t.Run("no traceparent header provided", func(t *testing.T) {
// invoke binding
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/bindings/http-binding-traceparent", b.daprd.HTTPPort())
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, strings.NewReader("{\"operation\":\"get\"}"))
require.NoError(t, err)
resp, err := httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.True(t, b.traceparent.Load())
invokereq := runtime.InvokeBindingRequest{
Name: "http-binding-traceparent",
Operation: "get",
}
// invoke binding
invokeresp, err := client.InvokeBinding(ctx, &invokereq)
require.NoError(t, err)
require.NotNil(t, invokeresp)
assert.True(t, b.traceparent.Load())
})
t.Run("traceparent header provided", func(t *testing.T) {
// invoke binding
ctx := context.Background()
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/bindings/http-binding-traceparent", b.daprd.HTTPPort())
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, strings.NewReader("{\"operation\":\"get\"}"))
require.NoError(t, err)
tp := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
req.Header.Set("traceparent", tp)
resp, err := httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.True(t, b.traceparent.Load())
invokereq := runtime.InvokeBindingRequest{
Name: "http-binding-traceparent",
Operation: "get",
}
// invoke binding
tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-02"
ctx = grpcMetadata.AppendToOutgoingContext(ctx, "traceparent", tp)
invokeresp, err := client.InvokeBinding(ctx, &invokereq)
require.NoError(t, err)
require.NotNil(t, invokeresp)
assert.True(t, b.traceparent.Load())
})
}

View File

@ -0,0 +1,203 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serviceinvocation
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
grpcMetadata "google.golang.org/grpc/metadata"
"github.com/dapr/dapr/pkg/proto/common/v1"
"github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/client"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/suite"
)
func init() {
suite.Register(new(invoke))
}
type invoke struct {
grpcapp *procgrpc.App
httpapp *prochttp.HTTP
daprd *daprd.Daprd
grpcdaprd *daprd.Daprd
traceparent atomic.Bool
grpctracectxkey atomic.Bool
}
func (i *invoke) Setup(t *testing.T) []framework.Option {
handler := http.NewServeMux()
handler.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
if tp := r.Header.Get("traceparent"); tp != "" {
i.traceparent.Store(true)
} else {
i.traceparent.Store(false)
}
w.Write([]byte(`OK`))
})
i.httpapp = prochttp.New(t, prochttp.WithHandler(handler))
i.grpcapp = procgrpc.New(t,
procgrpc.WithOnInvokeFn(func(ctx context.Context, in *common.InvokeRequest) (*common.InvokeResponse, error) {
switch in.GetMethod() {
case "test":
if md, ok := grpcMetadata.FromIncomingContext(ctx); ok {
if _, exists := md["grpc-trace-bin"]; exists {
i.grpctracectxkey.Store(true)
} else {
i.grpctracectxkey.Store(false)
}
}
}
return nil, nil
}),
)
i.daprd = daprd.New(t, daprd.WithAppPort(i.httpapp.Port()))
i.grpcdaprd = daprd.New(t, daprd.WithAppPort(i.grpcapp.Port(t)), daprd.WithAppProtocol("grpc"))
return []framework.Option{
framework.WithProcesses(i.httpapp, i.daprd, i.grpcapp, i.grpcdaprd),
}
}
func (i *invoke) Run(t *testing.T, ctx context.Context) {
i.daprd.WaitUntilRunning(t, ctx)
i.grpcdaprd.WaitUntilRunning(t, ctx)
httpClient := client.HTTP(t)
client := i.daprd.GRPCClient(t, ctx)
t.Run("no traceparent header provided", func(t *testing.T) {
// invoke both grpc & http apps
appURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/test", i.daprd.HTTPPort(), i.daprd.AppID())
appreq, err := http.NewRequestWithContext(ctx, http.MethodPost, appURL, strings.NewReader("{\"operation\":\"get\"}"))
require.NoError(t, err)
appresp, err := httpClient.Do(appreq)
require.NoError(t, err)
defer appresp.Body.Close()
assert.Equal(t, http.StatusOK, appresp.StatusCode)
assert.True(t, i.traceparent.Load())
svcreq := runtime.InvokeServiceRequest{
Id: i.daprd.AppID(),
Message: &common.InvokeRequest{
Method: "test",
Data: nil,
ContentType: "",
HttpExtension: &common.HTTPExtension{
Verb: common.HTTPExtension_GET,
Querystring: "",
},
},
}
svcresp, err := client.InvokeService(ctx, &svcreq)
require.NoError(t, err)
require.NotNil(t, svcresp)
assert.True(t, i.traceparent.Load())
grpcappreq := runtime.InvokeServiceRequest{
Id: i.grpcdaprd.AppID(),
Message: &common.InvokeRequest{
Method: "test",
Data: nil,
ContentType: "",
HttpExtension: &common.HTTPExtension{
Verb: common.HTTPExtension_GET,
Querystring: "",
},
},
}
// grpc app check
grpcclient := i.grpcdaprd.GRPCClient(t, ctx)
svcresp, err = grpcclient.InvokeService(ctx, &grpcappreq)
require.NoError(t, err)
require.NotNil(t, svcresp)
assert.True(t, i.grpctracectxkey.Load()) // this is set for grpc, instead of traceparent
})
t.Run("traceparent header provided", func(t *testing.T) {
// invoke both grpc & http apps
appURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/test", i.daprd.HTTPPort(), i.daprd.AppID())
appreq, err := http.NewRequestWithContext(ctx, http.MethodPost, appURL, strings.NewReader("{\"operation\":\"get\"}"))
require.NoError(t, err)
tp := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
appreq.Header.Set("traceparent", tp)
appresp, err := httpClient.Do(appreq)
require.NoError(t, err)
defer appresp.Body.Close()
assert.Equal(t, http.StatusOK, appresp.StatusCode)
assert.True(t, i.traceparent.Load())
svcreq := runtime.InvokeServiceRequest{
Id: i.daprd.AppID(),
Message: &common.InvokeRequest{
Method: "test",
Data: nil,
ContentType: "",
HttpExtension: &common.HTTPExtension{
Verb: common.HTTPExtension_GET,
Querystring: "",
},
},
}
tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-02"
ctx = grpcMetadata.AppendToOutgoingContext(ctx, "traceparent", tp)
svcresp, err := client.InvokeService(ctx, &svcreq)
require.NoError(t, err)
require.NotNil(t, svcresp)
assert.True(t, i.traceparent.Load())
grpcappreq := runtime.InvokeServiceRequest{
Id: i.grpcdaprd.AppID(),
Message: &common.InvokeRequest{
Method: "test",
Data: nil,
ContentType: "",
HttpExtension: &common.HTTPExtension{
Verb: common.HTTPExtension_GET,
Querystring: "",
},
},
}
tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-03"
ctx = grpcMetadata.AppendToOutgoingContext(ctx, "traceparent", tp)
grpcclient := i.grpcdaprd.GRPCClient(t, ctx)
svcresp, err = grpcclient.InvokeService(ctx, &grpcappreq)
require.NoError(t, err)
require.NotNil(t, svcresp)
assert.True(t, i.grpctracectxkey.Load()) // this is set for grpc, instead of traceparent
})
}

View File

@ -0,0 +1,20 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
_ "github.com/dapr/dapr/tests/integration/suite/daprd/tracing/binding"
_ "github.com/dapr/dapr/tests/integration/suite/daprd/tracing/serviceinvocation"
)