Refactor SPIFFE from `pkg/security` to `kit` (#7669)

* Refactor SPIFFE from `pkg/security` to `kit`

Updates the `pkg/security` package to move the SPIFFE implementation to
a new kit package. This new kit package is more modulated and fuller
test coverage. This package has been moved so that it can be both
imported by dapr & components-contrib, as well as making the package
more suitable for further development to support X.509 Component auth.

https://github.com/dapr/proposals/pull/51

Also moves in test/utils from dapr to crypto/test for shared usage.

Part of https://github.com/dapr/proposals/pull/51

Uses go mod fork of https://github.com/dapr/kit/pull/92

Signed-off-by: joshvanl <me@joshvanl.dev>

* Include SVID context with `Init`ing Component

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds security to processor options

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update github.com/dapr/dapr to master

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update `util` to new `test` package import

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update go.sum

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
Josh van Leeuwen 2024-05-10 03:40:57 +01:00 committed by GitHub
parent 635dc49f95
commit bc77d50a5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 361 additions and 1278 deletions

View File

@ -91,7 +91,7 @@ func Run() {
SentryAddress: cfg.SentryAddress,
ControlPlaneTrustDomain: cfg.ControlPlaneTrustDomain,
ControlPlaneNamespace: namespace,
TrustAnchorsFile: cfg.TrustAnchorsFile,
TrustAnchorsFile: &cfg.TrustAnchorsFile,
AppID: "dapr-injector",
MTLSEnabled: true,
Mode: modes.KubernetesMode,
@ -165,7 +165,7 @@ func Run() {
return rerr
}
caBundle, rErr := sec.CurrentTrustAnchors()
caBundle, rErr := sec.CurrentTrustAnchors(ctx)
if rErr != nil {
return rErr
}

View File

@ -74,7 +74,7 @@ func Run() {
SentryAddress: opts.SentryAddress,
ControlPlaneTrustDomain: opts.TrustDomain,
ControlPlaneNamespace: security.CurrentNamespace(),
TrustAnchorsFile: opts.TrustAnchorsFile,
TrustAnchorsFile: &opts.TrustAnchorsFile,
AppID: "dapr-placement",
MTLSEnabled: opts.TLSEnabled,
Mode: modes.DaprMode(opts.Mode),

6
go.mod
View File

@ -11,7 +11,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/dapr/components-contrib v1.13.0-rc.2.0.20240503231149-1f46231d875c
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e
github.com/dapr/kit v0.13.1-0.20240415171926-a3f906d60908
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-chi/chi/v5 v5.0.11
github.com/go-chi/cors v1.2.1
@ -45,7 +45,7 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/cast v1.6.0
github.com/spf13/pflag v1.0.5
github.com/spiffe/go-spiffe/v2 v2.1.6
github.com/spiffe/go-spiffe/v2 v2.1.7
github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.51.0
go.mongodb.org/mongo-driver v1.12.1
@ -66,7 +66,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0
google.golang.org/grpc v1.60.1
google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.4
@ -414,6 +413,7 @@ require (
google.golang.org/api v0.149.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 // indirect
google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/couchbase/gocb.v1 v1.6.7 // indirect
gopkg.in/couchbase/gocbcore.v7 v7.1.18 // indirect

16
go.sum
View File

@ -121,8 +121,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY
github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d h1:wvStE9wLpws31NiWUx+38wny1msZ/tm+eL5xmm4Y7So=
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRyW7fC9XJOWQ+NdAv8VLG7ys7l3x4ozEGLUQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@ -439,8 +439,8 @@ github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuA
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/components-contrib v1.13.0-rc.2.0.20240503231149-1f46231d875c h1:vzu6TjW2XYZAQY+g9fHmTzOGYNB1lPvRAun7YLV73Nk=
github.com/dapr/components-contrib v1.13.0-rc.2.0.20240503231149-1f46231d875c/go.mod h1:8/+3UcZLcNytOKLXPpseDT3gB0Mo4ryoMaiud+9u60k=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e h1:mLvqfGuppb6uhsijmwTlF5sZVtGvig+Ua5ESKF17SxA=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dapr/kit v0.13.1-0.20240415171926-a3f906d60908 h1:8Bs9nVJh00BVNJxsB5Djf0xICW53kiKi3QL/jZ5qp8s=
github.com/dapr/kit v0.13.1-0.20240415171926-a3f906d60908/go.mod h1:LkPZyrSpa2xLBgYMwUhDbWZcZVt/WdL7FSPlN0PrSog=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -579,8 +579,8 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA=
github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0 h1:dXFJfIHVvUcpSgDOV+Ne6t7jXri8Tfv2uOLHUZ2XNuo=
@ -1522,8 +1522,8 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/spiffe/go-spiffe/v2 v2.1.6 h1:4SdizuQieFyL9eNU+SPiCArH4kynzaKOOj0VvM8R7Xo=
github.com/spiffe/go-spiffe/v2 v2.1.6/go.mod h1:eVDqm9xFvyqao6C+eQensb9ZPkyNEeaUbqbBpOhBnNk=
github.com/spiffe/go-spiffe/v2 v2.1.7 h1:VUkM1yIyg/x8X7u1uXqSRVRCdMdfRIEdFBzpqoeASGk=
github.com/spiffe/go-spiffe/v2 v2.1.7/go.mod h1:QJDGdhXllxjxvd5B+2XnhhXB/+rC8gr+lNrtOryiWeE=
github.com/stealthrocket/wasi-go v0.8.1-0.20230912180546-8efbab50fb58 h1:mTC4gyv3lcJ1XpzZMAckqkvWUqeT5Bva4RAT1IoHAAA=
github.com/stealthrocket/wasi-go v0.8.1-0.20230912180546-8efbab50fb58/go.mod h1:ZAYCOqLJkc9P6fcq14TV4cf+gJ2fHthp9kCGxBViagE=
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=

View File

@ -161,7 +161,7 @@ func testSecurity(t *testing.T) security.Handler {
ControlPlaneTrustDomain: "test.example.com",
ControlPlaneNamespace: "default",
MTLSEnabled: false,
OverrideCertRequestSource: func(context.Context, []byte) ([]*x509.Certificate, error) {
OverrideCertRequestFn: func(context.Context, []byte) ([]*x509.Certificate, error) {
return []*x509.Certificate{nil}, nil
},
})

View File

@ -15,6 +15,7 @@ package service
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
@ -52,7 +53,7 @@ func TestHandleRequest(t *testing.T) {
require.NoError(t, err)
injector := i.(*injector)
injector.currentTrustAnchors = func() ([]byte, error) {
injector.currentTrustAnchors = func(context.Context) ([]byte, error) {
return nil, nil
}

View File

@ -56,7 +56,7 @@ var AllowedServiceAccountInfos = []string{
}
type (
currentTrustAnchorsFn func() (ca []byte, err error)
currentTrustAnchorsFn func(context.Context) (ca []byte, err error)
)
// Injector is the interface for the Dapr runtime sidecar injection component.

View File

@ -51,7 +51,7 @@ func (i *injector) getPodPatchOperations(ctx context.Context, ar *admissionv1.Ad
sentryAddress := patcher.ServiceSentry.Address(i.config.Namespace, i.config.KubeClusterDomain)
operatorAddress := patcher.ServiceAPI.Address(i.config.Namespace, i.config.KubeClusterDomain)
trustAnchors, err := i.currentTrustAnchors()
trustAnchors, err := i.currentTrustAnchors(ctx)
if err != nil {
return nil, err
}

View File

@ -42,7 +42,7 @@ import (
"github.com/dapr/dapr/pkg/operator/api/informer"
informerfake "github.com/dapr/dapr/pkg/operator/api/informer/fake"
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/tests/util"
"github.com/dapr/kit/crypto/test"
)
type mockComponentUpdateServer struct {
@ -196,7 +196,7 @@ func TestProcessComponentSecrets(t *testing.T) {
func TestComponentUpdate(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/ns1/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{
pki := test.GenPKI(t, test.PKIOptions{
LeafID: serverID,
ClientID: appID,
})
@ -317,7 +317,7 @@ func TestComponentUpdate(t *testing.T) {
func TestHTTPEndpointUpdate(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/ns1/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{
pki := test.GenPKI(t, test.PKIOptions{
LeafID: serverID,
ClientID: appID,
})
@ -411,7 +411,7 @@ func TestHTTPEndpointUpdate(t *testing.T) {
func TestListScopes(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/namespace-a/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{
pki := test.GenPKI(t, test.PKIOptions{
LeafID: serverID,
ClientID: appID,
})
@ -482,7 +482,7 @@ func TestListScopes(t *testing.T) {
func TestListsNamespaced(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/namespace-a/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{
pki := test.GenPKI(t, test.PKIOptions{
LeafID: serverID,
ClientID: appID,
})

View File

@ -24,13 +24,13 @@ import (
"google.golang.org/grpc/status"
"github.com/dapr/dapr/pkg/security/spiffe"
"github.com/dapr/dapr/tests/util"
"github.com/dapr/kit/crypto/test"
)
func Test_Request(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/ns1/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{LeafID: serverID, ClientID: appID})
pki := test.GenPKI(t, test.PKIOptions{LeafID: serverID, ClientID: appID})
t.Run("no auth context should error", func(t *testing.T) {
id, err := Request(context.Background(), "ns1")
@ -55,7 +55,7 @@ func Test_Request(t *testing.T) {
t.Run("invalid SPIFFE path should error", func(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/foo/bar")
pki2 := util.GenPKI(t, util.PKIOptions{LeafID: serverID, ClientID: appID})
pki2 := test.GenPKI(t, test.PKIOptions{LeafID: serverID, ClientID: appID})
id, err := Request(pki2.ClientGRPCCtx(t), "ns1")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))

View File

@ -30,14 +30,14 @@ import (
compapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
"github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/tests/util"
"github.com/dapr/kit/crypto/test"
)
func Test_WatchUpdates(t *testing.T) {
t.Run("bad authz should error", func(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/ns1/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{LeafID: serverID, ClientID: appID})
pki := test.GenPKI(t, test.PKIOptions{LeafID: serverID, ClientID: appID})
i := New[compapi.Component](Options{}).(*informer[compapi.Component])
@ -55,7 +55,7 @@ func Test_WatchUpdates(t *testing.T) {
t.Run("should receive app events on batch events in order", func(t *testing.T) {
appID := spiffeid.RequireFromString("spiffe://example.org/ns/ns1/app1")
serverID := spiffeid.RequireFromString("spiffe://example.org/ns/dapr-system/dapr-operator")
pki := util.GenPKI(t, util.PKIOptions{LeafID: serverID, ClientID: appID})
pki := test.GenPKI(t, test.PKIOptions{LeafID: serverID, ClientID: appID})
i := New[compapi.Component](Options{}).(*informer[compapi.Component])
t.Cleanup(func() { close(i.closeCh) })

View File

@ -105,7 +105,7 @@ func NewOperator(ctx context.Context, opts Options) (Operator, error) {
SentryAddress: config.SentryAddress,
ControlPlaneTrustDomain: config.ControlPlaneTrustDomain,
ControlPlaneNamespace: security.CurrentNamespace(),
TrustAnchorsFile: opts.TrustAnchorsFile,
TrustAnchorsFile: &opts.TrustAnchorsFile,
AppID: "dapr-operator",
// mTLS is always enabled for the operator.
MTLSEnabled: true,
@ -299,7 +299,7 @@ func (o *operator) Run(ctx context.Context) error {
return rErr
}
caBundle, rErr := sec.CurrentTrustAnchors()
caBundle, rErr := sec.CurrentTrustAnchors(ctx)
if rErr != nil {
return rErr
}

View File

@ -28,6 +28,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/meta"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/security/fake"
daprt "github.com/dapr/dapr/pkg/testing"
"github.com/dapr/kit/logger"
)
@ -47,6 +48,7 @@ func TestInitBindings(t *testing.T) {
GlobalConfig: new(config.Configuration),
Meta: meta.New(meta.Options{}),
GRPC: manager.NewManager(nil, modes.StandaloneMode, &manager.AppChannelConfig{Port: 0}),
Security: fake.New(),
})
c := compapi.Component{}
@ -69,6 +71,7 @@ func TestInitBindings(t *testing.T) {
ComponentStore: compstore.New(),
GlobalConfig: new(config.Configuration),
Meta: meta.New(meta.Options{}),
Security: fake.New(),
})
c := compapi.Component{}
@ -99,6 +102,7 @@ func TestInitBindings(t *testing.T) {
GlobalConfig: new(config.Configuration),
Meta: meta.New(meta.Options{}),
GRPC: manager.NewManager(nil, modes.StandaloneMode, &manager.AppChannelConfig{Port: 0}),
Security: fake.New(),
})
input := compapi.Component{}
@ -124,6 +128,7 @@ func TestInitBindings(t *testing.T) {
GlobalConfig: new(config.Configuration),
Meta: meta.New(meta.Options{}),
GRPC: manager.NewManager(nil, modes.StandaloneMode, &manager.AppChannelConfig{Port: 0}),
Security: fake.New(),
})
c := compapi.Component{}

View File

@ -183,7 +183,7 @@ func TestGetSubscribedBindingsGRPC(t *testing.T) {
ControlPlaneTrustDomain: "test.example.com",
ControlPlaneNamespace: "default",
MTLSEnabled: false,
OverrideCertRequestSource: func(context.Context, []byte) ([]*x509.Certificate, error) {
OverrideCertRequestFn: func(context.Context, []byte) ([]*x509.Certificate, error) {
return []*x509.Certificate{nil}, nil
},
})

View File

@ -40,7 +40,7 @@ func (p *Processor) Init(ctx context.Context, comp componentsapi.Component) erro
return err
}
if err := m.Init(ctx, comp); err != nil {
if err := m.Init(p.security.WithSVIDContext(ctx), comp); err != nil {
return errors.Join(err, p.compStore.DropPendingComponent())
}

View File

@ -42,6 +42,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/processor/state"
"github.com/dapr/dapr/pkg/runtime/processor/wfbackend"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/logger"
)
@ -93,6 +94,8 @@ type Options struct {
OperatorClient operatorv1.OperatorClient
MiddlewareHTTP *http.HTTP
Security security.Handler
}
// Processor manages the lifecycle of all components categories.
@ -105,6 +108,7 @@ type Processor struct {
pubsub PubsubManager
binding BindingManager
workflowBackend WorkflowBackendManager
security security.Handler
pendingHTTPEndpoints chan httpendpointsapi.HTTPEndpoint
pendingComponents chan componentsapi.Component
@ -182,6 +186,7 @@ func New(opts Options) *Processor {
binding: binding,
secret: secret,
workflowBackend: wfbe,
security: opts.Security,
managers: map[components.Category]manager{
components.CategoryBindings: binding,
components.CategoryConfiguration: configuration.New(configuration.Options{

View File

@ -39,6 +39,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/meta"
rtmock "github.com/dapr/dapr/pkg/runtime/mock"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/security/fake"
daprt "github.com/dapr/dapr/pkg/testing"
"github.com/dapr/kit/logger"
)
@ -63,6 +64,7 @@ func newTestProcWithID(id string) (*Processor, *registry.Registry) {
GRPC: nil,
Channels: new(channels.Channels),
GlobalConfig: new(config.Configuration),
Security: fake.New(),
}), reg
}

View File

@ -38,6 +38,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/mock"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/security/fake"
daprt "github.com/dapr/dapr/pkg/testing"
"github.com/dapr/kit/logger"
)
@ -50,6 +51,7 @@ func TestInitState(t *testing.T) {
ComponentStore: compStore,
GlobalConfig: new(config.Configuration),
Meta: meta.New(meta.Options{Mode: modes.StandaloneMode}),
Security: fake.New(),
})
bytes := make([]byte, 32)

View File

@ -197,6 +197,7 @@ func newDaprRuntime(ctx context.Context,
GRPC: grpc,
Channels: channels,
MiddlewareHTTP: httpMiddleware,
Security: sec,
})
var reloader *hotreload.Reloader

View File

@ -1959,6 +1959,7 @@ func TestGracefulShutdownPubSub(t *testing.T) {
Mode: rt.runtimeConfig.mode,
Channels: rt.channels,
GRPC: rt.grpc,
Security: rt.sec,
})
require.NoError(t, rt.processor.Init(context.Background(), cPubSub))
@ -2134,7 +2135,7 @@ func testSecurity(t *testing.T) security.Handler {
ControlPlaneTrustDomain: "test.example.com",
ControlPlaneNamespace: "default",
MTLSEnabled: false,
OverrideCertRequestSource: func(context.Context, []byte) ([]*x509.Certificate, error) {
OverrideCertRequestFn: func(context.Context, []byte) ([]*x509.Certificate, error) {
return []*x509.Certificate{nil}, nil
},
})

View File

@ -32,7 +32,7 @@ import (
type Fake struct {
controlPlaneTrustDomainFn func() spiffeid.TrustDomain
controlPlaneNamespaceFn func() string
currentTrustAnchorsFn func() ([]byte, error)
currentTrustAnchorsFn func(context.Context) ([]byte, error)
watchTrustAnchorsFn func(context.Context, chan<- []byte)
mtls bool
@ -75,7 +75,7 @@ func New() *Fake {
grpcServerOptionNoClientAuthFn: func() grpc.ServerOption {
return grpc.Creds(nil)
},
currentTrustAnchorsFn: func() ([]byte, error) {
currentTrustAnchorsFn: func(context.Context) ([]byte, error) {
return []byte{}, nil
},
watchTrustAnchorsFn: func(context.Context, chan<- []byte) {
@ -173,7 +173,7 @@ func (f *Fake) GRPCServerOptionMTLS() grpc.ServerOption {
return f.grpcServerOptionMTLSFn()
}
func (f *Fake) WithCurrentTrustAnchorsFn(fn func() ([]byte, error)) *Fake {
func (f *Fake) WithCurrentTrustAnchorsFn(fn func(context.Context) ([]byte, error)) *Fake {
f.currentTrustAnchorsFn = fn
return f
}
@ -202,14 +202,18 @@ func (f *Fake) GRPCServerOptionNoClientAuth() grpc.ServerOption {
return f.grpcServerOptionNoClientAuthFn()
}
func (f *Fake) CurrentTrustAnchors() ([]byte, error) {
return f.currentTrustAnchorsFn()
func (f *Fake) CurrentTrustAnchors(ctx context.Context) ([]byte, error) {
return f.currentTrustAnchorsFn(ctx)
}
func (f *Fake) WatchTrustAnchors(ctx context.Context, ch chan<- []byte) {
f.watchTrustAnchorsFn(ctx, ch)
}
func (f *Fake) WithSVIDContext(ctx context.Context) context.Context {
return ctx
}
func (f *Fake) GRPCDialOption(id spiffeid.ID) grpc.DialOption {
return f.grpcDialOptionFn(id)
}

View File

@ -1,189 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pem
import (
"bytes"
"crypto"
"crypto/ecdsa"
"crypto/ed25519"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
)
// DecodePEMCertificatesChain takes a PEM-encoded x509 certificates byte array
// and returns all certificates in a slice of x509.Certificate objects.
// Expects certificates to be a chain with leaf certificate to be first in the
// byte array.
func DecodePEMCertificatesChain(crtb []byte) ([]*x509.Certificate, error) {
certs, err := DecodePEMCertificates(crtb)
if err != nil {
return nil, err
}
for i := 0; i < len(certs)-1; i++ {
if certs[i].CheckSignatureFrom(certs[i+1]) != nil {
return nil, errors.New("certificate chain is not valid")
}
}
return certs, nil
}
// DecodePEMCertificatesChain takes a PEM-encoded x509 certificates byte array
// and returns all certificates in a slice of x509.Certificate objects.
func DecodePEMCertificates(crtb []byte) ([]*x509.Certificate, error) {
certs := []*x509.Certificate{}
for len(crtb) > 0 {
var err error
var cert *x509.Certificate
cert, crtb, err = decodeCertificatePEM(crtb)
if err != nil {
return nil, err
}
if cert != nil {
// it's a cert, add to pool
certs = append(certs, cert)
}
}
if len(certs) == 0 {
return nil, errors.New("no certificates found")
}
return certs, nil
}
func decodeCertificatePEM(crtb []byte) (*x509.Certificate, []byte, error) {
block, crtb := pem.Decode(crtb)
if block == nil {
return nil, nil, nil
}
if block.Type != "CERTIFICATE" {
return nil, nil, nil
}
c, err := x509.ParseCertificate(block.Bytes)
return c, crtb, err
}
// DecodePEMPrivateKey takes a key PEM byte array and returns an object that
// represents either an RSA or EC private key.
func DecodePEMPrivateKey(key []byte) (crypto.Signer, error) {
block, _ := pem.Decode(key)
if block == nil {
return nil, errors.New("key is not PEM encoded")
}
switch block.Type {
case "EC PRIVATE KEY":
return x509.ParseECPrivateKey(block.Bytes)
case "RSA PRIVATE KEY":
return x509.ParsePKCS1PrivateKey(block.Bytes)
case "PRIVATE KEY":
key, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
return key.(crypto.Signer), nil
default:
return nil, fmt.Errorf("unsupported block type %s", block.Type)
}
}
// EncodePrivateKey will encode a private key into PEM format.
func EncodePrivateKey(key any) ([]byte, error) {
var (
keyBytes []byte
err error
blockType string
)
switch key := key.(type) {
case *ecdsa.PrivateKey, *ed25519.PrivateKey:
keyBytes, err = x509.MarshalPKCS8PrivateKey(key)
if err != nil {
return nil, err
}
blockType = "PRIVATE KEY"
default:
return nil, fmt.Errorf("unsupported key type %T", key)
}
return pem.EncodeToMemory(&pem.Block{
Type: blockType, Bytes: keyBytes,
}), nil
}
// EncodeX509 will encode a single *x509.Certificate into PEM format.
func EncodeX509(cert *x509.Certificate) ([]byte, error) {
caPem := bytes.NewBuffer([]byte{})
err := pem.Encode(caPem, &pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw})
if err != nil {
return nil, err
}
return caPem.Bytes(), nil
}
// EncodeX509Chain will encode a list of *x509.Certificates into a PEM format chain.
// Self-signed certificates are not included as per
// https://datatracker.ietf.org/doc/html/rfc5246#section-7.4.2
// Certificates are output in the order they're given; if the input is not ordered
// as specified in RFC5246 section 7.4.2, the resulting chain might not be valid
// for use in TLS.
func EncodeX509Chain(certs []*x509.Certificate) ([]byte, error) {
if len(certs) == 0 {
return nil, errors.New("no certificates in chain")
}
certPEM := bytes.NewBuffer([]byte{})
for _, cert := range certs {
if cert == nil {
continue
}
if cert.CheckSignatureFrom(cert) == nil {
// Don't include self-signed certificate
continue
}
err := pem.Encode(certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw})
if err != nil {
return nil, err
}
}
return certPEM.Bytes(), nil
}
// PublicKeysEqual compares two given public keys for equality.
// The definition of "equality" depends on the type of the public keys.
// Returns true if the keys are the same, false if they differ or an error if
// the key type of `a` cannot be determined.
func PublicKeysEqual(a, b crypto.PublicKey) (bool, error) {
switch pub := a.(type) {
case *rsa.PublicKey:
return pub.Equal(b), nil
case *ecdsa.PublicKey:
return pub.Equal(b), nil
case ed25519.PublicKey:
return pub.Equal(b), nil
default:
return false, fmt.Errorf("unrecognised public key type: %T", a)
}
}

View File

@ -16,12 +16,10 @@ package security
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"sync/atomic"
"time"
@ -30,19 +28,18 @@ import (
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/utils/clock"
"github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/fswatcher"
"github.com/dapr/kit/crypto/spiffe"
spiffecontext "github.com/dapr/kit/crypto/spiffe/context"
"github.com/dapr/kit/crypto/spiffe/trustanchors"
"github.com/dapr/kit/logger"
)
var log = logger.NewLogger("dapr.runtime.security")
type RequestFn func(ctx context.Context, der []byte) ([]*x509.Certificate, error)
// Handler implements middleware for client and server connection security.
//
//nolint:interfacebloat
@ -58,7 +55,8 @@ type Handler interface {
ControlPlaneTrustDomain() spiffeid.TrustDomain
ControlPlaneNamespace() string
CurrentTrustAnchors() ([]byte, error)
CurrentTrustAnchors(context.Context) ([]byte, error)
WithSVIDContext(context.Context) context.Context
MTLSEnabled() bool
WatchTrustAnchors(context.Context, chan<- []byte)
@ -91,7 +89,7 @@ type Options struct {
// TrustAnchorsFile is the path to the X.509 PEM encoded CA certificates for
// this Dapr installation. Prefer this over TrustAnchors so changes to the
// file are automatically picked up. Cannot be used with TrustAnchors.
TrustAnchorsFile string
TrustAnchorsFile *string
// AppID is the application ID of this workload.
AppID string
@ -99,9 +97,9 @@ type Options struct {
// MTLS is true if mTLS is enabled.
MTLSEnabled bool
// OverrideCertRequestSource is used to override where certificates are requested
// OverrideCertRequestFn is used to override where certificates are requested
// from. Default to an implementation requesting from Sentry.
OverrideCertRequestSource RequestFn
OverrideCertRequestFn spiffe.RequestSVIDFn
// Mode is the operation mode of this security instance (self-hosted or
// Kubernetes).
@ -113,15 +111,9 @@ type Options struct {
}
type provider struct {
sec *security
running atomic.Bool
readyCh chan struct{}
trustAnchorsFile string
// fswatcherInterval is the interval at which the trust anchors file changes
// are batched. Used for testing only, and 500ms otherwise.
fswatcherInterval time.Duration
sec *security
running atomic.Bool
readyCh chan struct{}
}
// security implements the Security interface.
@ -129,8 +121,9 @@ type security struct {
controlPlaneTrustDomain spiffeid.TrustDomain
controlPlaneNamespace string
source *x509source
mtls bool
trustAnchors trustanchors.Interface
spiffe *spiffe.SPIFFE
mtls bool
}
func New(ctx context.Context, opts Options) (Provider, error) {
@ -138,7 +131,7 @@ func New(ctx context.Context, opts Options) (Provider, error) {
return nil, errors.New("control plane trust domain is required")
}
td, err := spiffeid.TrustDomainFromString(opts.ControlPlaneTrustDomain)
cptd, err := spiffeid.TrustDomainFromString(opts.ControlPlaneTrustDomain)
if err != nil {
return nil, fmt.Errorf("invalid control plane trust domain: %w", err)
}
@ -146,33 +139,51 @@ func New(ctx context.Context, opts Options) (Provider, error) {
// Always request certificates from Sentry if mTLS is enabled or running in
// Kubernetes. In Kubernetes, Daprd always communicates mTLS with the control
// plane.
var source *x509source
var spf *spiffe.SPIFFE
var trustAnchors trustanchors.Interface
if opts.MTLSEnabled || opts.Mode == modes.KubernetesMode {
if len(opts.TrustAnchors) > 0 && len(opts.TrustAnchorsFile) > 0 {
if len(opts.TrustAnchors) > 0 && opts.TrustAnchorsFile != nil {
return nil, errors.New("trust anchors cannot be specified in both TrustAnchors and TrustAnchorsFile")
}
if len(opts.TrustAnchors) == 0 && len(opts.TrustAnchorsFile) == 0 {
if len(opts.TrustAnchors) == 0 && opts.TrustAnchorsFile == nil {
return nil, errors.New("trust anchors are required")
}
var err error
source, err = newX509Source(ctx, clock.RealClock{}, td, opts)
if err != nil {
return nil, err
switch {
case len(opts.TrustAnchors) > 0:
trustAnchors, err = trustanchors.FromStatic(opts.TrustAnchors)
if err != nil {
return nil, err
}
case opts.TrustAnchorsFile != nil:
trustAnchors = trustanchors.FromFile(trustanchors.OptionsFile{
Log: log,
Path: *opts.TrustAnchorsFile,
})
}
var reqFn spiffe.RequestSVIDFn
if opts.OverrideCertRequestFn != nil {
reqFn = opts.OverrideCertRequestFn
} else {
reqFn, err = newRequestFn(opts, trustAnchors, cptd)
if err != nil {
return nil, err
}
}
spf = spiffe.New(spiffe.Options{Log: log, RequestSVIDFn: reqFn})
} else {
log.Warn("mTLS is disabled. Skipping certificate request and tls validation")
}
return &provider{
fswatcherInterval: time.Millisecond * 500,
readyCh: make(chan struct{}),
trustAnchorsFile: opts.TrustAnchorsFile,
readyCh: make(chan struct{}),
sec: &security{
source: source,
trustAnchors: trustAnchors,
spiffe: spf,
mtls: opts.MTLSEnabled,
controlPlaneTrustDomain: td,
controlPlaneTrustDomain: cptd,
controlPlaneNamespace: opts.ControlPlaneNamespace,
},
}, nil
@ -185,69 +196,26 @@ func (p *provider) Run(ctx context.Context) error {
return errors.New("security provider already started")
}
// If the security source has not been initialized, then just wait to exit.
if p.sec.source == nil {
// If spiffe has not been initialized, then just wait to exit.
if p.sec.spiffe == nil {
close(p.readyCh)
<-ctx.Done()
return nil
}
if p.sec.source.requestFn == nil {
p.sec.source.requestFn = p.sec.source.requestFromSentry
log.Infof("Fetching initial identity certificate from %s", p.sec.source.sentryAddress)
}
initialCert, err := p.sec.source.renewIdentityCertificate(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve the initial identity certificate: %w", err)
}
mngr := concurrency.NewRunnerManager(func(ctx context.Context) error {
p.sec.source.startRotation(ctx, p.sec.source.renewIdentityCertificate, initialCert)
return nil
})
if len(p.trustAnchorsFile) > 0 {
caEvent := make(chan struct{})
fs, err := fswatcher.New(fswatcher.Options{
Targets: []string{filepath.Dir(p.trustAnchorsFile)},
Interval: &p.fswatcherInterval,
})
if err != nil {
return err
}
err = mngr.Add(
func(ctx context.Context) error {
log.Infof("Watching trust anchors file '%s' for changes", p.trustAnchorsFile)
return fs.Run(ctx, caEvent)
},
func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-caEvent:
log.Info("Trust anchors file changed, reloading trust anchors")
if uErr := p.sec.source.updateTrustAnchorFromFile(ctx, p.trustAnchorsFile); uErr != nil {
log.Errorf("Failed to read trust anchors file '%s': %v", p.trustAnchorsFile, uErr)
}
}
}
},
)
if err != nil {
return err
}
}
diagnostics.DefaultMonitoring.MTLSInitCompleted()
close(p.readyCh)
log.Infof("Security is initialized successfully")
return mngr.Run(ctx)
return concurrency.NewRunnerManager(
p.sec.spiffe.Run,
p.sec.trustAnchors.Run,
func(ctx context.Context) error {
if err := p.sec.spiffe.Ready(ctx); err != nil {
return err
}
close(p.readyCh)
diagnostics.DefaultMonitoring.MTLSInitCompleted()
<-ctx.Done()
return nil
},
).Run(ctx)
}
// Handler returns a ready handler from the security provider. Blocks until
@ -268,12 +236,12 @@ func (s *security) GRPCDialOptionMTLS(appID spiffeid.ID) grpc.DialOption {
// option. We don't check on `mtls` here as we still want to use mTLS with
// control plane peers when running in Kubernetes mode even if mTLS is
// disabled.
if s.source == nil {
if s.spiffe == nil {
return grpc.WithTransportCredentials(insecure.NewCredentials())
}
return grpc.WithTransportCredentials(
grpccredentials.MTLSClientCredentials(s.source, s.source, tlsconfig.AuthorizeID(appID)),
grpccredentials.MTLSClientCredentials(s.spiffe.SVIDSource(), s.trustAnchors, tlsconfig.AuthorizeID(appID)),
)
}
@ -287,7 +255,7 @@ func (s *security) GRPCServerOptionMTLS() grpc.ServerOption {
return grpc.Creds(
// TODO: It would be better if we could give a subset of trust domains in
// which this server authorizes.
grpccredentials.MTLSServerCredentials(s.source, s.source, tlsconfig.AuthorizeAny()),
grpccredentials.MTLSServerCredentials(s.spiffe.SVIDSource(), s.trustAnchors, tlsconfig.AuthorizeAny()),
)
}
@ -295,7 +263,7 @@ func (s *security) GRPCServerOptionMTLS() grpc.ServerOption {
// authentication of clients using the current trust anchors. Doesn't require
// clients to present a certificate.
func (s *security) GRPCServerOptionNoClientAuth() grpc.ServerOption {
return grpc.Creds(grpccredentials.TLSServerCredentials(s.source))
return grpc.Creds(grpccredentials.TLSServerCredentials(s.spiffe.SVIDSource()))
}
// GRPCDialOptionMTLSUnknownTrustDomain returns a gRPC dial option which
@ -317,22 +285,25 @@ func (s *security) GRPCDialOptionMTLSUnknownTrustDomain(ns, appID string) grpc.D
}
return grpc.WithTransportCredentials(
grpccredentials.MTLSClientCredentials(s.source, s.source, tlsconfig.AdaptMatcher(matcher)),
grpccredentials.MTLSClientCredentials(s.spiffe.SVIDSource(), s.trustAnchors, tlsconfig.AdaptMatcher(matcher)),
)
}
// CurrentTrustAnchors returns the current trust anchors for this Dapr
// installation.
func (s *security) CurrentTrustAnchors() ([]byte, error) {
if s.source == nil {
func (s *security) CurrentTrustAnchors(ctx context.Context) ([]byte, error) {
if s.spiffe == nil {
return nil, nil
}
ta, err := s.source.trustAnchors.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal trust anchors: %w", err)
}
return ta, nil
return s.trustAnchors.CurrentTrustAnchors(ctx)
}
// WatchTrustAnchors watches for changes to the trust domains and returns the
// PEM encoded trust domain roots.
// Returns when the given context is canceled.
func (s *security) WatchTrustAnchors(ctx context.Context, trustAnchors chan<- []byte) {
s.trustAnchors.Watch(ctx, trustAnchors)
}
// ControlPlaneTrustDomain returns the trust domain of the control plane.
@ -345,39 +316,11 @@ func (s *security) ControlPlaneNamespace() string {
return s.controlPlaneNamespace
}
// WatchTrustAnchors watches for changes to the trust domains and returns the
// PEM encoded trust domain roots.
// Returns when the given context is canceled.
func (s *security) WatchTrustAnchors(ctx context.Context, trustAnchors chan<- []byte) {
sub := make(chan struct{})
s.source.lock.Lock()
s.source.trustAnchorSubscribers = append(s.source.trustAnchorSubscribers, sub)
s.source.lock.Unlock()
for {
select {
case <-ctx.Done():
return
case <-sub:
caBundle, err := s.CurrentTrustAnchors()
if err != nil {
log.Errorf("failed to marshal trust anchors: %s", err)
continue
}
select {
case trustAnchors <- caBundle:
case <-ctx.Done():
}
}
}
}
// TLSServerConfigNoClientAuth returns a TLS server config which instruments
// using the current signed server certificate. Authorizes client certificate
// chains against the trust anchors.
func (s *security) TLSServerConfigNoClientAuth() *tls.Config {
return tlsconfig.TLSServerConfig(s.source)
return tlsconfig.TLSServerConfig(s.spiffe.SVIDSource())
}
// NetListenerID returns a mTLS net listener which instruments using the
@ -388,7 +331,7 @@ func (s *security) NetListenerID(lis net.Listener, id spiffeid.ID) net.Listener
return lis
}
return tls.NewListener(lis,
tlsconfig.MTLSServerConfig(s.source, s.source, tlsconfig.AuthorizeID(id)),
tlsconfig.MTLSServerConfig(s.spiffe.SVIDSource(), s.trustAnchors, tlsconfig.AuthorizeID(id)),
)
}
@ -401,7 +344,7 @@ func (s *security) NetDialerID(ctx context.Context, spiffeID spiffeid.ID, timeou
}
return (&tls.Dialer{
NetDialer: (&net.Dialer{Timeout: timeout, Cancel: ctx.Done()}),
Config: tlsconfig.MTLSClientConfig(s.source, s.source, tlsconfig.AuthorizeID(spiffeID)),
Config: tlsconfig.MTLSClientConfig(s.spiffe.SVIDSource(), s.trustAnchors, tlsconfig.AuthorizeID(spiffeID)),
}).Dial
}
@ -443,3 +386,11 @@ func SentryID(sentryTrustDomain spiffeid.TrustDomain, sentryNamespace string) (s
return sentryID, nil
}
func (s *security) WithSVIDContext(ctx context.Context) context.Context {
if s.spiffe == nil {
return ctx
}
return spiffecontext.With(ctx, s.spiffe)
}

View File

@ -90,20 +90,17 @@ func Test_Start(t *testing.T) {
require.NoError(t, os.WriteFile(tdFile, root1, 0o600))
p, err := New(context.Background(), Options{
TrustAnchorsFile: tdFile,
TrustAnchorsFile: &tdFile,
AppID: "test",
ControlPlaneTrustDomain: "test.example.com",
ControlPlaneNamespace: "default",
MTLSEnabled: true,
OverrideCertRequestSource: func(context.Context, []byte) ([]*x509.Certificate, error) {
OverrideCertRequestFn: func(context.Context, []byte) ([]*x509.Certificate, error) {
return []*x509.Certificate{workloadCert}, nil
},
})
require.NoError(t, err)
// Override the default of 500ms to 0 to speed up the test.
p.(*provider).fswatcherInterval = 0
ctx, cancel := context.WithCancel(context.Background())
providerStopped := make(chan struct{})
@ -123,7 +120,7 @@ func Test_Start(t *testing.T) {
sec, err := p.Handler(ctx)
require.NoError(t, err)
td, err := sec.CurrentTrustAnchors()
td, err := sec.CurrentTrustAnchors(ctx)
require.NoError(t, err)
assert.Equal(t, root1, td)
@ -134,22 +131,18 @@ func Test_Start(t *testing.T) {
sec.WatchTrustAnchors(ctx, caBundleCh)
}()
assert.Eventually(t, func() bool {
prov.sec.source.lock.RLock()
defer prov.sec.source.lock.RUnlock()
return len(prov.sec.source.trustAnchorSubscribers) > 0
assert.EventuallyWithT(t, func(c *assert.CollectT) {
curr, err := prov.sec.trustAnchors.CurrentTrustAnchors(ctx)
require.NoError(t, err)
assert.Equal(c, root1, curr)
}, time.Second, time.Millisecond)
curr, err := prov.sec.source.trustAnchors.Marshal()
require.NoError(t, err)
require.Equal(t, root1, curr)
assert.Eventually(t, func() bool {
// We put the write file inside this assert loop since we have to wait
// for the fsnotify go rountine to warm up.
require.NoError(t, os.WriteFile(tdFile, root2, 0o600))
curr, err := prov.sec.source.trustAnchors.Marshal()
curr, err := prov.sec.trustAnchors.CurrentTrustAnchors(ctx)
require.NoError(t, err)
return bytes.Equal(root2, curr)
}, time.Second*5, time.Millisecond*750)
@ -201,3 +194,37 @@ func TestCurrentNamespace(t *testing.T) {
assert.Equal(t, "foo", ns)
})
}
func Test_isControlPlaneService(t *testing.T) {
tests := map[string]struct {
name string
exp bool
}{
"operator should be control plane service": {
name: "dapr-operator",
exp: true,
},
"sentry should be control plane service": {
name: "dapr-sentry",
exp: true,
},
"placement should be control plane service": {
name: "dapr-placement",
exp: true,
},
"sidecar injector should be control plane service": {
name: "dapr-injector",
exp: true,
},
"not a control plane service": {
name: "my-app",
exp: false,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.exp, isControlPlaneService(test.name))
})
}
}

158
pkg/security/sentry.go Normal file
View File

@ -0,0 +1,158 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package security
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"time"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/spiffe/go-spiffe/v2/spiffegrpc/grpccredentials"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"google.golang.org/grpc"
"github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/modes"
sentryv1pb "github.com/dapr/dapr/pkg/proto/sentry/v1"
sentryToken "github.com/dapr/dapr/pkg/security/token"
cryptopem "github.com/dapr/kit/crypto/pem"
"github.com/dapr/kit/crypto/spiffe"
"github.com/dapr/kit/crypto/spiffe/trustanchors"
)
const (
sentrySignTimeout = time.Second * 3
sentryMaxRetries = 5
)
func newRequestFn(opts Options, trustAnchors trustanchors.Interface, cptd spiffeid.TrustDomain) (spiffe.RequestSVIDFn, error) {
sentryID, err := SentryID(cptd, opts.ControlPlaneNamespace)
if err != nil {
return nil, err
}
var trustDomain *string
ns := CurrentNamespace()
// If the service is a control plane service, set the trust domain to the
// control plane trust domain.
if isControlPlaneService(opts.AppID) && opts.ControlPlaneNamespace == ns {
trustDomain = &opts.ControlPlaneTrustDomain
}
// return injected identity, default id if not present
sentryIdentifier := os.Getenv("SENTRY_LOCAL_IDENTITY")
if sentryIdentifier == "" {
sentryIdentifier = opts.AppID
}
sentryAddress := opts.SentryAddress
sentryTokenFile := opts.SentryTokenFile
kubernetesMode := opts.Mode == modes.KubernetesMode
fn := func(ctx context.Context, csrDER []byte) ([]*x509.Certificate, error) {
unaryClientInterceptor := retry.UnaryClientInterceptor(
retry.WithMax(sentryMaxRetries),
retry.WithPerRetryTimeout(sentrySignTimeout),
)
if diagnostics.DefaultGRPCMonitoring.IsEnabled() {
unaryClientInterceptor = middleware.ChainUnaryClient(
unaryClientInterceptor,
diagnostics.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
}
conn, err := grpc.DialContext(ctx,
sentryAddress,
grpc.WithTransportCredentials(
grpccredentials.TLSClientCredentials(trustAnchors, tlsconfig.AuthorizeID(sentryID)),
),
grpc.WithUnaryInterceptor(unaryClientInterceptor),
grpc.WithReturnConnectionError(),
)
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sentry_conn")
return nil, fmt.Errorf("error establishing connection to sentry: %w", err)
}
defer conn.Close()
var token string
var tokenValidator sentryv1pb.SignCertificateRequest_TokenValidator
if sentryTokenFile != nil {
token, tokenValidator, err = sentryToken.GetSentryTokenFromFile(*sentryTokenFile)
} else {
token, tokenValidator, err = sentryToken.GetSentryToken(kubernetesMode)
}
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sentry_token")
return nil, fmt.Errorf("error obtaining token: %w", err)
}
req := &sentryv1pb.SignCertificateRequest{
CertificateSigningRequest: pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE REQUEST", Bytes: csrDER,
}),
Id: sentryIdentifier,
Token: token,
Namespace: ns,
TokenValidator: tokenValidator,
}
if trustDomain != nil {
req.TrustDomain = *trustDomain
}
resp, err := sentryv1pb.NewCAClient(conn).SignCertificate(ctx, req)
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sign")
return nil, fmt.Errorf("error from sentry SignCertificate: %w", err)
}
if err = resp.GetValidUntil().CheckValid(); err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("invalid_ts")
return nil, fmt.Errorf("error parsing ValidUntil: %w", err)
}
workloadcert, err := cryptopem.DecodePEMCertificates(resp.GetWorkloadCertificate())
if err != nil {
return nil, fmt.Errorf("error parsing newly signed certificate: %w", err)
}
return workloadcert, nil
}
return fn, nil
}
// isControlPlaneService returns true if the app ID corresponds to a Dapr
// control plane service.
func isControlPlaneService(id string) bool {
switch id {
case "dapr-operator",
"dapr-placement",
"dapr-injector",
"dapr-sentry":
return true
default:
return false
}
}

View File

@ -1,406 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package security
import (
"context"
"crypto"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"os"
"sync"
"time"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/spiffegrpc/grpccredentials"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/svid/x509svid"
"google.golang.org/grpc"
"k8s.io/utils/clock"
"github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/modes"
sentryv1pb "github.com/dapr/dapr/pkg/proto/sentry/v1"
secpem "github.com/dapr/dapr/pkg/security/pem"
sentryToken "github.com/dapr/dapr/pkg/security/token"
)
const (
sentrySignTimeout = time.Second * 3
sentryMaxRetries = 5
)
type renewFn func(context.Context) (*x509.Certificate, error)
// x509source implements the go-spiffe x509 source interface.
// We use a custom source as our SPIFFE ID's come from the Sentry API and not
// the SPIFFE Workload API (SPIRE).
type x509source struct {
currentSVID *x509svid.SVID
// sentryAddress is the network address of the sentry server.
sentryAddress string
// sentryID is the SPIFFE ID of the sentry server which is validated when
// request the identity document.
sentryID spiffeid.ID
// trustAnchors is the set of trusted root certificates of the dapr cluster.
trustAnchors *x509bundle.Bundle
// appID is the self selected APP ID of this Dapr instance.
appID string
// appNamespace is the dapr namespace this app belongs to.
appNamespace string
// kubernetesMode is true if Dapr is running in Kubernetes mode.
kubernetesMode bool
// requestFn is the function used to request the identity document from a
// remote server. Used for overriding requesting from Sentry.
requestFn RequestFn
// trustAnchorSubscribers is a list of channels to notify when the trust
// anchors are updated.
trustAnchorSubscribers []chan<- struct{}
// trustDomain is the optional trust domain which will be set when requesting
// the identity certificate. Used by control plane services to request for
// the control plane trust domain.
trustDomain *string
// sentryTokenFile is the optional file path to the sentry token file.
sentryTokenFile *string
lock sync.RWMutex
clock clock.Clock
}
func newX509Source(ctx context.Context, clock clock.Clock, cptd spiffeid.TrustDomain, opts Options) (*x509source, error) {
rootPEMs := opts.TrustAnchors
if len(rootPEMs) == 0 {
for {
_, err := os.Stat(opts.TrustAnchorsFile)
if err == nil {
break
}
if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
// Trust anchors file not be provided yet, wait.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-clock.After(time.Second):
log.Warnf("Trust anchors file '%s' not found, waiting...", opts.TrustAnchorsFile)
}
}
log.Infof("Trust anchors file '%s' found", opts.TrustAnchorsFile)
var err error
rootPEMs, err = os.ReadFile(opts.TrustAnchorsFile)
if err != nil {
return nil, fmt.Errorf("failed to read trust anchors file '%s': %w", opts.TrustAnchorsFile, err)
}
}
trustAnchorCerts, err := secpem.DecodePEMCertificates(rootPEMs)
if err != nil {
return nil, fmt.Errorf("failed to decode trust anchors: %w", err)
}
sentryID, err := SentryID(cptd, opts.ControlPlaneNamespace)
if err != nil {
return nil, err
}
var trustDomain *string
ns := CurrentNamespace()
// If the service is a control plane service, set the trust domain to the
// control plane trust domain.
if isControlPlaneService(opts.AppID) && opts.ControlPlaneNamespace == ns {
trustDomain = &opts.ControlPlaneTrustDomain
}
return &x509source{
sentryAddress: opts.SentryAddress,
sentryID: sentryID,
trustAnchors: x509bundle.FromX509Authorities(sentryID.TrustDomain(), trustAnchorCerts),
appID: opts.AppID,
appNamespace: ns,
trustDomain: trustDomain,
kubernetesMode: opts.Mode == modes.KubernetesMode,
requestFn: opts.OverrideCertRequestSource,
clock: clock,
sentryTokenFile: opts.SentryTokenFile,
}, nil
}
// GetX509SVID returns the current X.509 certificate identity as a SPIFFE SVID.
// Implements the go-spiffe x509 source interface.
func (x *x509source) GetX509SVID() (*x509svid.SVID, error) {
x.lock.RLock()
defer x.lock.RUnlock()
return x.currentSVID, nil
}
// GetX509BundleForTrustDomain returns the static Trust Bundle for the Dapr
// cluster.
// Dapr does not support trust bundles for multiple trust domains.
// Implements the go-spiffe x509 bundle source interface.
func (x *x509source) GetX509BundleForTrustDomain(_ spiffeid.TrustDomain) (*x509bundle.Bundle, error) {
x.lock.RLock()
defer x.lock.RUnlock()
return x.trustAnchors, nil
}
// startRotation starts up the manager responsible for renewing the workload
// certificate. Receives the initial certificate to calculate the next
// rotation time.
func (x *x509source) startRotation(ctx context.Context, fn renewFn, cert *x509.Certificate) {
defer log.Debug("stopping workload cert expiry watcher")
renewTime := renewalTime(cert.NotBefore, cert.NotAfter)
log.Infof("Starting workload cert expiry watcher; current cert expires on: %s, renewing at %s",
cert.NotAfter.String(), renewTime.String())
for {
select {
case <-x.clock.After(min(time.Minute, renewTime.Sub(x.clock.Now()))):
if x.clock.Now().Before(renewTime) {
continue
}
log.Infof("Renewing workload cert; current cert expires on: %s", cert.NotAfter.String())
newCert, err := fn(ctx)
if err != nil {
log.Errorf("Error renewing identity certificate, trying again in 10 seconds: %v", err)
select {
case <-x.clock.After(10 * time.Second):
continue
case <-ctx.Done():
return
}
}
cert = newCert
renewTime = renewalTime(cert.NotBefore, cert.NotAfter)
log.Infof("Successfully renewed workload cert; new cert expires on: %s", cert.NotAfter.String())
case <-ctx.Done():
return
}
}
}
// renewIdentityCertificate renews the identity certificate for the workload.
func (x *x509source) renewIdentityCertificate(ctx context.Context) (*x509.Certificate, error) {
csrDER, pk, err := generateCSRAndPrivateKey(x.appID)
if err != nil {
return nil, err
}
workloadcert, err := x.requestFn(ctx, csrDER)
if err != nil {
return nil, err
}
if len(workloadcert) == 0 {
return nil, errors.New("no certificates received from sentry")
}
spiffeID, err := x509svid.IDFromCert(workloadcert[0])
if err != nil {
return nil, fmt.Errorf("error parsing spiffe id from newly signed certificate: %w", err)
}
x.lock.Lock()
defer x.lock.Unlock()
x.currentSVID = &x509svid.SVID{
ID: spiffeID,
Certificates: workloadcert,
PrivateKey: pk,
}
return workloadcert[0], nil
}
func generateCSRAndPrivateKey(id string) ([]byte, crypto.Signer, error) {
if id == "" {
return nil, nil, errors.New("id must not be empty")
}
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
diagnostics.DefaultMonitoring.MTLSInitFailed("prikeygen")
return nil, nil, fmt.Errorf("failed to generate private key: %w", err)
}
csrDER, err := x509.CreateCertificateRequest(rand.Reader,
&x509.CertificateRequest{
Subject: pkix.Name{CommonName: id},
DNSNames: []string{id},
}, key)
if err != nil {
diagnostics.DefaultMonitoring.MTLSInitFailed("csr")
return nil, nil, fmt.Errorf("failed to create sidecar csr: %w", err)
}
return csrDER, key, nil
}
func (x *x509source) requestFromSentry(ctx context.Context, csrDER []byte) ([]*x509.Certificate, error) {
unaryClientInterceptor := retry.UnaryClientInterceptor(
retry.WithMax(sentryMaxRetries),
retry.WithPerRetryTimeout(sentrySignTimeout),
)
if diagnostics.DefaultGRPCMonitoring.IsEnabled() {
unaryClientInterceptor = middleware.ChainUnaryClient(
unaryClientInterceptor,
diagnostics.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
}
conn, err := grpc.DialContext(ctx,
x.sentryAddress,
grpc.WithTransportCredentials(
grpccredentials.MTLSClientCredentials(x, x, tlsconfig.AuthorizeID(x.sentryID)),
),
grpc.WithUnaryInterceptor(unaryClientInterceptor),
grpc.WithReturnConnectionError(),
)
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sentry_conn")
return nil, fmt.Errorf("error establishing connection to sentry: %w", err)
}
defer conn.Close()
var token string
var tokenValidator sentryv1pb.SignCertificateRequest_TokenValidator
if x.sentryTokenFile != nil {
token, tokenValidator, err = sentryToken.GetSentryTokenFromFile(*x.sentryTokenFile)
} else {
token, tokenValidator, err = sentryToken.GetSentryToken(x.kubernetesMode)
}
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sentry_token")
return nil, fmt.Errorf("error obtaining token: %w", err)
}
req := &sentryv1pb.SignCertificateRequest{
CertificateSigningRequest: pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE REQUEST", Bytes: csrDER,
}),
Id: getSentryIdentifier(x.appID),
Token: token,
Namespace: x.appNamespace,
TokenValidator: tokenValidator,
}
if x.trustDomain != nil {
req.TrustDomain = *x.trustDomain
}
resp, err := sentryv1pb.NewCAClient(conn).SignCertificate(ctx, req)
if err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sign")
return nil, fmt.Errorf("error from sentry SignCertificate: %w", err)
}
if err = resp.GetValidUntil().CheckValid(); err != nil {
diagnostics.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("invalid_ts")
return nil, fmt.Errorf("error parsing ValidUntil: %w", err)
}
workloadcert, err := secpem.DecodePEMCertificates(resp.GetWorkloadCertificate())
if err != nil {
return nil, fmt.Errorf("error parsing newly signed certificate: %w", err)
}
return workloadcert, nil
}
func (x *x509source) updateTrustAnchorFromFile(ctx context.Context, filepath string) error {
x.lock.RLock()
defer x.lock.RUnlock()
rootPEMs, err := os.ReadFile(filepath)
if err != nil {
return fmt.Errorf("failed to read trust anchors file '%s': %w", filepath, err)
}
trustAnchorCerts, err := secpem.DecodePEMCertificates(rootPEMs)
if err != nil {
return fmt.Errorf("failed to decode trust anchors: %w", err)
}
x.trustAnchors.SetX509Authorities(trustAnchorCerts)
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(len(x.trustAnchorSubscribers))
for _, ch := range x.trustAnchorSubscribers {
go func(chi chan<- struct{}) {
defer wg.Done()
select {
case chi <- struct{}{}:
case <-ctx.Done():
}
}(ch)
}
return nil
}
// renewalTime is 70% through the certificate validity period.
func renewalTime(notBefore, notAfter time.Time) time.Time {
return notBefore.Add(notAfter.Sub(notBefore) * 7 / 10)
}
// isControlPlaneService returns true if the app ID corresponds to a Dapr
// control plane service.
func isControlPlaneService(id string) bool {
switch id {
case "dapr-operator",
"dapr-placement",
"dapr-injector",
"dapr-sentry":
return true
default:
return false
}
}
func getSentryIdentifier(appID string) string {
// return injected identity, default id if not present
localID := os.Getenv("SENTRY_LOCAL_IDENTITY")
if localID != "" {
return localID
}
return appID
}

View File

@ -1,241 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package security
import (
"context"
"crypto/x509"
"sync/atomic"
"testing"
"time"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/svid/x509svid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clocktesting "k8s.io/utils/clock/testing"
)
func TestValidator(t *testing.T) {
var _ x509svid.Source = &x509source{}
var _ x509bundle.Source = &x509source{}
}
func TestStartRotation(t *testing.T) {
t.Run("if context is cancelled, expect return", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
clock := clocktesting.NewFakeClock(time.Now())
x := &x509source{clock: clock}
ch := make(chan struct{})
go func() {
defer close(ch)
x.startRotation(ctx, func(context.Context) (*x509.Certificate, error) {
t.Error("unexpected call")
return nil, nil
}, &x509.Certificate{NotBefore: time.Now(), NotAfter: time.Now().Add(time.Hour)})
}()
select {
case <-time.After(time.Second):
t.Fatal("expected return")
case <-ch:
}
})
t.Run("if expiry is in the past, expect renewal to be called", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
clock := clocktesting.NewFakeClock(time.Now())
x := &x509source{clock: clock}
ch, renewCalled := make(chan struct{}), make(chan struct{})
go func() {
defer close(ch)
x.startRotation(ctx, func(context.Context) (*x509.Certificate, error) {
close(renewCalled)
return &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Hour)}, nil
}, &x509.Certificate{NotBefore: clock.Now().Add(-2 * time.Hour), NotAfter: clock.Now().Add(-time.Hour)})
}()
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond)
// Step 0 triggers waiters but doesn't move the time forward.
clock.Step(0)
select {
case <-time.After(time.Second):
t.Fatal("expected renewal to be called")
case <-renewCalled:
cancel()
}
select {
case <-time.After(time.Second):
t.Fatal("expected return")
case <-ch:
}
})
t.Run("if expiry is 1 second in the future, expect renew to be called before 3 seconds", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
clock := clocktesting.NewFakeClock(time.Now())
x := &x509source{clock: clock}
ch, renewCalled := make(chan struct{}), make(chan struct{})
go func() {
defer close(ch)
x.startRotation(ctx, func(context.Context) (*x509.Certificate, error) {
close(renewCalled)
return &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Hour)}, nil
}, &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Second)})
}()
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond)
clock.Step(time.Second)
select {
case <-time.After(3 * time.Second):
t.Fatal("expected renewal to be called")
case <-renewCalled:
cancel()
}
select {
case <-time.After(time.Second):
t.Fatal("expected return")
case <-ch:
}
})
t.Run("if expiry is 5 seconds in the future, don't expect renew to be called for first 2 seconds", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
clock := clocktesting.NewFakeClock(time.Now())
x := &x509source{clock: clock}
ch := make(chan struct{})
go func() {
defer close(ch)
x.startRotation(ctx, func(context.Context) (*x509.Certificate, error) {
assert.Fail(t, "unexpected renewal to be called")
return nil, nil
}, &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Second * 5)})
}()
clock.Step(time.Second * 2)
cancel()
select {
case <-time.After(time.Second):
t.Fatal("expected return")
case <-ch:
}
})
t.Run("renewed certificate should be renewed again when appropriate", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
clock := clocktesting.NewFakeClock(time.Now())
x := &x509source{clock: clock}
ch := make(chan struct{})
var i atomic.Int32
go func() {
defer close(ch)
x.startRotation(ctx, func(context.Context) (*x509.Certificate, error) {
i.Add(1)
return &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Second / 4)}, nil
}, &x509.Certificate{NotBefore: clock.Now(), NotAfter: clock.Now().Add(time.Second / 4)})
}()
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond)
require.Eventually(t, func() bool {
clock.Step(time.Second / 4)
return i.Load() == 3
}, time.Second, time.Millisecond)
cancel()
select {
case <-time.After(time.Second):
t.Fatal("expected return")
case <-ch:
}
})
}
func Test_renewalTime(t *testing.T) {
now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
tests := map[string]struct {
notBefore time.Time
notAfter time.Time
expected time.Time
}{
"not before is in the future": {
notBefore: now.Add(time.Hour),
notAfter: now.Add(2 * time.Hour),
expected: now.Add(time.Hour * 17 / 10),
},
"not before is in the past": {
notBefore: now.Add(-time.Hour),
notAfter: now.Add(time.Hour),
expected: now.Add(time.Hour * 4 / 10),
},
"not before is now": {
notBefore: now,
notAfter: now.Add(time.Hour),
expected: now.Add(time.Hour * 7 / 10),
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
actual := renewalTime(test.notBefore, test.notAfter)
assert.Equal(t, test.expected, actual)
})
}
}
func Test_isControlPlaneService(t *testing.T) {
tests := map[string]struct {
name string
exp bool
}{
"operator should be control plane service": {
name: "dapr-operator",
exp: true,
},
"sentry should be control plane service": {
name: "dapr-sentry",
exp: true,
},
"placement should be control plane service": {
name: "dapr-placement",
exp: true,
},
"sidecar injector should be control plane service": {
name: "dapr-injector",
exp: true,
},
"not a control plane service": {
name: "my-app",
exp: false,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.exp, isControlPlaneService(test.name))
})
}
}

