Even simpler - cancel the GRPC call using the context object passed

to the GRPC clients - thanks @endophage!

Signed-off-by: Ying Li <ying.li@docker.com>
This commit is contained in:
Ying Li 2015-10-16 09:46:08 -07:00
parent faff328d62
commit 81380e0862
3 changed files with 60 additions and 64 deletions

View File

@ -113,21 +113,20 @@ func main() {
viper.GetString("trust_service.port"), viper.GetString("trust_service.port"),
viper.GetString("trust_service.tls_ca_file"), viper.GetString("trust_service.tls_ca_file"),
) )
minute := 1 * time.Minute
health.RegisterPeriodicFunc( health.RegisterPeriodicFunc(
"Trust operational", "Trust operational",
// If the trust service fails, the server is degraded but not // If the trust service fails, the server is degraded but not
// exactly unheatlthy, so always return healthy and just log an // exactly unheatlthy, so always return healthy and just log an
// error. // error.
func() error { func() error {
logrus.Info("Getting health") err := trust.(*signer.NotarySigner).CheckHealth(minute)
err := trust.(*signer.NotarySigner).CheckHealth(5)
logrus.Info("Got health")
if err != nil { if err != nil {
logrus.Error("Trust not fully operational: ", err.Error()) logrus.Error("Trust not fully operational: ", err.Error())
} }
return nil return nil
}, },
time.Second*5) minute)
} else { } else {
logrus.Info("Using local signing service") logrus.Info("Using local signing service")
trust = signed.NewEd25519() trust = signed.NewEd25519()

View File

@ -11,6 +11,7 @@ import (
"github.com/endophage/gotuf/data" "github.com/endophage/gotuf/data"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
) )
@ -99,9 +100,9 @@ func (trust *NotarySigner) GetKey(keyid string) data.PublicKey {
// CheckHealth returns true if trust is healthy - that is if it can connect // CheckHealth returns true if trust is healthy - that is if it can connect
// to the trust server, and both the key management and signer services are // to the trust server, and both the key management and signer services are
// healthy // healthy
func (trust *NotarySigner) CheckHealth(timeout int) error { func (trust *NotarySigner) CheckHealth(timeout time.Duration) error {
if e := trust.checkServiceHealth( if e := trust.checkServiceHealth(
"Key manager", trust.kmClient.CheckHealth, timeout); e != nil { "Key Manager", trust.kmClient.CheckHealth, timeout); e != nil {
return e return e
} }
return trust.checkServiceHealth( return trust.checkServiceHealth(
@ -114,31 +115,22 @@ type rpcHealthCheck func(
// Generalized function that can check the health of both the signer client // Generalized function that can check the health of both the signer client
// and the key management client. // and the key management client.
func (trust *NotarySigner) checkServiceHealth( func (trust *NotarySigner) checkServiceHealth(
serviceName string, check rpcHealthCheck, timeout int) error { serviceName string, check rpcHealthCheck, timeout time.Duration) error {
// Do not bother starting goroutine if the connection is broken. // Do not bother starting checking at all if the connection is broken.
if trust.clientConn.State() != grpc.Idle && if trust.clientConn.State() != grpc.Idle &&
trust.clientConn.State() != grpc.Ready { trust.clientConn.State() != grpc.Ready {
return errors.New("Not currently connected to trust server.") return errors.New("Not currently connected to trust server.")
} }
// We still want to time out getting health, because the connection could ctx, cancel := context.WithTimeout(context.Background(), timeout)
// have disconnected sometime between when we checked the connection and status, err := check(ctx, &pb.Void{})
// when we try to make an RPC call. defer cancel()
channel := make(chan error, 1) if err == nil && len(status.Status) > 0 {
go func() { return fmt.Errorf("%s not healthy", serviceName)
status, err := check(context.Background(), &pb.Void{}) } else if err != nil && grpc.Code(err) == codes.DeadlineExceeded {
if len(status.Status) > 0 { return fmt.Errorf("Timed out reaching %s after %s.", serviceName,
err = fmt.Errorf("%s not healthy", serviceName) timeout)
}
channel <- err
}()
var returnErr error
select {
case err := <-channel:
returnErr = err
case <-time.After(time.Second * time.Duration(timeout)):
returnErr = fmt.Errorf("Timed out connecting to %s after %s", serviceName, timeout)
} }
return returnErr return err
} }

View File

@ -2,10 +2,12 @@ package signer
import ( import (
"errors" "errors"
"strings"
"testing" "testing"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "github.com/docker/notary/proto" pb "github.com/docker/notary/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -14,30 +16,22 @@ import (
type StubKeyManagementClient struct { type StubKeyManagementClient struct {
pb.KeyManagementClient pb.KeyManagementClient
healthCheck func() (map[string]string, error) healthCheck rpcHealthCheck
} }
func (c StubKeyManagementClient) CheckHealth(x context.Context, func (c StubKeyManagementClient) CheckHealth(x context.Context,
v *pb.Void, o ...grpc.CallOption) (*pb.HealthStatus, error) { v *pb.Void, o ...grpc.CallOption) (*pb.HealthStatus, error) {
status, err := c.healthCheck() return c.healthCheck(x, v, o...)
if err != nil {
return nil, err
}
return &pb.HealthStatus{Status: status}, nil
} }
type StubSignerClient struct { type StubSignerClient struct {
pb.SignerClient pb.SignerClient
healthCheck func() (map[string]string, error) healthCheck rpcHealthCheck
} }
func (c StubSignerClient) CheckHealth(x context.Context, v *pb.Void, func (c StubSignerClient) CheckHealth(x context.Context, v *pb.Void,
o ...grpc.CallOption) (*pb.HealthStatus, error) { o ...grpc.CallOption) (*pb.HealthStatus, error) {
status, err := c.healthCheck() return c.healthCheck(x, v, o...)
if err != nil {
return nil, err
}
return &pb.HealthStatus{Status: status}, nil
} }
type StubGRPCConnection struct { type StubGRPCConnection struct {
@ -48,26 +42,33 @@ func (c StubGRPCConnection) State() grpc.ConnectivityState {
return c.fakeConnStatus return c.fakeConnStatus
} }
type healthSideEffect func() (map[string]string, error) func stubHealthFunction(t *testing.T, status map[string]string, err error) rpcHealthCheck {
return func(ctx context.Context, v *pb.Void, o ...grpc.CallOption) (*pb.HealthStatus, error) {
_, withDeadline := ctx.Deadline()
assert.True(t, withDeadline)
func healthOk() (map[string]string, error) { return &pb.HealthStatus{status}, err
return make(map[string]string), nil }
} }
func healthBad() (map[string]string, error) { func healthOk(t *testing.T) rpcHealthCheck {
return map[string]string{"health": "not good"}, nil return stubHealthFunction(t, make(map[string]string), nil)
} }
func healthError() (map[string]string, error) { func healthBad(t *testing.T) rpcHealthCheck {
return nil, errors.New("Something's wrong") return stubHealthFunction(t, map[string]string{"health": "not good"}, nil)
} }
func healthTimeout() (map[string]string, error) { func healthError(t *testing.T) rpcHealthCheck {
time.Sleep(time.Second * 10) return stubHealthFunction(t, nil, errors.New("Something's wrong"))
return healthOk()
} }
func makeSigner(kmFunc healthSideEffect, sFunc healthSideEffect, conn StubGRPCConnection) NotarySigner { func healthTimeout(t *testing.T) rpcHealthCheck {
return stubHealthFunction(
t, nil, grpc.Errorf(codes.DeadlineExceeded, ""))
}
func makeSigner(kmFunc rpcHealthCheck, sFunc rpcHealthCheck, conn StubGRPCConnection) NotarySigner {
return NotarySigner{ return NotarySigner{
StubKeyManagementClient{ StubKeyManagementClient{
pb.NewKeyManagementClient(nil), pb.NewKeyManagementClient(nil),
@ -83,49 +84,53 @@ func makeSigner(kmFunc healthSideEffect, sFunc healthSideEffect, conn StubGRPCCo
// CheckHealth does not succeed if the KM server is unhealthy // CheckHealth does not succeed if the KM server is unhealthy
func TestHealthCheckKMUnhealthy(t *testing.T) { func TestHealthCheckKMUnhealthy(t *testing.T) {
signer := makeSigner(healthBad, healthOk, StubGRPCConnection{}) signer := makeSigner(healthBad(t), healthOk(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) assert.Error(t, signer.CheckHealth(1*time.Second))
} }
// CheckHealth does not succeed if the health check to the KM server errors // CheckHealth does not succeed if the health check to the KM server errors
func TestHealthCheckKMError(t *testing.T) { func TestHealthCheckKMError(t *testing.T) {
signer := makeSigner(healthBad, healthOk, StubGRPCConnection{}) signer := makeSigner(healthBad(t), healthOk(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) assert.Error(t, signer.CheckHealth(1*time.Second))
} }
// CheckHealth does not succeed if the health check to the KM server times out // CheckHealth does not succeed if the health check to the KM server times out
func TestHealthCheckKMTimeout(t *testing.T) { func TestHealthCheckKMTimeout(t *testing.T) {
signer := makeSigner(healthTimeout, healthOk, StubGRPCConnection{}) signer := makeSigner(healthTimeout(t), healthOk(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) err := signer.CheckHealth(1 * time.Second)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "Timed out"))
} }
// CheckHealth does not succeed if the signer is unhealthy // CheckHealth does not succeed if the signer is unhealthy
func TestHealthCheckSignerUnhealthy(t *testing.T) { func TestHealthCheckSignerUnhealthy(t *testing.T) {
signer := makeSigner(healthOk, healthBad, StubGRPCConnection{}) signer := makeSigner(healthOk(t), healthBad(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) assert.Error(t, signer.CheckHealth(1*time.Second))
} }
// CheckHealth does not succeed if the health check to the signer errors // CheckHealth does not succeed if the health check to the signer errors
func TestHealthCheckSignerError(t *testing.T) { func TestHealthCheckSignerError(t *testing.T) {
signer := makeSigner(healthOk, healthBad, StubGRPCConnection{}) signer := makeSigner(healthOk(t), healthBad(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) assert.Error(t, signer.CheckHealth(1))
} }
// CheckHealth does not succeed if the health check to the signer times out // CheckHealth does not succeed if the health check to the signer times out
func TestHealthCheckSignerTimeout(t *testing.T) { func TestHealthCheckSignerTimeout(t *testing.T) {
signer := makeSigner(healthOk, healthTimeout, StubGRPCConnection{}) signer := makeSigner(healthOk(t), healthTimeout(t), StubGRPCConnection{})
assert.Error(t, signer.CheckHealth(1)) err := signer.CheckHealth(1 * time.Second)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "Timed out"))
} }
// CheckHealth succeeds if both services are healthy and reachable // CheckHealth succeeds if both services are healthy and reachable
func TestHealthCheckBothHealthy(t *testing.T) { func TestHealthCheckBothHealthy(t *testing.T) {
signer := makeSigner(healthOk, healthOk, StubGRPCConnection{}) signer := makeSigner(healthOk(t), healthOk(t), StubGRPCConnection{})
assert.NoError(t, signer.CheckHealth(1)) assert.NoError(t, signer.CheckHealth(1*time.Second))
} }
// CheckHealth fails immediately if not connected to the server. // CheckHealth fails immediately if not connected to the server.
func TestHealthCheckConnectionDied(t *testing.T) { func TestHealthCheckConnectionDied(t *testing.T) {
signer := makeSigner(healthTimeout, healthTimeout, signer := makeSigner(healthOk(t), healthOk(t),
StubGRPCConnection{grpc.Connecting}) StubGRPCConnection{grpc.Connecting})
assert.Error(t, signer.CheckHealth(30)) assert.Error(t, signer.CheckHealth(1*time.Second))
} }