feat: manager implements issue certificate grpc (#1577)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2022-08-19 14:35:18 +08:00 committed by Gaius
parent 9e30632a39
commit 539c772948
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
7 changed files with 349 additions and 8 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.18 go 1.18
require ( require (
d7y.io/api v1.0.9 d7y.io/api v1.1.0
github.com/RichardKnop/machinery v1.10.6 github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0 github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0 github.com/VividCortex/mysqlerr v1.0.0

2
go.sum
View File

@ -71,6 +71,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
d7y.io/api v1.0.9 h1:KrKk1ryT771msyBxRXEX9B7abzh6h3yHyVPuSeu3Ypo= d7y.io/api v1.0.9 h1:KrKk1ryT771msyBxRXEX9B7abzh6h3yHyVPuSeu3Ypo=
d7y.io/api v1.0.9/go.mod h1:GFnWPZFe4DUW70aOQikRZF0pvXpbUwAsGSCAZFFitPo= d7y.io/api v1.0.9/go.mod h1:GFnWPZFe4DUW70aOQikRZF0pvXpbUwAsGSCAZFFitPo=
d7y.io/api v1.1.0 h1:+PkkqokEe+82Fjimgcrq0aVp6irGOGyYRXd9ZZ12aKk=
d7y.io/api v1.1.0/go.mod h1:GFnWPZFe4DUW70aOQikRZF0pvXpbUwAsGSCAZFFitPo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw=

View File

@ -154,7 +154,11 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
} }
// Initialize GRPC server // Initialize GRPC server
grpcServer := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage) grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage, "", "")
if err != nil {
return nil, err
}
s.grpcServer = grpcServer s.grpcServer = grpcServer
// Initialize prometheus // Initialize prometheus

116
manager/rpcserver/cert.go Normal file
View File

@ -0,0 +1,116 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rpcserver
import (
"bytes"
"context"
"crypto/rand"
"crypto/x509"
"encoding/pem"
"fmt"
"math/big"
"net"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
securityv1 "d7y.io/api/pkg/apis/security/v1"
)
func (s *Server) IssueCertificate(ctx context.Context, request *securityv1.CertificateRequest) (*securityv1.CertificateResponse, error) {
if s.ca == nil {
return nil, status.Errorf(codes.Unavailable, "ca is missing for this manager instance")
}
var (
ip string
err error
)
p, ok := peer.FromContext(ctx)
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "invalid grpc peer info")
}
if addr, ok := p.Addr.(*net.TCPAddr); ok {
ip = addr.IP.String()
} else {
ip, _, err = net.SplitHostPort(p.Addr.String())
if err != nil {
return nil, err
}
}
// decode csr pem
block, _ := pem.Decode([]byte(request.Csr))
if block == nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid csr format")
}
// parse csr
csr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid csr format: %s", err.Error())
}
// check csr signature
// TODO check csr common name and so on
if err = csr.CheckSignature(); err != nil {
return nil, err
}
serial, err := rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(159), nil))
if err != nil {
return nil, err
}
// check certificate duration
now := time.Now()
duration := time.Duration(request.ValidityDuration) * time.Second
if duration == 0 {
duration = time.Hour
}
template := x509.Certificate{
SerialNumber: serial,
Subject: csr.Subject,
IPAddresses: []net.IP{net.ParseIP(ip)}, // only valid for peer ip
NotBefore: now.Add(-10 * time.Minute).UTC(),
NotAfter: now.Add(duration).UTC(),
BasicConstraintsValid: true,
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageDataEncipherment | x509.KeyUsageKeyEncipherment,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
}
certificate, err := x509.CreateCertificate(rand.Reader, &template, s.ca.Leaf, csr.PublicKey, s.ca.PrivateKey)
if err != nil {
return nil, fmt.Errorf("failed to generate certificate, error: %s", err)
}
// encode into PEM format
var certPEM bytes.Buffer
if err = pem.Encode(&certPEM, &pem.Block{Type: "CERTIFICATE", Bytes: certificate}); err != nil {
return nil, err
}
return &securityv1.CertificateResponse{
CertificateChain: append([]string{certPEM.String()}, s.certChain...),
}, nil
}