View File

@ -84,7 +84,7 @@ func (s *sentry) Start(parentCtx context.Context) error {
TrustAnchors: camngr.TrustAnchors(),
MTLSEnabled: true,
// Override the request source to our in memory CA since _we_ are sentry!
OverrideCertRequestSource: func(ctx context.Context, csrDER []byte) ([]*x509.Certificate, error) {
OverrideCertRequestFn: func(ctx context.Context, csrDER []byte) ([]*x509.Certificate, error) {
csr, csrErr := x509.ParseCertificateRequest(csrDER)
if csrErr != nil {
monitoring.ServerCertIssueFailed("invalid_csr")

View File

@ -26,8 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/pkg/security/pem"
"github.com/dapr/dapr/pkg/sentry/config"
"github.com/dapr/kit/crypto/pem"
)
func TestNew(t *testing.T) {

View File

@ -18,7 +18,7 @@ import (
"errors"
"fmt"
"github.com/dapr/dapr/pkg/security/pem"
"github.com/dapr/kit/crypto/pem"
)
// verifyBundle verifies issuer certificate key pair, and trust anchor set.

View File

@ -27,10 +27,10 @@ import (
sentryv1pb "github.com/dapr/dapr/pkg/proto/sentry/v1"
"github.com/dapr/dapr/pkg/security"
secpem "github.com/dapr/dapr/pkg/security/pem"
"github.com/dapr/dapr/pkg/sentry/monitoring"
"github.com/dapr/dapr/pkg/sentry/server/ca"
"github.com/dapr/dapr/pkg/sentry/server/validator"
secpem "github.com/dapr/kit/crypto/pem"
"github.com/dapr/kit/logger"
)

View File

@ -35,7 +35,7 @@ import (
"github.com/dapr/dapr/tests/e2e/utils"
kube "github.com/dapr/dapr/tests/platforms/kubernetes"
"github.com/dapr/dapr/tests/runner"
"github.com/dapr/dapr/tests/util"
cryptotest "github.com/dapr/kit/crypto/test"
kitUtils "github.com/dapr/kit/utils"
apiv1 "k8s.io/api/core/v1"
)
@ -81,7 +81,7 @@ func TestMain(m *testing.M) {
utils.SetupLogs("service_invocation")
utils.InitHTTPClient(false)
pki, err := util.GenPKIError(util.PKIOptions{
pki, err := cryptotest.GenPKIError(cryptotest.PKIOptions{
LeafDNS: "service-invocation-external",
})
if err != nil {

View File

@ -33,10 +33,10 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"sigs.k8s.io/yaml"
securitypem "github.com/dapr/dapr/pkg/security/pem"
"github.com/dapr/dapr/pkg/sentry/server/ca"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/framework/process/kubernetes/informer"
cryptopem "github.com/dapr/kit/crypto/pem"
)
const (
@ -130,9 +130,9 @@ func New(t *testing.T, fopts ...Option) *Kubernetes {
leafCert, err = x509.ParseCertificate(leafCertDER)
require.NoError(t, err)
chainPEM, err := securitypem.EncodeX509Chain(append([]*x509.Certificate{leafCert}, bundle.IssChain...))
chainPEM, err := cryptopem.EncodeX509Chain(append([]*x509.Certificate{leafCert}, bundle.IssChain...))
require.NoError(t, err)
keyPEM, err := securitypem.EncodePrivateKey(leafpk)
keyPEM, err := cryptopem.EncodePrivateKey(leafpk)
require.NoError(t, err)
return &Kubernetes{

View File

@ -35,6 +35,7 @@ import (
"github.com/dapr/dapr/tests/integration/framework/process/ports"
"github.com/dapr/dapr/tests/integration/framework/process/sentry"
"github.com/dapr/dapr/tests/integration/framework/util"
"github.com/dapr/kit/ptr"
)
type Operator struct {
@ -147,7 +148,7 @@ func (o *Operator) Dial(t *testing.T, ctx context.Context, sentry *sentry.Sentry
SentryAddress: "localhost:" + strconv.Itoa(sentry.Port()),
ControlPlaneTrustDomain: "integration.test.dapr.io",
ControlPlaneNamespace: o.namespace,
TrustAnchorsFile: sentry.TrustAnchorsFile(t),
TrustAnchorsFile: ptr.Of(sentry.TrustAnchorsFile(t)),
AppID: appID,
Mode: modes.StandaloneMode,
MTLSEnabled: true,

View File

@ -32,7 +32,7 @@ import (
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/framework/util"
"github.com/dapr/dapr/tests/integration/suite"
testsutil "github.com/dapr/dapr/tests/util"
cryptotest "github.com/dapr/kit/crypto/test"
)
func init() {
@ -46,8 +46,8 @@ type httpendpoints struct {
}
func (h *httpendpoints) Setup(t *testing.T) []framework.Option {
pki1 := testsutil.GenPKI(t, testsutil.PKIOptions{LeafDNS: "localhost"})
pki2 := testsutil.GenPKI(t, testsutil.PKIOptions{LeafDNS: "localhost"})
pki1 := cryptotest.GenPKI(t, cryptotest.PKIOptions{LeafDNS: "localhost"})
pki2 := cryptotest.GenPKI(t, cryptotest.PKIOptions{LeafDNS: "localhost"})
newHTTPServer := func() *prochttp.HTTP {
handler := http.NewServeMux()

View File

@ -27,12 +27,12 @@ import (
"github.com/stretchr/testify/require"
sentrypbv1 "github.com/dapr/dapr/pkg/proto/sentry/v1"
secpem "github.com/dapr/dapr/pkg/security/pem"
"github.com/dapr/dapr/pkg/sentry/server/ca"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/sentry"
"github.com/dapr/dapr/tests/integration/suite"
secpem "github.com/dapr/kit/crypto/pem"
)
func init() {

View File

@ -1,239 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"crypto"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"net/url"
"testing"
"time"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/spiffegrpc/grpccredentials"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/svid/x509svid"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/peer"
)
type PKIOptions struct {
LeafDNS string
LeafID spiffeid.ID
ClientDNS string
ClientID spiffeid.ID
}
type PKI struct {
RootCertPEM []byte
RootCert *x509.Certificate
LeafCert *x509.Certificate
LeafCertPEM []byte
LeafPKPEM []byte
LeafPK crypto.Signer
ClientCertPEM []byte
ClientCert *x509.Certificate
ClientPKPEM []byte
ClientPK crypto.Signer
leafID spiffeid.ID
clientID spiffeid.ID
}
func GenPKI(t *testing.T, opts PKIOptions) PKI {
t.Helper()
pki, err := GenPKIError(opts)
require.NoError(t, err)
return pki
}
func GenPKIError(opts PKIOptions) (PKI, error) {
rootPK, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return PKI{}, err
}
rootCert := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "Dapr Test Root CA"},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
KeyUsage: x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
rootCertBytes, err := x509.CreateCertificate(rand.Reader, rootCert, rootCert, &rootPK.PublicKey, rootPK)
if err != nil {
return PKI{}, err
}
rootCert, err = x509.ParseCertificate(rootCertBytes)
if err != nil {
return PKI{}, err
}
rootCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: rootCertBytes})
leafCertPEM, leafPKPEM, leafCert, leafPK, err := genLeafCert(rootPK, rootCert, opts.LeafID, opts.LeafDNS)
if err != nil {
return PKI{}, err
}
clientCertPEM, clientPKPEM, clientCert, clientPK, err := genLeafCert(rootPK, rootCert, opts.ClientID, opts.ClientDNS)
if err != nil {
return PKI{}, err
}
return PKI{
RootCert: rootCert,
RootCertPEM: rootCertPEM,
LeafCertPEM: leafCertPEM,
LeafPKPEM: leafPKPEM,
LeafCert: leafCert,
LeafPK: leafPK,
ClientCertPEM: clientCertPEM,
ClientPKPEM: clientPKPEM,
ClientCert: clientCert,
ClientPK: clientPK,
leafID: opts.LeafID,
clientID: opts.ClientID,
}, nil
}
func (p PKI) ClientGRPCCtx(t *testing.T) context.Context {
t.Helper()
bundle := x509bundle.New(spiffeid.RequireTrustDomainFromString("example.org"))
bundle.AddX509Authority(p.RootCert)
serverSVID := &mockSVID{
bundle: bundle,
svid: &x509svid.SVID{
ID: p.leafID,
Certificates: []*x509.Certificate{p.LeafCert},
PrivateKey: p.LeafPK,
},
}
clientSVID := &mockSVID{
bundle: bundle,
svid: &x509svid.SVID{
ID: p.clientID,
Certificates: []*x509.Certificate{p.ClientCert},
PrivateKey: p.ClientPK,
},
}
server := grpc.NewServer(grpc.Creds(grpccredentials.MTLSServerCredentials(serverSVID, serverSVID, tlsconfig.AuthorizeAny())))
gs := new(greeterServer)
helloworld.RegisterGreeterServer(server, gs)
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
server.Serve(lis)
}()
conn, err := grpc.DialContext(context.Background(), lis.Addr().String(),
grpc.WithTransportCredentials(grpccredentials.MTLSClientCredentials(clientSVID, clientSVID, tlsconfig.AuthorizeAny())),
)
require.NoError(t, err)
_, err = helloworld.NewGreeterClient(conn).SayHello(context.Background(), new(helloworld.HelloRequest))
require.NoError(t, err)
lis.Close()
server.Stop()
return gs.ctx
}
func genLeafCert(rootPK *ecdsa.PrivateKey, rootCert *x509.Certificate, id spiffeid.ID, dns string) ([]byte, []byte, *x509.Certificate, crypto.Signer, error) {
pk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, nil, nil, nil, err
}
pkBytes, err := x509.MarshalPKCS8PrivateKey(pk)
if err != nil {
return nil, nil, nil, nil, err
}
cert := &x509.Certificate{
SerialNumber: big.NewInt(1),
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
ExtKeyUsage: []x509.ExtKeyUsage{
x509.ExtKeyUsageServerAuth,
x509.ExtKeyUsageClientAuth,
},
}
if len(dns) > 0 {
cert.DNSNames = []string{dns}
}
if !id.IsZero() {
cert.URIs = []*url.URL{id.URL()}
}
certBytes, err := x509.CreateCertificate(rand.Reader, cert, rootCert, &pk.PublicKey, rootPK)
if err != nil {
return nil, nil, nil, nil, err
}
cert, err = x509.ParseCertificate(certBytes)
if err != nil {
return nil, nil, nil, nil, err
}
pkPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkBytes})
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certBytes})
return certPEM, pkPEM, cert, pk, nil
}
type mockSVID struct {
svid *x509svid.SVID
bundle *x509bundle.Bundle
}
func (m *mockSVID) GetX509BundleForTrustDomain(_ spiffeid.TrustDomain) (*x509bundle.Bundle, error) {
return m.bundle, nil
}
func (m *mockSVID) GetX509SVID() (*x509svid.SVID, error) {
return m.svid, nil
}
type greeterServer struct {
helloworld.UnimplementedGreeterServer
ctx context.Context
}
func (s *greeterServer) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
p, _ := peer.FromContext(ctx)
s.ctx = peer.NewContext(context.Background(), p)
return new(helloworld.HelloReply), nil
}