function-sdk-go/sdk.go

218 lines
6.8 KiB
Go

/*
Copyright 2023 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package function is an SDK for building Composition Functions.
package function
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"os"
"path/filepath"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
ginsecure "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
"github.com/crossplane/function-sdk-go/logging"
v1 "github.com/crossplane/function-sdk-go/proto/v1"
"github.com/crossplane/function-sdk-go/proto/v1beta1"
)
// Default ServeOptions.
const (
DefaultNetwork = "tcp"
DefaultAddress = ":9443"
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
)
// ServeOptions configure how a Function is served.
type ServeOptions struct {
Network string
Address string
MaxRecvMsgSize int
Credentials credentials.TransportCredentials
}
// A ServeOption configures how a Function is served.
type ServeOption func(o *ServeOptions) error
// Listen configures the network, address and maximum message size on which the Function will
// listen for RunFunctionRequests.
func Listen(network, address string) ServeOption {
return func(o *ServeOptions) error {
o.Network = network
o.Address = address
return nil
}
}
// MTLSCertificates specifies a directory from which to load mTLS certificates.
// The directory must contain the server certificate (tls.key and tls.crt), as
// well as a CA certificate (ca.crt) that will be used to authenticate clients.
func MTLSCertificates(dir string) ServeOption {
return func(o *ServeOptions) error {
if dir == "" {
// We want to support passing both MTLSCertificates and
// Insecure as they were supplied as flags. So we don't
// want this to fail because no dir was supplied.
// If no TLS dir is supplied and insecure is false we'll
// return an error due to having no credentials specified.
return nil
}
crt, err := tls.LoadX509KeyPair(
filepath.Clean(filepath.Join(dir, "tls.crt")),
filepath.Clean(filepath.Join(dir, "tls.key")),
)
if err != nil {
return errors.Wrap(err, "cannot load X509 keypair")
}
ca, err := os.ReadFile(filepath.Clean(filepath.Join(dir, "ca.crt")))
if err != nil {
return errors.Wrap(err, "cannot read CA certificate")
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(ca) {
return errors.New("invalid CA certificate")
}
o.Credentials = credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{crt},
ClientCAs: pool,
ClientAuth: tls.RequireAndVerifyClientCert,
})
return nil
}
}
// Insecure specifies whether this Function should be served insecurely - i.e.
// without mTLS authentication. This is only useful for testing and development.
// Crossplane will always send requests using mTLS.
func Insecure(insecure bool) ServeOption {
return func(o *ServeOptions) error {
if insecure {
o.Credentials = ginsecure.NewCredentials()
}
return nil
}
}
// MaxRecvMessageSize returns a ServeOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
func MaxRecvMessageSize(sz int) ServeOption {
return func(o *ServeOptions) error {
o.MaxRecvMsgSize = sz
return nil
}
}
// Serve the supplied Function by creating a gRPC server and listening for
// RunFunctionRequests. Blocks until the server returns an error.
func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
so := &ServeOptions{
Network: DefaultNetwork,
Address: DefaultAddress,
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
}
for _, fn := range o {
if err := fn(so); err != nil {
return errors.Wrap(err, "cannot apply ServeOption")
}
}
if so.Credentials == nil {
return errors.New("no credentials provided - did you specify the Insecure or MTLSCertificates options?")
}
lis, err := net.Listen(so.Network, so.Address)
if err != nil {
return errors.Wrapf(err, "cannot listen for %s connections at address %q", so.Network, so.Address)
}
srv := grpc.NewServer(grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), grpc.Creds(so.Credentials))
reflection.Register(srv)
v1.RegisterFunctionRunnerServiceServer(srv, fn)
v1beta1.RegisterFunctionRunnerServiceServer(srv, ServeBeta(fn))
return errors.Wrap(srv.Serve(lis), "cannot serve mTLS gRPC connections")
}
// NewLogger returns a new logger.
func NewLogger(debug bool) (logging.Logger, error) {
return logging.NewLogger(debug)
}
// A BetaServer is a v1beta1 FunctionRunnerServiceServer that wraps an identical
// v1 FunctionRunnerServiceServer. This requires the v1 and v1beta1 protos to be
// identical.
//
// Functions were promoted from v1beta1 to v1 in Crossplane v1.17. Crossplane
// v1.16 and earlier only sends v1beta1 RunFunctionRequests. Functions should
// use the BetaServer for backward compatibility, to support Crossplane v1.16
// and earlier.
type BetaServer struct {
wrapped v1.FunctionRunnerServiceServer
v1beta1.UnimplementedFunctionRunnerServiceServer
}
// ServeBeta returns a v1beta1.FunctionRunnerServiceServer that wraps the
// suppled v1.FunctionRunnerServiceServer.
func ServeBeta(s v1.FunctionRunnerServiceServer) *BetaServer {
return &BetaServer{wrapped: s}
}
// RunFunction calls the RunFunction method of the wrapped
// v1.FunctionRunnerServiceServer. It converts from v1beta1 to v1 and back by
// round-tripping through protobuf marshaling.
func (s *BetaServer) RunFunction(ctx context.Context, req *v1beta1.RunFunctionRequest) (*v1beta1.RunFunctionResponse, error) {
gareq := &v1.RunFunctionRequest{}
b, err := proto.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "cannot marshal v1beta1 RunFunctionRequest to protobuf bytes")
}
if err := proto.Unmarshal(b, gareq); err != nil {
return nil, errors.Wrap(err, "cannot unmarshal v1 RunFunctionRequest from v1beta1 protobuf bytes")
}
garsp, err := s.wrapped.RunFunction(ctx, gareq)
if err != nil {
// This error is intentionally not wrapped. This middleware is just
// calling an underlying RunFunction.
return nil, err
}
b, err = proto.Marshal(garsp)
if err != nil {
return nil, errors.Wrap(err, "cannot marshal v1beta1 RunFunctionResponse to protobuf bytes")
}
rsp := &v1beta1.RunFunctionResponse{}
err = proto.Unmarshal(b, rsp)
return rsp, errors.Wrap(err, "cannot unmarshal v1 RunFunctionResponse from v1beta1 protobuf bytes")
}