View File

@ -0,0 +1,174 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rpcserver
import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"testing"
"time"
"github.com/go-redis/redis/v8"
testifyassert "github.com/stretchr/testify/assert"
testifyrequire "github.com/stretchr/testify/require"
"google.golang.org/grpc/peer"
"gorm.io/gorm"
securityv1 "d7y.io/api/pkg/apis/security/v1"
"d7y.io/dragonfly/v2/manager/database"
)
func TestIssueCertificate(t *testing.T) {
assert := testifyassert.New(t)
require := testifyrequire.New(t)
caCert, caKey := genCA()
testCases := []struct {
name string
peerIP string
}{
{
name: "ipv4",
peerIP: "1.1.1.1",
},
{
name: "ipv6",
peerIP: "1::1",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
template := &x509.CertificateRequest{
Subject: pkix.Name{
Country: []string{"China"},
Organization: []string{"Dragonfly"},
OrganizationalUnit: []string{"Development"},
},
}
pk, err := rsa.GenerateKey(rand.Reader, 4096)
require.Nilf(err, "GenerateKey should be ok")
csr, err := x509.CreateCertificateRequest(rand.Reader, template, pk)
require.Nilf(err, "CreateCertificateRequest should be ok")
var csrPEM bytes.Buffer
err = pem.Encode(&csrPEM, &pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csr})
require.Nilf(err, "pem.Encode should be ok")
server, err := newServer(nil,
&database.Database{
DB: &gorm.DB{},
RDB: &redis.Client{},
},
nil, nil, nil, nil, caCert, caKey)
require.Nilf(err, "newServer should be ok")
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(tc.peerIP),
Port: 65008,
},
})
response, err := server.IssueCertificate(
ctx,
&securityv1.CertificateRequest{
Csr: csrPEM.String(),
ValidityDuration: 0,
})
assert.Nilf(err, "IssueCertificate should be ok")
assert.NotNilf(response, "IssueCertificate should not be nil")
assert.Equal(len(response.CertificateChain), len(server.certChain)+1)
cert := readCert(response.CertificateChain[0])
assert.Equal(len(cert.IPAddresses), 1)
assert.True(cert.IPAddresses[0].Equal(net.ParseIP(tc.peerIP)))
assert.Equal(cert.KeyUsage, x509.KeyUsageDigitalSignature|x509.KeyUsageDataEncipherment|x509.KeyUsageKeyEncipherment)
assert.Equal(cert.ExtKeyUsage, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth})
})
}
}
func genCA() (cert, key string) {
pk, err := rsa.GenerateKey(rand.Reader, 4096)
if err != nil {
panic(err)
}
ca := &x509.Certificate{
SerialNumber: big.NewInt(2022),
Subject: pkix.Name{
Country: []string{"China"},
Organization: []string{"Dragonfly"},
OrganizationalUnit: []string{"Development"},
Locality: []string{"Hangzhou"},
Province: []string{"Zhejiang"},
},
NotBefore: time.Now().Add(-10 * time.Minute).UTC(),
NotAfter: time.Now().Add(time.Hour).UTC(),
BasicConstraintsValid: true,
IsCA: true,
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
}
caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &pk.PublicKey, pk)
if err != nil {
panic(err)
}
var (
caCertPEM bytes.Buffer
caPrivateKeyPEM bytes.Buffer
)
if err = pem.Encode(&caCertPEM, &pem.Block{Type: "CERTIFICATE", Bytes: caBytes}); err != nil {
panic(err)
}
if err = pem.Encode(&caPrivateKeyPEM, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)}); err != nil {
panic(err)
}
key, cert = caPrivateKeyPEM.String(), caCertPEM.String()
return
}
func readCert(certPEM string) *x509.Certificate {
p, _ := pem.Decode([]byte(certPEM))
cert, err := x509.ParseCertificate(p.Bytes)
if err != nil {
panic(err)
}
return cert
}

