From 539c7729487c097eca49bbc6060c76a80ccce9b9 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 19 Aug 2022 14:35:18 +0800 Subject: [PATCH] feat: manager implements issue certificate grpc (#1577) Signed-off-by: Jim Ma --- go.mod | 2 +- go.sum | 2 + manager/manager.go | 6 +- manager/rpcserver/cert.go | 116 +++++++++++++++++++++ manager/rpcserver/cert_test.go | 174 +++++++++++++++++++++++++++++++ manager/rpcserver/rpcserver.go | 55 ++++++++-- pkg/rpc/manager/server/server.go | 2 + 7 files changed, 349 insertions(+), 8 deletions(-) create mode 100644 manager/rpcserver/cert.go create mode 100644 manager/rpcserver/cert_test.go diff --git a/go.mod b/go.mod index dab58e79e..4d2f60524 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.18 require ( - d7y.io/api v1.0.9 + d7y.io/api v1.1.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index c3bff36ab..ec2dec084 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= d7y.io/api v1.0.9 h1:KrKk1ryT771msyBxRXEX9B7abzh6h3yHyVPuSeu3Ypo= d7y.io/api v1.0.9/go.mod h1:GFnWPZFe4DUW70aOQikRZF0pvXpbUwAsGSCAZFFitPo= +d7y.io/api v1.1.0 h1:+PkkqokEe+82Fjimgcrq0aVp6irGOGyYRXd9ZZ12aKk= +d7y.io/api v1.1.0/go.mod h1:GFnWPZFe4DUW70aOQikRZF0pvXpbUwAsGSCAZFFitPo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= diff --git a/manager/manager.go b/manager/manager.go index 328fb004c..1b8d50387 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -154,7 +154,11 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { } // Initialize GRPC server - grpcServer := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage) + grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage, "", "") + if err != nil { + return nil, err + } + s.grpcServer = grpcServer // Initialize prometheus diff --git a/manager/rpcserver/cert.go b/manager/rpcserver/cert.go new file mode 100644 index 000000000..554121b19 --- /dev/null +++ b/manager/rpcserver/cert.go @@ -0,0 +1,116 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpcserver + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/x509" + "encoding/pem" + "fmt" + "math/big" + "net" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + + securityv1 "d7y.io/api/pkg/apis/security/v1" +) + +func (s *Server) IssueCertificate(ctx context.Context, request *securityv1.CertificateRequest) (*securityv1.CertificateResponse, error) { + if s.ca == nil { + return nil, status.Errorf(codes.Unavailable, "ca is missing for this manager instance") + } + + var ( + ip string + err error + ) + + p, ok := peer.FromContext(ctx) + if !ok { + return nil, status.Errorf(codes.InvalidArgument, "invalid grpc peer info") + } + + if addr, ok := p.Addr.(*net.TCPAddr); ok { + ip = addr.IP.String() + } else { + ip, _, err = net.SplitHostPort(p.Addr.String()) + if err != nil { + return nil, err + } + } + + // decode csr pem + block, _ := pem.Decode([]byte(request.Csr)) + if block == nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid csr format") + } + + // parse csr + csr, err := x509.ParseCertificateRequest(block.Bytes) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid csr format: %s", err.Error()) + } + + // check csr signature + // TODO check csr common name and so on + if err = csr.CheckSignature(); err != nil { + return nil, err + } + + serial, err := rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(159), nil)) + if err != nil { + return nil, err + } + + // check certificate duration + now := time.Now() + duration := time.Duration(request.ValidityDuration) * time.Second + if duration == 0 { + duration = time.Hour + } + + template := x509.Certificate{ + SerialNumber: serial, + Subject: csr.Subject, + IPAddresses: []net.IP{net.ParseIP(ip)}, // only valid for peer ip + NotBefore: now.Add(-10 * time.Minute).UTC(), + NotAfter: now.Add(duration).UTC(), + BasicConstraintsValid: true, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageDataEncipherment | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + } + + certificate, err := x509.CreateCertificate(rand.Reader, &template, s.ca.Leaf, csr.PublicKey, s.ca.PrivateKey) + if err != nil { + return nil, fmt.Errorf("failed to generate certificate, error: %s", err) + } + + // encode into PEM format + var certPEM bytes.Buffer + if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: certificate}); err != nil { + return nil, err + } + + return &securityv1.CertificateResponse{ + CertificateChain: append([]string{certPEM.String()}, s.certChain...), + }, nil +} diff --git a/manager/rpcserver/cert_test.go b/manager/rpcserver/cert_test.go new file mode 100644 index 000000000..33f8e24be --- /dev/null +++ b/manager/rpcserver/cert_test.go @@ -0,0 +1,174 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpcserver + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net" + "testing" + "time" + + "github.com/go-redis/redis/v8" + testifyassert "github.com/stretchr/testify/assert" + testifyrequire "github.com/stretchr/testify/require" + "google.golang.org/grpc/peer" + "gorm.io/gorm" + + securityv1 "d7y.io/api/pkg/apis/security/v1" + + "d7y.io/dragonfly/v2/manager/database" +) + +func TestIssueCertificate(t *testing.T) { + assert := testifyassert.New(t) + require := testifyrequire.New(t) + + caCert, caKey := genCA() + + testCases := []struct { + name string + peerIP string + }{ + { + name: "ipv4", + peerIP: "1.1.1.1", + }, + { + name: "ipv6", + peerIP: "1::1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + template := &x509.CertificateRequest{ + Subject: pkix.Name{ + Country: []string{"China"}, + Organization: []string{"Dragonfly"}, + OrganizationalUnit: []string{"Development"}, + }, + } + + pk, err := rsa.GenerateKey(rand.Reader, 4096) + require.Nilf(err, "GenerateKey should be ok") + + csr, err := x509.CreateCertificateRequest(rand.Reader, template, pk) + require.Nilf(err, "CreateCertificateRequest should be ok") + + var csrPEM bytes.Buffer + err = pem.Encode(&csrPEM, &pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csr}) + require.Nilf(err, "pem.Encode should be ok") + + server, err := newServer(nil, + &database.Database{ + DB: &gorm.DB{}, + RDB: &redis.Client{}, + }, + nil, nil, nil, nil, caCert, caKey) + + require.Nilf(err, "newServer should be ok") + + ctx := peer.NewContext( + context.Background(), + &peer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP(tc.peerIP), + Port: 65008, + }, + }) + + response, err := server.IssueCertificate( + ctx, + &securityv1.CertificateRequest{ + Csr: csrPEM.String(), + ValidityDuration: 0, + }) + + assert.Nilf(err, "IssueCertificate should be ok") + assert.NotNilf(response, "IssueCertificate should not be nil") + assert.Equal(len(response.CertificateChain), len(server.certChain)+1) + + cert := readCert(response.CertificateChain[0]) + assert.Equal(len(cert.IPAddresses), 1) + assert.True(cert.IPAddresses[0].Equal(net.ParseIP(tc.peerIP))) + + assert.Equal(cert.KeyUsage, x509.KeyUsageDigitalSignature|x509.KeyUsageDataEncipherment|x509.KeyUsageKeyEncipherment) + assert.Equal(cert.ExtKeyUsage, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}) + }) + } +} + +func genCA() (cert, key string) { + pk, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + panic(err) + } + + ca := &x509.Certificate{ + SerialNumber: big.NewInt(2022), + Subject: pkix.Name{ + Country: []string{"China"}, + Organization: []string{"Dragonfly"}, + OrganizationalUnit: []string{"Development"}, + Locality: []string{"Hangzhou"}, + Province: []string{"Zhejiang"}, + }, + NotBefore: time.Now().Add(-10 * time.Minute).UTC(), + NotAfter: time.Now().Add(time.Hour).UTC(), + BasicConstraintsValid: true, + IsCA: true, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + } + + caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &pk.PublicKey, pk) + if err != nil { + panic(err) + } + + var ( + caCertPEM bytes.Buffer + caPrivateKeyPEM bytes.Buffer + ) + + if err = pem.Encode(&caCertPEM, &pem.Block{Type: "CERTIFICATE", Bytes: caBytes}); err != nil { + panic(err) + } + + if err = pem.Encode(&caPrivateKeyPEM, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)}); err != nil { + panic(err) + } + + key, cert = caPrivateKeyPEM.String(), caCertPEM.String() + return +} + +func readCert(certPEM string) *x509.Certificate { + p, _ := pem.Decode([]byte(certPEM)) + cert, err := x509.ParseCertificate(p.Bytes) + if err != nil { + panic(err) + } + return cert +} diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index 9d8964ca6..e851ee4a5 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -17,7 +17,11 @@ package rpcserver import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" "errors" "io" "time" @@ -28,8 +32,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" "gorm.io/gorm" managerv1 "d7y.io/api/pkg/apis/manager/v1" @@ -62,14 +66,29 @@ type Server struct { objectStorage objectstorage.ObjectStorage // Object storage configuration. objectStorageConfig *config.ObjectStorageConfig + // ca certificate to sign certificates + ca *tls.Certificate + // ca certificate chain + certChain []string } // New returns a new manager server from the given options. func New( cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, - objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, opts ...grpc.ServerOption, -) *grpc.Server { - return managerserver.New(&Server{ + objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, + caCertPEM string, caPrivateKeyPEM string, opts ...grpc.ServerOption, +) (*grpc.Server, error) { + server, err := newServer(cfg, database, cache, searcher, objectStorage, objectStorageConfig, caCertPEM, caPrivateKeyPEM) + if err != nil { + return nil, err + } + return managerserver.New(server, opts...), nil +} + +func newServer(cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, + objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, + caCertPEM string, caPrivateKeyPEM string) (*Server, error) { + s := &Server{ config: cfg, db: database.DB, rdb: database.RDB, @@ -77,7 +96,31 @@ func New( searcher: searcher, objectStorage: objectStorage, objectStorageConfig: objectStorageConfig, - }, opts...) + } + + if caCertPEM != "" || caPrivateKeyPEM != "" { + ca, err := tls.X509KeyPair([]byte(caCertPEM), []byte(caPrivateKeyPEM)) + if err != nil { + return nil, err + } + // load x509 cert + ca.Leaf, err = x509.ParseCertificate(ca.Certificate[0]) + if err != nil { + return nil, err + } + + s.ca = &ca + + // parse all cert + for _, cert := range ca.Certificate { + var certChainPEM bytes.Buffer + if err = pem.Encode(&certChainPEM, &pem.Block{Type: "CERTIFICATE", Bytes: cert}); err != nil { + return nil, err + } + s.certChain = append(s.certChain, certChainPEM.String()) + } + } + return s, nil } // Get SeedPeer and SeedPeer cluster configuration. diff --git a/pkg/rpc/manager/server/server.go b/pkg/rpc/manager/server/server.go index 25990d521..ba27f8bfe 100644 --- a/pkg/rpc/manager/server/server.go +++ b/pkg/rpc/manager/server/server.go @@ -29,6 +29,7 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" managerv1 "d7y.io/api/pkg/apis/manager/v1" + securityv1 "d7y.io/api/pkg/apis/security/v1" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" @@ -67,6 +68,7 @@ func New(svr managerv1.ManagerServer, opts ...grpc.ServerOption) *grpc.Server { // Register servers on grpc server. managerv1.RegisterManagerServer(grpcServer, svr) + securityv1.RegisterCertificateServiceServer(grpcServer, svr.(securityv1.CertificateServiceServer)) // Register health on grpc server. healthpb.RegisterHealthServer(grpcServer, health.NewServer())