Facilitate graceful cleanup

Signed-off-by: Andrew Harding <aharding@vmware.com>
This commit is contained in:
Andrew Harding 2022-09-12 14:38:36 -06:00 committed by Marcos Yacob
parent 118dccc874
commit 65fb743ba1
8 changed files with 243 additions and 21 deletions

View File

@ -148,6 +148,22 @@ Plugin authors can decide if the lack of support for a specific host service is
an error or not. If the plugin returns an error from BrokerHostServices, the
plugin will fail to load.
## Cleanup
Plugins are seperate processes and are terminated when the plugin is unloaded.
However, it may be desirable to perform some graceful cleanup operations.
To facilitate this, if plugin/service implementations implement the io.Closer
interface, then the `Close` method will be invoked before the plugin is
unloaded. No other RPCs will be invoked at any time during or after the `Close`
method is called. Errors returned from `Close` are simply logged and will not
impact any runtime behavior of SPIRE Server.
Implementations of `Close` should avoid long running or blocking behavior.
SPIRE may employ deadlines on the operation and could terminate the plugin
before the cleanup is fully completed if plugin implementations ignore this
advice.
## Unit Testing
The [plugintest](https://pkg.go.dev/github.com/spiffe/spire-plugin-sdk/plugintest)

View File

@ -122,6 +122,84 @@ func (x *InitResponse) GetPluginServiceNames() []string {
return nil
}
// Deinit request parameters
type DeinitRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *DeinitRequest) Reset() {
*x = DeinitRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_spire_service_private_init_v1_init_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *DeinitRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DeinitRequest) ProtoMessage() {}
func (x *DeinitRequest) ProtoReflect() protoreflect.Message {
mi := &file_spire_service_private_init_v1_init_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DeinitRequest.ProtoReflect.Descriptor instead.
func (*DeinitRequest) Descriptor() ([]byte, []int) {
return file_spire_service_private_init_v1_init_proto_rawDescGZIP(), []int{2}
}
// Deinit response parameters
type DeinitResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *DeinitResponse) Reset() {
*x = DeinitResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_spire_service_private_init_v1_init_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *DeinitResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DeinitResponse) ProtoMessage() {}
func (x *DeinitResponse) ProtoReflect() protoreflect.Message {
mi := &file_spire_service_private_init_v1_init_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DeinitResponse.ProtoReflect.Descriptor instead.
func (*DeinitResponse) Descriptor() ([]byte, []int) {
return file_spire_service_private_init_v1_init_proto_rawDescGZIP(), []int{3}
}
var File_spire_service_private_init_v1_init_proto protoreflect.FileDescriptor
var file_spire_service_private_init_v1_init_proto_rawDesc = []byte{
@ -137,20 +215,28 @@ var file_spire_service_private_init_v1_init_proto_rawDesc = []byte{
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e,
0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01,
0x20, 0x03, 0x28, 0x09, 0x52, 0x12, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x32, 0x67, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74,
0x12, 0x5f, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x2a, 0x2e, 0x73, 0x70, 0x69, 0x72, 0x65,
0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x44, 0x65, 0x69, 0x6e,
0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x10, 0x0a, 0x0e, 0x44, 0x65, 0x69,
0x6e, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xce, 0x01, 0x0a, 0x04,
0x49, 0x6e, 0x69, 0x74, 0x12, 0x5f, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x2a, 0x2e, 0x73,
0x70, 0x69, 0x72, 0x65, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x69,
0x76, 0x61, 0x74, 0x65, 0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x69,
0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x73, 0x70, 0x69, 0x72, 0x65,
0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65,
0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2e, 0x73, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2e, 0x69, 0x6e, 0x69,
0x74, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x42, 0x58, 0x5a, 0x56, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x73, 0x70, 0x69, 0x66, 0x66, 0x65, 0x2f, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2d, 0x70, 0x6c, 0x75,
0x67, 0x69, 0x6e, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x69, 0x6e, 0x69,
0x74, 0x2f, 0x76, 0x31, 0x3b, 0x69, 0x6e, 0x69, 0x74, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x06, 0x44, 0x65, 0x69, 0x6e, 0x69, 0x74, 0x12,
0x2c, 0x2e, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e,
0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e,
0x44, 0x65, 0x69, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e,
0x73, 0x70, 0x69, 0x72, 0x65, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72,
0x69, 0x76, 0x61, 0x74, 0x65, 0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x65,
0x69, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x58, 0x5a, 0x56,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x70, 0x69, 0x66, 0x66,
0x65, 0x2f, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2d, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2d, 0x73,
0x64, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2f, 0x73, 0x70, 0x69, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f,
0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x69, 0x6e, 0x69, 0x74, 0x2f, 0x76, 0x31, 0x3b,
0x69, 0x6e, 0x69, 0x74, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -165,16 +251,20 @@ func file_spire_service_private_init_v1_init_proto_rawDescGZIP() []byte {
return file_spire_service_private_init_v1_init_proto_rawDescData
}
var file_spire_service_private_init_v1_init_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_spire_service_private_init_v1_init_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_spire_service_private_init_v1_init_proto_goTypes = []interface{}{
(*InitRequest)(nil), // 0: spire.service.private.init.v1.InitRequest
(*InitResponse)(nil), // 1: spire.service.private.init.v1.InitResponse
(*InitRequest)(nil), // 0: spire.service.private.init.v1.InitRequest
(*InitResponse)(nil), // 1: spire.service.private.init.v1.InitResponse
(*DeinitRequest)(nil), // 2: spire.service.private.init.v1.DeinitRequest
(*DeinitResponse)(nil), // 3: spire.service.private.init.v1.DeinitResponse
}
var file_spire_service_private_init_v1_init_proto_depIdxs = []int32{
0, // 0: spire.service.private.init.v1.Init.Init:input_type -> spire.service.private.init.v1.InitRequest
1, // 1: spire.service.private.init.v1.Init.Init:output_type -> spire.service.private.init.v1.InitResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
2, // 1: spire.service.private.init.v1.Init.Deinit:input_type -> spire.service.private.init.v1.DeinitRequest
1, // 2: spire.service.private.init.v1.Init.Init:output_type -> spire.service.private.init.v1.InitResponse
3, // 3: spire.service.private.init.v1.Init.Deinit:output_type -> spire.service.private.init.v1.DeinitResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
@ -210,6 +300,30 @@ func file_spire_service_private_init_v1_init_proto_init() {
return nil
}
}
file_spire_service_private_init_v1_init_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DeinitRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_spire_service_private_init_v1_init_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DeinitResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -217,7 +331,7 @@ func file_spire_service_private_init_v1_init_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_spire_service_private_init_v1_init_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -8,8 +8,10 @@ option go_package = "github.com/spiffe/spire-plugin-sdk/internal/proto/spire/ser
// plugin client connects, since the client is responsible for hosting the
// broker that is used to provide host services. If we initialize before that,
// there would be no broker available to connect to host services with.
// The service is also used for graceful cleanup when the plugin is unloaded.
service Init {
rpc Init(InitRequest) returns (InitResponse);
rpc Deinit(DeinitRequest) returns (DeinitResponse);
}
// Init request parameters
@ -27,3 +29,9 @@ message InitResponse {
// spire.plugin.server.keymanager.v1.Keymanager).
repeated string plugin_service_names = 1;
}
// Deinit request parameters
message DeinitRequest {}
// Deinit response parameters
message DeinitResponse {}

View File

@ -19,6 +19,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InitClient interface {
Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error)
Deinit(ctx context.Context, in *DeinitRequest, opts ...grpc.CallOption) (*DeinitResponse, error)
}
type initClient struct {
@ -38,11 +39,21 @@ func (c *initClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.Cal
return out, nil
}
func (c *initClient) Deinit(ctx context.Context, in *DeinitRequest, opts ...grpc.CallOption) (*DeinitResponse, error) {
out := new(DeinitResponse)
err := c.cc.Invoke(ctx, "/spire.service.private.init.v1.Init/Deinit", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// InitServer is the server API for Init service.
// All implementations must embed UnimplementedInitServer
// for forward compatibility
type InitServer interface {
Init(context.Context, *InitRequest) (*InitResponse, error)
Deinit(context.Context, *DeinitRequest) (*DeinitResponse, error)
mustEmbedUnimplementedInitServer()
}
@ -53,6 +64,9 @@ type UnimplementedInitServer struct {
func (UnimplementedInitServer) Init(context.Context, *InitRequest) (*InitResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Init not implemented")
}
func (UnimplementedInitServer) Deinit(context.Context, *DeinitRequest) (*DeinitResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Deinit not implemented")
}
func (UnimplementedInitServer) mustEmbedUnimplementedInitServer() {}
// UnsafeInitServer may be embedded to opt out of forward compatibility for this service.
@ -84,6 +98,24 @@ func _Init_Init_Handler(srv interface{}, ctx context.Context, dec func(interface
return interceptor(ctx, in, info, handler)
}
func _Init_Deinit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeinitRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(InitServer).Deinit(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/spire.service.private.init.v1.Init/Deinit",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(InitServer).Deinit(ctx, req.(*DeinitRequest))
}
return interceptor(ctx, in, info, handler)
}
// Init_ServiceDesc is the grpc.ServiceDesc for Init service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -95,6 +127,10 @@ var Init_ServiceDesc = grpc.ServiceDesc{
MethodName: "Init",
Handler: _Init_Init_Handler,
},
{
MethodName: "Deinit",
Handler: _Init_Deinit_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "spire/service/private/init/v1/init.proto",

View File

@ -131,6 +131,12 @@ func ServeInBackground(t *testing.T, config Config) {
t.Fatalf("failed to initialize plugin: %v", err)
}
t.Cleanup(func() {
if err := private.Deinit(ctx, conn); err != nil {
t.Fatalf("failed to deinitialize plugin: %v", err)
}
})
assertInitClient(t, conn, config.PluginClient, grpcServiceNames)
for _, serviceClient := range config.ServiceClients {
assertInitClient(t, conn, serviceClient, grpcServiceNames)

View File

@ -63,7 +63,7 @@ func TestServe(t *testing.T) {
assertStringEqual(t, "hostService-in,hostService-out", hostServiceResp.Out)
})
assertStringEqual(t, "[INFO] PLUGIN: in=plugin-in\n[INFO] SERVICE: in=service-in\n", log.String())
assertStringEqual(t, "[INFO] PLUGIN: in=plugin-in\n[INFO] SERVICE: in=service-in\n[INFO] CLOSED\n", log.String())
}
func assertStringEqual(t *testing.T, expected, actual string) {
@ -105,6 +105,11 @@ func (p *TestPlugin) ServiceEcho(_ context.Context, req *test.EchoRequest) (*tes
return &test.EchoResponse{Out: req.In + ",service-out"}, nil
}
func (p *TestPlugin) Close() error {
p.log.Info("CLOSED")
return nil
}
type someHostService struct {
test.UnimplementedSomeHostServiceServer
}

View File

@ -5,6 +5,8 @@ import (
initv1 "github.com/spiffe/spire-plugin-sdk/internal/proto/spire/service/private/init/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Init initializes the plugin and advertises the given host service names to
@ -21,3 +23,17 @@ func Init(ctx context.Context, conn grpc.ClientConnInterface, hostServiceNames [
}
return resp.PluginServiceNames, nil
}
// Deinit deinitializes the plugin. It should only be called right before the
// host unloads the plugin and will not be invoking any other plugin or service
// RPCs.
func Deinit(ctx context.Context, conn grpc.ClientConnInterface) error {
client := initv1.NewInitClient(conn)
_, err := client.Deinit(ctx, &initv1.DeinitRequest{})
switch status.Code(err) {
case codes.OK, codes.Unimplemented:
return nil
default:
return err
}
}

View File

@ -2,6 +2,7 @@ package private
import (
"context"
"io"
"github.com/hashicorp/go-hclog"
initv1 "github.com/spiffe/spire-plugin-sdk/internal/proto/spire/service/private/init/v1"
@ -77,6 +78,26 @@ func (s *initService) Init(ctx context.Context, req *initv1.InitRequest) (*initv
}, nil
}
func (s *initService) Deinit(ctx context.Context, req *initv1.DeinitRequest) (*initv1.DeinitResponse, error) {
deinitted := map[interface{}]struct{}{}
for _, impl := range s.impls {
// Deinitialize the implementation. Since the same
// implementation might back more than one server, only deinitialize
// once.
if _, ok := deinitted[impl]; ok {
continue
}
deinitted[impl] = struct{}{}
if impl, ok := impl.(io.Closer); ok {
if err := impl.Close(); err != nil {
s.logger.Error("Plugin implementation failed to deinitialize", "error", err)
}
}
}
return &initv1.DeinitResponse{}, nil
}
type serviceBroker struct {
conn grpc.ClientConnInterface
hostServiceNames []string