View File

@ -17,7 +17,11 @@
package rpcserver package rpcserver
import ( import (
"bytes"
"context" "context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors" "errors"
"io" "io"
"time" "time"
@ -28,8 +32,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm" "gorm.io/gorm"
managerv1 "d7y.io/api/pkg/apis/manager/v1" managerv1 "d7y.io/api/pkg/apis/manager/v1"
@ -62,14 +66,29 @@ type Server struct {
objectStorage objectstorage.ObjectStorage objectStorage objectstorage.ObjectStorage
// Object storage configuration. // Object storage configuration.
objectStorageConfig *config.ObjectStorageConfig objectStorageConfig *config.ObjectStorageConfig
// ca certificate to sign certificates
ca *tls.Certificate
// ca certificate chain
certChain []string
} }
// New returns a new manager server from the given options. // New returns a new manager server from the given options.
func New( func New(
cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, opts ...grpc.ServerOption, objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig,
) *grpc.Server { caCertPEM string, caPrivateKeyPEM string, opts ...grpc.ServerOption,
return managerserver.New(&Server{ ) (*grpc.Server, error) {
server, err := newServer(cfg, database, cache, searcher, objectStorage, objectStorageConfig, caCertPEM, caPrivateKeyPEM)
if err != nil {
return nil, err
}
return managerserver.New(server, opts...), nil
}
func newServer(cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig,
caCertPEM string, caPrivateKeyPEM string) (*Server, error) {
s := &Server{
config: cfg, config: cfg,
db: database.DB, db: database.DB,
rdb: database.RDB, rdb: database.RDB,
@ -77,7 +96,31 @@ func New(
searcher: searcher, searcher: searcher,
objectStorage: objectStorage, objectStorage: objectStorage,
objectStorageConfig: objectStorageConfig, objectStorageConfig: objectStorageConfig,
}, opts...) }
if caCertPEM != "" || caPrivateKeyPEM != "" {
ca, err := tls.X509KeyPair([]byte(caCertPEM), []byte(caPrivateKeyPEM))
if err != nil {
return nil, err
}
// load x509 cert
ca.Leaf, err = x509.ParseCertificate(ca.Certificate[0])
if err != nil {
return nil, err
}
s.ca = &ca
// parse all cert
for _, cert := range ca.Certificate {
var certChainPEM bytes.Buffer
if err = pem.Encode(&certChainPEM, &pem.Block{Type: "CERTIFICATE", Bytes: cert}); err != nil {
return nil, err
}
s.certChain = append(s.certChain, certChainPEM.String())
}
}
return s, nil
} }
// Get SeedPeer and SeedPeer cluster configuration. // Get SeedPeer and SeedPeer cluster configuration.

View File

@ -29,6 +29,7 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1"
managerv1 "d7y.io/api/pkg/apis/manager/v1" managerv1 "d7y.io/api/pkg/apis/manager/v1"
securityv1 "d7y.io/api/pkg/apis/security/v1"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc"
@ -67,6 +68,7 @@ func New(svr managerv1.ManagerServer, opts ...grpc.ServerOption) *grpc.Server {
// Register servers on grpc server. // Register servers on grpc server.
managerv1.RegisterManagerServer(grpcServer, svr) managerv1.RegisterManagerServer(grpcServer, svr)
securityv1.RegisterCertificateServiceServer(grpcServer, svr.(securityv1.CertificateServiceServer))
// Register health on grpc server. // Register health on grpc server.
healthpb.RegisterHealthServer(grpcServer, health.NewServer()) healthpb.RegisterHealthServer(grpcServer, health.NewServer())