From 194e421931166244fceac33d6079dc40590af606 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 21 Oct 2015 21:54:32 -0700 Subject: [PATCH] Add reconnects in AMQP. --- Godeps/Godeps.json | 4 + .../src/github.com/golang/mock/gomock/call.go | 247 +++++++++++++++++ .../github.com/golang/mock/gomock/callset.go | 76 ++++++ .../golang/mock/gomock/controller.go | 167 ++++++++++++ .../github.com/golang/mock/gomock/matchers.go | 97 +++++++ .../mock/gomock/mock_matcher/mock_matcher.go | 49 ++++ cmd/admin-revoker/main.go | 7 +- cmd/boulder-ca/main.go | 24 +- cmd/boulder-publisher/main.go | 14 +- cmd/boulder-ra/main.go | 34 ++- cmd/boulder-sa/main.go | 4 +- cmd/boulder-va/main.go | 14 +- cmd/boulder-wfe/main.go | 32 +-- cmd/expiration-mailer/main.go | 5 +- cmd/ocsp-updater/main.go | 31 +-- cmd/shell.go | 22 +- rpc/amqp-rpc.go | 256 +++++++++--------- rpc/connection.go | 144 ++++++++++ rpc/connection_test.go | 130 +++++++++ rpc/mock_channel_test.go | 92 +++++++ rpc/rpc-interfaces.go | 1 - rpc/testdata/cert.pem | 18 ++ rpc/testdata/key.pem | 28 ++ test/amqp-integration-test.py | 2 + test/boulder-config.json | 2 +- test/startservers.py | 14 + test/travis-before-install.sh | 3 +- 27 files changed, 1265 insertions(+), 252 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/golang/mock/gomock/call.go create mode 100644 Godeps/_workspace/src/github.com/golang/mock/gomock/callset.go create mode 100644 Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go create mode 100644 Godeps/_workspace/src/github.com/golang/mock/gomock/matchers.go create mode 100644 Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go create mode 100644 rpc/connection.go create mode 100644 rpc/connection_test.go create mode 100644 rpc/mock_channel_test.go create mode 100644 rpc/testdata/cert.pem create mode 100644 rpc/testdata/key.pem diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a095c0a00..ba4b856e2 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -96,6 +96,10 @@ "Comment": "v1.2-88-ga197e5d", "Rev": "a197e5d40516f2e9f74dcee085a5f2d4604e94df" }, + { + "ImportPath": "github.com/golang/mock/gomock", + "Rev": "06883d979f10cc178f2716846215c8cf90f9e363" + }, { "ImportPath": "github.com/jmhodges/clock", "Rev": "3c4ebd218625c9364c33db6d39c276d80c3090c6" diff --git a/Godeps/_workspace/src/github.com/golang/mock/gomock/call.go b/Godeps/_workspace/src/github.com/golang/mock/gomock/call.go new file mode 100644 index 000000000..ea1e09227 --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/mock/gomock/call.go @@ -0,0 +1,247 @@ +// Copyright 2010 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gomock + +import ( + "fmt" + "reflect" + "strings" +) + +// Call represents an expected call to a mock. +type Call struct { + t TestReporter // for triggering test failures on invalid call setup + + receiver interface{} // the receiver of the method call + method string // the name of the method + args []Matcher // the args + rets []interface{} // the return values (if any) + + preReqs []*Call // prerequisite calls + + // Expectations + minCalls, maxCalls int + + numCalls int // actual number made + + // Actions + doFunc reflect.Value + setArgs map[int]reflect.Value +} + +func (c *Call) AnyTimes() *Call { + c.minCalls, c.maxCalls = 0, 1e8 // close enough to infinity + return c +} + +// Do declares the action to run when the call is matched. +// It takes an interface{} argument to support n-arity functions. +func (c *Call) Do(f interface{}) *Call { + // TODO: Check arity and types here, rather than dying badly elsewhere. + c.doFunc = reflect.ValueOf(f) + return c +} + +func (c *Call) Return(rets ...interface{}) *Call { + mt := c.methodType() + if len(rets) != mt.NumOut() { + c.t.Fatalf("wrong number of arguments to Return for %T.%v: got %d, want %d", + c.receiver, c.method, len(rets), mt.NumOut()) + } + for i, ret := range rets { + if got, want := reflect.TypeOf(ret), mt.Out(i); got == want { + // Identical types; nothing to do. + } else if got == nil { + // Nil needs special handling. + switch want.Kind() { + case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice: + // ok + default: + c.t.Fatalf("argument %d to Return for %T.%v is nil, but %v is not nillable", + i, c.receiver, c.method, want) + } + } else if got.AssignableTo(want) { + // Assignable type relation. Make the assignment now so that the generated code + // can return the values with a type assertion. + v := reflect.New(want).Elem() + v.Set(reflect.ValueOf(ret)) + rets[i] = v.Interface() + } else { + c.t.Fatalf("wrong type of argument %d to Return for %T.%v: %v is not assignable to %v", + i, c.receiver, c.method, got, want) + } + } + + c.rets = rets + return c +} + +func (c *Call) Times(n int) *Call { + c.minCalls, c.maxCalls = n, n + return c +} + +// SetArg declares an action that will set the nth argument's value, +// indirected through a pointer. +func (c *Call) SetArg(n int, value interface{}) *Call { + if c.setArgs == nil { + c.setArgs = make(map[int]reflect.Value) + } + mt := c.methodType() + // TODO: This will break on variadic methods. + // We will need to check those at invocation time. + if n < 0 || n >= mt.NumIn() { + c.t.Fatalf("SetArg(%d, ...) called for a method with %d args", n, mt.NumIn()) + } + // Permit setting argument through an interface. + // In the interface case, we don't (nay, can't) check the type here. + at := mt.In(n) + switch at.Kind() { + case reflect.Ptr: + dt := at.Elem() + if vt := reflect.TypeOf(value); !vt.AssignableTo(dt) { + c.t.Fatalf("SetArg(%d, ...) argument is a %v, not assignable to %v", n, vt, dt) + } + case reflect.Interface: + // nothing to do + default: + c.t.Fatalf("SetArg(%d, ...) referring to argument of non-pointer non-interface type %v", n, at) + } + c.setArgs[n] = reflect.ValueOf(value) + return c +} + +// isPreReq returns true if other is a direct or indirect prerequisite to c. +func (c *Call) isPreReq(other *Call) bool { + for _, preReq := range c.preReqs { + if other == preReq || preReq.isPreReq(other) { + return true + } + } + return false +} + +// After declares that the call may only match after preReq has been exhausted. +func (c *Call) After(preReq *Call) *Call { + if preReq.isPreReq(c) { + msg := fmt.Sprintf( + "Loop in call order: %v is a prerequisite to %v (possibly indirectly).", + c, preReq, + ) + panic(msg) + } + + c.preReqs = append(c.preReqs, preReq) + return c +} + +// Returns true iff the minimum number of calls have been made. +func (c *Call) satisfied() bool { + return c.numCalls >= c.minCalls +} + +// Returns true iff the maximum number of calls have been made. +func (c *Call) exhausted() bool { + return c.numCalls >= c.maxCalls +} + +func (c *Call) String() string { + args := make([]string, len(c.args)) + for i, arg := range c.args { + args[i] = arg.String() + } + arguments := strings.Join(args, ", ") + return fmt.Sprintf("%T.%v(%s)", c.receiver, c.method, arguments) +} + +// Tests if the given call matches the expected call. +func (c *Call) matches(args []interface{}) bool { + if len(args) != len(c.args) { + return false + } + for i, m := range c.args { + if !m.Matches(args[i]) { + return false + } + } + + // Check that all prerequisite calls have been satisfied. + for _, preReqCall := range c.preReqs { + if !preReqCall.satisfied() { + return false + } + } + + return true +} + +// dropPrereqs tells the expected Call to not re-check prerequite calls any +// longer, and to return its current set. +func (c *Call) dropPrereqs() (preReqs []*Call) { + preReqs = c.preReqs + c.preReqs = nil + return +} + +func (c *Call) call(args []interface{}) (rets []interface{}, action func()) { + c.numCalls++ + + // Actions + if c.doFunc.IsValid() { + doArgs := make([]reflect.Value, len(args)) + ft := c.doFunc.Type() + for i := 0; i < ft.NumIn(); i++ { + if args[i] != nil { + doArgs[i] = reflect.ValueOf(args[i]) + } else { + // Use the zero value for the arg. + doArgs[i] = reflect.Zero(ft.In(i)) + } + } + action = func() { c.doFunc.Call(doArgs) } + } + for n, v := range c.setArgs { + reflect.ValueOf(args[n]).Elem().Set(v) + } + + rets = c.rets + if rets == nil { + // Synthesize the zero value for each of the return args' types. + mt := c.methodType() + rets = make([]interface{}, mt.NumOut()) + for i := 0; i < mt.NumOut(); i++ { + rets[i] = reflect.Zero(mt.Out(i)).Interface() + } + } + + return +} + +func (c *Call) methodType() reflect.Type { + recv := reflect.ValueOf(c.receiver) + for i := 0; i < recv.Type().NumMethod(); i++ { + if recv.Type().Method(i).Name == c.method { + return recv.Method(i).Type() + } + } + panic(fmt.Sprintf("gomock: failed finding method %s on %T", c.method, c.receiver)) +} + +// InOrder declares that the given calls should occur in order. +func InOrder(calls ...*Call) { + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } +} diff --git a/Godeps/_workspace/src/github.com/golang/mock/gomock/callset.go b/Godeps/_workspace/src/github.com/golang/mock/gomock/callset.go new file mode 100644 index 000000000..1b7de4c0b --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/mock/gomock/callset.go @@ -0,0 +1,76 @@ +// Copyright 2011 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gomock + +// callSet represents a set of expected calls, indexed by receiver and method +// name. +type callSet map[interface{}]map[string][]*Call + +// Add adds a new expected call. +func (cs callSet) Add(call *Call) { + methodMap, ok := cs[call.receiver] + if !ok { + methodMap = make(map[string][]*Call) + cs[call.receiver] = methodMap + } + methodMap[call.method] = append(methodMap[call.method], call) +} + +// Remove removes an expected call. +func (cs callSet) Remove(call *Call) { + methodMap, ok := cs[call.receiver] + if !ok { + return + } + sl := methodMap[call.method] + for i, c := range sl { + if c == call { + // quick removal; we don't need to maintain call order + if len(sl) > 1 { + sl[i] = sl[len(sl)-1] + } + methodMap[call.method] = sl[:len(sl)-1] + break + } + } +} + +// FindMatch searches for a matching call. Returns nil if no call matched. +func (cs callSet) FindMatch(receiver interface{}, method string, args []interface{}) *Call { + methodMap, ok := cs[receiver] + if !ok { + return nil + } + calls, ok := methodMap[method] + if !ok { + return nil + } + + // Search through the unordered set of calls expected on a method on a + // receiver. + for _, call := range calls { + // A call should not normally still be here if exhausted, + // but it can happen if, for instance, .Times(0) was used. + // Pretend the call doesn't match. + if call.exhausted() { + continue + } + if call.matches(args) { + return call + } + } + + return nil +} diff --git a/Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go b/Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go new file mode 100644 index 000000000..6ca952803 --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go @@ -0,0 +1,167 @@ +// Copyright 2010 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// GoMock - a mock framework for Go. +// +// Standard usage: +// (1) Define an interface that you wish to mock. +// type MyInterface interface { +// SomeMethod(x int64, y string) +// } +// (2) Use mockgen to generate a mock from the interface. +// (3) Use the mock in a test: +// func TestMyThing(t *testing.T) { +// mockCtrl := gomock.NewController(t) +// defer mockCtrl.Finish() +// +// mockObj := something.NewMockMyInterface(mockCtrl) +// mockObj.EXPECT().SomeMethod(4, "blah") +// // pass mockObj to a real object and play with it. +// } +// +// By default, expected calls are not enforced to run in any particular order. +// Call order dependency can be enforced by use of InOrder and/or Call.After. +// Call.After can create more varied call order dependencies, but InOrder is +// often more convenient. +// +// The following examples create equivalent call order dependencies. +// +// Example of using Call.After to chain expected call order: +// +// firstCall := mockObj.EXPECT().SomeMethod(1, "first") +// secondCall := mockObj.EXPECT().SomeMethod(2, "second").After(firstCall) +// mockObj.EXPECT().SomeMethod(3, "third").After(secondCall) +// +// Example of using InOrder to declare expected call order: +// +// gomock.InOrder( +// mockObj.EXPECT().SomeMethod(1, "first"), +// mockObj.EXPECT().SomeMethod(2, "second"), +// mockObj.EXPECT().SomeMethod(3, "third"), +// ) +// +// TODO: +// - Handle different argument/return types (e.g. ..., chan, map, interface). +package gomock + +import "sync" + +// A TestReporter is something that can be used to report test failures. +// It is satisfied by the standard library's *testing.T. +type TestReporter interface { + Errorf(format string, args ...interface{}) + Fatalf(format string, args ...interface{}) +} + +// A Controller represents the top-level control of a mock ecosystem. +// It defines the scope and lifetime of mock objects, as well as their expectations. +// It is safe to call Controller's methods from multiple goroutines. +type Controller struct { + mu sync.Mutex + t TestReporter + expectedCalls callSet +} + +func NewController(t TestReporter) *Controller { + return &Controller{ + t: t, + expectedCalls: make(callSet), + } +} + +func (ctrl *Controller) RecordCall(receiver interface{}, method string, args ...interface{}) *Call { + // TODO: check arity, types. + margs := make([]Matcher, len(args)) + for i, arg := range args { + if m, ok := arg.(Matcher); ok { + margs[i] = m + } else if arg == nil { + // Handle nil specially so that passing a nil interface value + // will match the typed nils of concrete args. + margs[i] = Nil() + } else { + margs[i] = Eq(arg) + } + } + + ctrl.mu.Lock() + defer ctrl.mu.Unlock() + + call := &Call{t: ctrl.t, receiver: receiver, method: method, args: margs, minCalls: 1, maxCalls: 1} + + ctrl.expectedCalls.Add(call) + return call +} + +func (ctrl *Controller) Call(receiver interface{}, method string, args ...interface{}) []interface{} { + ctrl.mu.Lock() + defer ctrl.mu.Unlock() + + expected := ctrl.expectedCalls.FindMatch(receiver, method, args) + if expected == nil { + ctrl.t.Fatalf("no matching expected call: %T.%v(%v)", receiver, method, args) + } + + // Two things happen here: + // * the matching call no longer needs to check prerequite calls, + // * and the prerequite calls are no longer expected, so remove them. + preReqCalls := expected.dropPrereqs() + for _, preReqCall := range preReqCalls { + ctrl.expectedCalls.Remove(preReqCall) + } + + rets, action := expected.call(args) + if expected.exhausted() { + ctrl.expectedCalls.Remove(expected) + } + + // Don't hold the lock while doing the call's action (if any) + // so that actions may execute concurrently. + // We use the deferred Unlock to capture any panics that happen above; + // here we add a deferred Lock to balance it. + ctrl.mu.Unlock() + defer ctrl.mu.Lock() + if action != nil { + action() + } + + return rets +} + +func (ctrl *Controller) Finish() { + ctrl.mu.Lock() + defer ctrl.mu.Unlock() + + // If we're currently panicking, probably because this is a deferred call, + // pass through the panic. + if err := recover(); err != nil { + panic(err) + } + + // Check that all remaining expected calls are satisfied. + failures := false + for _, methodMap := range ctrl.expectedCalls { + for _, calls := range methodMap { + for _, call := range calls { + if !call.satisfied() { + ctrl.t.Errorf("missing call(s) to %v", call) + failures = true + } + } + } + } + if failures { + ctrl.t.Fatalf("aborting test due to missing call(s)") + } +} diff --git a/Godeps/_workspace/src/github.com/golang/mock/gomock/matchers.go b/Godeps/_workspace/src/github.com/golang/mock/gomock/matchers.go new file mode 100644 index 000000000..32628ae8c --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/mock/gomock/matchers.go @@ -0,0 +1,97 @@ +// Copyright 2010 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gomock + +import ( + "fmt" + "reflect" +) + +// A Matcher is a representation of a class of values. +// It is used to represent the valid or expected arguments to a mocked method. +type Matcher interface { + // Matches returns whether y is a match. + Matches(x interface{}) bool + + // String describes what the matcher matches. + String() string +} + +type anyMatcher struct{} + +func (anyMatcher) Matches(x interface{}) bool { + return true +} + +func (anyMatcher) String() string { + return "is anything" +} + +type eqMatcher struct { + x interface{} +} + +func (e eqMatcher) Matches(x interface{}) bool { + return reflect.DeepEqual(e.x, x) +} + +func (e eqMatcher) String() string { + return fmt.Sprintf("is equal to %v", e.x) +} + +type nilMatcher struct{} + +func (nilMatcher) Matches(x interface{}) bool { + if x == nil { + return true + } + + v := reflect.ValueOf(x) + switch v.Kind() { + case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, + reflect.Ptr, reflect.Slice: + return v.IsNil() + } + + return false +} + +func (nilMatcher) String() string { + return "is nil" +} + +type notMatcher struct { + m Matcher +} + +func (n notMatcher) Matches(x interface{}) bool { + return !n.m.Matches(x) +} + +func (n notMatcher) String() string { + // TODO: Improve this if we add a NotString method to the Matcher interface. + return "not(" + n.m.String() + ")" +} + +// Constructors +func Any() Matcher { return anyMatcher{} } +func Eq(x interface{}) Matcher { return eqMatcher{x} } +func Nil() Matcher { return nilMatcher{} } +func Not(x interface{}) Matcher { + if m, ok := x.(Matcher); ok { + return notMatcher{m} + } + return notMatcher{Eq(x)} +} diff --git a/Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go b/Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go new file mode 100644 index 000000000..eaea0e63a --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go @@ -0,0 +1,49 @@ +// Automatically generated by MockGen. DO NOT EDIT! +// Source: github.com/golang/mock/gomock (interfaces: Matcher) + +package mock_gomock + +import ( + gomock "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock" +) + +// Mock of Matcher interface +type MockMatcher struct { + ctrl *gomock.Controller + recorder *_MockMatcherRecorder +} + +// Recorder for MockMatcher (not exported) +type _MockMatcherRecorder struct { + mock *MockMatcher +} + +func NewMockMatcher(ctrl *gomock.Controller) *MockMatcher { + mock := &MockMatcher{ctrl: ctrl} + mock.recorder = &_MockMatcherRecorder{mock} + return mock +} + +func (_m *MockMatcher) EXPECT() *_MockMatcherRecorder { + return _m.recorder +} + +func (_m *MockMatcher) Matches(_param0 interface{}) bool { + ret := _m.ctrl.Call(_m, "Matches", _param0) + ret0, _ := ret[0].(bool) + return ret0 +} + +func (_mr *_MockMatcherRecorder) Matches(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Matches", arg0) +} + +func (_m *MockMatcher) String() string { + ret := _m.ctrl.Call(_m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +func (_mr *_MockMatcherRecorder) String() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "String") +} diff --git a/cmd/admin-revoker/main.go b/cmd/admin-revoker/main.go index 0f81f604c..4c9cc7b8b 100644 --- a/cmd/admin-revoker/main.go +++ b/cmd/admin-revoker/main.go @@ -48,10 +48,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. cmd.FailOnError(err, "Could not connect to Syslog") blog.SetAuditLogger(auditlogger) - ch, err := rpc.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, ch, stats) + raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) @@ -60,7 +57,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. dbMap, err := sa.NewDbMap(c.Revoker.DBConnect) cmd.FailOnError(err, "Couldn't setup database connection") - saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch, stats) + saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index 7217fdf4a..c3bf8548b 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -45,24 +45,22 @@ func main() { go cmd.ProfileCmd("CA", stats) - connectionHandler := func(srv *rpc.AmqpRPCServer) { - saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - sac, err := rpc.NewStorageAuthorityClient(saRPC) - cmd.FailOnError(err, "Failed to create SA client") + sac, err := rpc.NewStorageAuthorityClient(saRPC) + cmd.FailOnError(err, "Failed to create SA client") - pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - pubc, err := rpc.NewPublisherClient(pubRPC) - cmd.FailOnError(err, "Failed to create Publisher client") + pubc, err := rpc.NewPublisherClient(pubRPC) + cmd.FailOnError(err, "Failed to create Publisher client") - cai.Publisher = &pubc - cai.SA = &sac - } + cai.Publisher = &pubc + cai.SA = &sac - cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, connectionHandler, c.CA.MaxConcurrentRPCServerRequests) + cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, c.CA.MaxConcurrentRPCServerRequests, c) cmd.FailOnError(err, "Unable to create CA RPC server") rpc.NewCertificateAuthorityServer(cas, cai) diff --git a/cmd/boulder-publisher/main.go b/cmd/boulder-publisher/main.go index b3ec19407..5d51becbf 100644 --- a/cmd/boulder-publisher/main.go +++ b/cmd/boulder-publisher/main.go @@ -36,17 +36,15 @@ func main() { go cmd.DebugServer(c.Publisher.DebugAddr) go cmd.ProfileCmd("Publisher", stats) - connectionHandler := func(srv *rpc.AmqpRPCServer) { - saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create SA RPC client") + saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, c, stats) + cmd.FailOnError(err, "Unable to create SA RPC client") - sac, err := rpc.NewStorageAuthorityClient(saRPC) - cmd.FailOnError(err, "Unable to create SA client") + sac, err := rpc.NewStorageAuthorityClient(saRPC) + cmd.FailOnError(err, "Unable to create SA client") - pubi.SA = &sac - } + pubi.SA = &sac - pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, connectionHandler, c.Publisher.MaxConcurrentRPCServerRequests) + pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, c.Publisher.MaxConcurrentRPCServerRequests, c) cmd.FailOnError(err, "Unable to create Publisher RPC server") rpc.NewPublisherServer(pubs, &pubi) diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index e9facf68d..f670fde1c 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -59,31 +59,29 @@ func main() { go cmd.ProfileCmd("RA", stats) - connectionHandler := func(srv *rpc.AmqpRPCServer) { - vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - vac, err := rpc.NewValidationAuthorityClient(vaRPC) - cmd.FailOnError(err, "Unable to create VA client") + vac, err := rpc.NewValidationAuthorityClient(vaRPC) + cmd.FailOnError(err, "Unable to create VA client") - cac, err := rpc.NewCertificateAuthorityClient(caRPC) - cmd.FailOnError(err, "Unable to create CA client") + cac, err := rpc.NewCertificateAuthorityClient(caRPC) + cmd.FailOnError(err, "Unable to create CA client") - sac, err := rpc.NewStorageAuthorityClient(saRPC) - cmd.FailOnError(err, "Unable to create SA client") + sac, err := rpc.NewStorageAuthorityClient(saRPC) + cmd.FailOnError(err, "Unable to create SA client") - rai.VA = &vac - rai.CA = &cac - rai.SA = &sac - } + rai.VA = &vac + rai.CA = &cac + rai.SA = &sac - ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, connectionHandler, c.RA.MaxConcurrentRPCServerRequests) + ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, c.RA.MaxConcurrentRPCServerRequests, c) cmd.FailOnError(err, "Unable to create RA RPC server") rpc.NewRegistrationAuthorityServer(ras, &rai) diff --git a/cmd/boulder-sa/main.go b/cmd/boulder-sa/main.go index cb9f1ec1b..f08722f81 100644 --- a/cmd/boulder-sa/main.go +++ b/cmd/boulder-sa/main.go @@ -41,9 +41,7 @@ func main() { go cmd.ProfileCmd("SA", stats) - connectionHandler := func(*rpc.AmqpRPCServer) {} - - sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, connectionHandler, c.SA.MaxConcurrentRPCServerRequests) + sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, c.SA.MaxConcurrentRPCServerRequests, c) cmd.FailOnError(err, "Unable to create SA RPC server") rpc.NewStorageAuthorityServer(sas, sai) diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index 25912532c..f5fb01cf2 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -62,17 +62,15 @@ func main() { } vai.UserAgent = c.VA.UserAgent - connectionHandler := func(srv *rpc.AmqpRPCServer) { - raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel, stats) - cmd.FailOnError(err, "Unable to create RPC client") + raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, c, stats) + cmd.FailOnError(err, "Unable to create RPC client") - rac, err := rpc.NewRegistrationAuthorityClient(raRPC) - cmd.FailOnError(err, "Unable to create RA client") + rac, err := rpc.NewRegistrationAuthorityClient(raRPC) + cmd.FailOnError(err, "Unable to create RA client") - vai.RA = &rac - } + vai.RA = &rac - vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, connectionHandler, c.VA.MaxConcurrentRPCServerRequests) + vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, c.VA.MaxConcurrentRPCServerRequests, c) cmd.FailOnError(err, "Unable to create VA RPC server") rpc.NewValidationAuthorityServer(vas, vai) diff --git a/cmd/boulder-wfe/main.go b/cmd/boulder-wfe/main.go index a00612320..fca8187a9 100644 --- a/cmd/boulder-wfe/main.go +++ b/cmd/boulder-wfe/main.go @@ -14,7 +14,6 @@ import ( "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/codegangsta/cli" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/facebookgo/httpdown" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/cmd" blog "github.com/letsencrypt/boulder/log" @@ -23,17 +22,11 @@ import ( "github.com/letsencrypt/boulder/wfe" ) -func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) { - ch, err := rpc.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - logger.Info(" [!] Connected to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - - raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch, stats) +func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient) { + raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") - saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch, stats) + saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) @@ -42,7 +35,7 @@ func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc sac, err := rpc.NewStorageAuthorityClient(saRPC) cmd.FailOnError(err, "Unable to create SA client") - return rac, sac, closeChan + return rac, sac } func main() { @@ -78,7 +71,7 @@ func main() { wfe, err := wfe.NewWebFrontEndImpl(stats, clock.Default()) cmd.FailOnError(err, "Unable to create WFE") - rac, sac, closeChan := setupWFE(c, auditlogger, stats) + rac, sac := setupWFE(c, auditlogger, stats) wfe.RA = &rac wfe.SA = &sac wfe.SubscriberAgreementURL = c.SubscriberAgreementURL @@ -104,21 +97,6 @@ func main() { go cmd.ProfileCmd("WFE", stats) - go func() { - // sit around and reconnect to AMQP if the channel - // drops for some reason and repopulate the wfe object - // with new RA and SA rpc clients. - for { - for err := range closeChan { - auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) - time.Sleep(time.Second * 5) - rac, sac, closeChan = setupWFE(c, auditlogger, stats) - wfe.RA = &rac - wfe.SA = &sac - } - } - }() - // Set up paths wfe.BaseURL = c.Common.BaseURL h, err := wfe.Handler() diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index c0cef2bdc..90e1429bc 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -245,10 +245,7 @@ func main() { dbMap, err := sa.NewDbMap(c.Mailer.DBConnect) cmd.FailOnError(err, "Could not connect to database") - ch, err := rpc.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch, stats) + saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index b7ac2536e..778a4ffb7 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -9,12 +9,10 @@ import ( "crypto/x509" "database/sql" "fmt" - "os" "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" gorp "github.com/letsencrypt/boulder/Godeps/_workspace/src/gopkg.in/gorp.v1" "github.com/letsencrypt/boulder/cmd" @@ -464,32 +462,26 @@ func setupClients(c cmd.Config, stats statsd.Statter) ( core.CertificateAuthority, core.Publisher, core.StorageAuthority, - chan *amqp.Error, ) { - ch, err := rpc.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - - caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch, stats) + caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") cac, err := rpc.NewCertificateAuthorityClient(caRPC) cmd.FailOnError(err, "Unable to create CA client") - pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, ch, stats) + pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") pubc, err := rpc.NewPublisherClient(pubRPC) cmd.FailOnError(err, "Unable to create Publisher client") - saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, ch, stats) + saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, c, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) cmd.FailOnError(err, "Unable to create Publisher client") - return cac, pubc, sac, closeChan + return cac, pubc, sac } func main() { @@ -516,7 +508,7 @@ func main() { dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect) cmd.FailOnError(err, "Could not connect to database") - cac, pubc, sac, closeChan := setupClients(c, stats) + cac, pubc, sac := setupClients(c, stats) updater, err := newUpdater( stats, @@ -530,6 +522,8 @@ func main() { len(c.Common.CT.Logs), ) + cmd.FailOnError(err, "Failed to create updater") + for _, l := range updater.loops { go func(loop *looper) { err = loop.loop() @@ -539,15 +533,8 @@ func main() { }(l) } - cmd.FailOnError(err, "Failed to create updater") - - // TODO(): When the channel falls over so do we for now, if the AMQP channel - // has already closed there is no real cleanup we can do. This is due to - // really needing to change the underlying AMQP Server/Client reconnection - // logic. - err = <-closeChan - auditlogger.AuditErr(fmt.Errorf(" [!] AMQP Channel closed, exiting: [%s]", err)) - os.Exit(1) + // Sleep forever (until signaled) + select {} } app.Run() diff --git a/cmd/shell.go b/cmd/shell.go index b7d0d6b5d..6f7733871 100644 --- a/cmd/shell.go +++ b/cmd/shell.go @@ -58,15 +58,19 @@ type Config struct { // General AMQP struct { - Server string - Insecure bool - RA Queue - VA Queue - SA Queue - CA Queue - OCSP Queue - Publisher Queue - TLS *TLSConfig + Server string + Insecure bool + RA Queue + VA Queue + SA Queue + CA Queue + OCSP Queue + Publisher Queue + TLS *TLSConfig + ReconnectTimeouts struct { + Base ConfigDuration + Max ConfigDuration + } } WFE struct { diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 28812951a..02327b758 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -52,12 +52,13 @@ const ( AmqpAutoAck = true AmqpMandatory = false AmqpImmediate = false + consumerName = "boulder" ) // AMQPDeclareExchange attempts to declare the configured AMQP exchange, // returning silently if already declared, erroring if nonexistant and // unable to create. -func AMQPDeclareExchange(conn *amqp.Connection) error { +func amqpDeclareExchange(conn *amqp.Connection) error { var err error var ch *amqp.Channel log := blog.GetAuditLogger() @@ -77,7 +78,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error { AmqpNoWait, nil) if err != nil { - log.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, attempting to create. (err=%s)", AmqpExchange, err)) + log.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, creating.", AmqpExchange)) // Channel is invalid at this point, so recreate ch.Close() @@ -100,6 +101,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error { ch.Close() return err } + log.Info(fmt.Sprintf("Created exchange %s.", AmqpExchange)) } ch.Close() @@ -107,7 +109,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error { } // A simplified way to declare and subscribe to an AMQP queue -func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) { +func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) { var err error _, err = ch.QueueDeclare( @@ -118,8 +120,7 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog AmqpNoWait, nil) if err != nil { - log.Crit(fmt.Sprintf("Could not declare queue: %s", err)) - return nil, err + return nil, fmt.Errorf("could not declare queue: %s", err) } routingKey := name @@ -131,13 +132,12 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog false, nil) if err != nil { - log.Crit(fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", name, name, routingKey)) + err = fmt.Errorf( + "Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", + name, name, routingKey) return nil, err } - // A consumer name is used so that the specific consumer can be cancelled later - // if signalled. If no name is used a UID is used which cannot be retrieved (as - // far as I can tell). msgs, err := ch.Consume( name, consumerName, @@ -147,11 +147,10 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog AmqpNoWait, nil) if err != nil { - log.Crit(fmt.Sprintf("Could not subscribe to queue: %s", err)) - return nil, err + return nil, fmt.Errorf("Could not subscribe to queue: %s", err) } - return msgs, err + return msgs, nil } // AmqpRPCServer listens on a specified queue within an AMQP channel. @@ -162,14 +161,12 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog // method to add specific actions. type AmqpRPCServer struct { serverQueue string - Channel *amqp.Channel + connection *amqpConnector log *blog.AuditLogger dispatchTable map[string]func([]byte) ([]byte, error) - connectionHandler func(*AmqpRPCServer) - consumerName string connected bool done bool - dMu sync.Mutex + mu sync.RWMutex currentGoroutines int64 maxConcurrentRPCServerRequests int64 tooManyRequestsResponse []byte @@ -177,20 +174,23 @@ type AmqpRPCServer struct { // NewAmqpRPCServer creates a new RPC server for the given queue and will begin // consuming requests from the queue. To start the server you must call Start(). -func NewAmqpRPCServer(serverQueue string, handler func(*AmqpRPCServer), maxConcurrentRPCServerRequests int64) (*AmqpRPCServer, error) { +func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64, c cmd.Config) (*AmqpRPCServer, error) { log := blog.GetAuditLogger() - b := make([]byte, 4) - _, err := rand.Read(b) - if err != nil { - return nil, err + + reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration + if reconnectBase == 0 { + reconnectBase = 20 * time.Millisecond } - consumerName := fmt.Sprintf("%s.%x", serverQueue, b) + reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration + if reconnectMax == 0 { + reconnectMax = time.Minute + } + return &AmqpRPCServer{ serverQueue: serverQueue, + connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax), log: log, dispatchTable: make(map[string]func([]byte) ([]byte, error)), - connectionHandler: handler, - consumerName: consumerName, maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests, }, nil } @@ -349,7 +349,7 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { return nil, err } - err = AMQPDeclareExchange(conn) + err = amqpDeclareExchange(conn) if err != nil { return nil, err } @@ -380,31 +380,23 @@ func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) { rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] %s failed, replying: %s (%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.Error.Value, response.Error.Type, msg.CorrelationId)) } rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId)) - rpc.Channel.Publish( - AmqpExchange, + rpc.connection.publish( msg.ReplyTo, - AmqpMandatory, - AmqpImmediate, - amqp.Publishing{ - CorrelationId: msg.CorrelationId, - Type: msg.Type, - Body: jsonResponse, // XXX-JWS: jws.Sign(privKey, body) - Expiration: "30000", - }) + msg.CorrelationId, + "30000", + "", + msg.Type, + jsonResponse) } -func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) { - rpc.Channel.Publish( - AmqpExchange, +func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) error { + return rpc.connection.publish( msg.ReplyTo, - AmqpMandatory, - AmqpImmediate, - amqp.Publishing{ - CorrelationId: msg.CorrelationId, - Type: msg.Type, - Body: rpc.tooManyRequestsResponse, - Expiration: "1000", - }) + msg.CorrelationId, + "1000", + "", + msg.Type, + rpc.tooManyRequestsResponse) } // Start starts the AMQP-RPC server and handles reconnections, this will block @@ -420,56 +412,45 @@ func (rpc *AmqpRPCServer) Start(c cmd.Config) error { } rpc.tooManyRequestsResponse = tooManyRequestsResponse + err = rpc.connection.connect(c) + if err != nil { + return err + } + rpc.mu.Lock() + rpc.connected = true + rpc.mu.Unlock() + go rpc.catchSignals() + for { - rpc.dMu.Lock() - if rpc.done { - rpc.dMu.Unlock() - break - } - rpc.dMu.Unlock() - var err error - rpc.Channel, err = AmqpChannel(c) - if err != nil { - return err - } - rpc.connectionHandler(rpc) - - msgs, err := amqpSubscribe(rpc.Channel, rpc.serverQueue, rpc.consumerName, rpc.log) - if err != nil { - return err - } - rpc.connected = true - rpc.log.Info(" [!] Connected to AMQP") - - closeChan := rpc.Channel.NotifyClose(make(chan *amqp.Error, 1)) - for blocking := true; blocking; { - select { - case msg, ok := <-msgs: - if ok { - if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests { - rpc.replyTooManyRequests(msg) - break // this breaks the select, not the for - } - go func() { - atomic.AddInt64(&rpc.currentGoroutines, 1) - defer atomic.AddInt64(&rpc.currentGoroutines, -1) - rpc.processMessage(msg) - }() - } else { - // chan has been closed by rpc.channel.Cancel + select { + case msg, ok := <-rpc.connection.messages(): + if ok { + if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests { + rpc.replyTooManyRequests(msg) + break // this breaks the select, not the for + } + go func() { + atomic.AddInt64(&rpc.currentGoroutines, 1) + defer atomic.AddInt64(&rpc.currentGoroutines, -1) + rpc.processMessage(msg) + }() + } else { + rpc.mu.RLock() + if rpc.done { + // chan has been closed by rpc.connection.Cancel rpc.log.Info(" [!] Finished processing messages") + rpc.mu.RUnlock() return nil } - case err = <-closeChan: - rpc.connected = false - rpc.log.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) - time.Sleep(time.Second * 5) - blocking = false + rpc.mu.RUnlock() + rpc.log.Info(" [!] Got channel close, but no signal to shut down. Continuing.") } + case err = <-rpc.connection.closeChannel(): + rpc.log.Info(fmt.Sprintf(" [!] Server channel closed: %s", rpc.serverQueue)) + rpc.connection.reconnect(c, rpc.log) } } - return nil } var signalToName = map[os.Signal]string{ @@ -494,39 +475,36 @@ func (rpc *AmqpRPCServer) catchSignals() { // continue blocking until it has processed any messages that have already been // retrieved. func (rpc *AmqpRPCServer) Stop() { + rpc.mu.Lock() + rpc.done = true + rpc.mu.Unlock() if rpc.connected { rpc.log.Info(" [!] Shutting down RPC server, stopping new deliveries and processing remaining messages") - rpc.Channel.Cancel(rpc.consumerName, false) + rpc.connection.cancel() } else { rpc.log.Info("[!] Shutting down RPC server, nothing to clean up") - rpc.dMu.Lock() - rpc.done = true - rpc.dMu.Unlock() } } // AmqpRPCCLient is an AMQP-RPC client that sends requests to a specific server // queue, and uses a dedicated response queue for responses. // -// To implement specific functionality, using code uses the Dispatch() +// To implement specific functionality, using code uses the DispatchSync() // method to send a method name and body, and get back a response. So // you end up with wrapper methods of the form: // // ``` // request = /* serialize request to []byte */ -// response = <-AmqpRPCCLient.Dispatch(method, request) +// response = AmqpRPCCLient.Dispatch(method, request) // return /* deserialized response */ // ``` // -// Callers that don't care about the response can just call Dispatch() -// and ignore the return value. -// // DispatchSync will manage the channel for you, and also enforce a -// timeout on the transaction (default 60 seconds) +// timeout on the transaction. type AmqpRPCCLient struct { serverQueue string clientQueue string - channel *amqp.Channel + connection *amqpConnector timeout time.Duration log *blog.AuditLogger @@ -537,7 +515,7 @@ type AmqpRPCCLient struct { } // NewAmqpRPCClient constructs an RPC client using AMQP -func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) { +func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) { hostname, err := os.Hostname() if err != nil { return nil, err @@ -550,40 +528,59 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Chann } clientQueue := fmt.Sprintf("%s.%s.%x", clientQueuePrefix, hostname, randID) + reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration + if reconnectBase == 0 { + reconnectBase = 20 * time.Millisecond + } + reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration + if reconnectMax == 0 { + reconnectMax = time.Minute + } + rpc = &AmqpRPCCLient{ serverQueue: serverQueue, clientQueue: clientQueue, - channel: channel, + connection: newAMQPConnector(clientQueue, reconnectBase, reconnectMax), pending: make(map[string]chan []byte), timeout: 10 * time.Second, log: blog.GetAuditLogger(), stats: stats, } - // Subscribe to the response queue and dispatch - msgs, err := amqpSubscribe(rpc.channel, clientQueue, "", rpc.log) + err = rpc.connection.connect(c) if err != nil { return nil, err } go func() { - for msg := range msgs { - // XXX-JWS: jws.Sign(privKey, body) - corrID := msg.CorrelationId - rpc.mu.RLock() - responseChan, present := rpc.pending[corrID] - rpc.mu.RUnlock() + for { + select { + case msg, ok := <-rpc.connection.messages(): + if ok { + corrID := msg.CorrelationId + rpc.mu.RLock() + responseChan, present := rpc.pending[corrID] + rpc.mu.RUnlock() - rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID)) - if !present { - // AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e - rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) - continue + rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID)) + if !present { + // AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e + rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) + continue + } + responseChan <- msg.Body + rpc.mu.Lock() + delete(rpc.pending, corrID) + rpc.mu.Unlock() + } else { + // chan has been closed by rpc.connection.Cancel + rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed: %s", rpc.clientQueue)) + continue + } + case err = <-rpc.connection.closeChannel(): + rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed : %s", rpc.clientQueue)) + rpc.connection.reconnect(c, rpc.log) } - responseChan <- msg.Body - rpc.mu.Lock() - delete(rpc.pending, corrID) - rpc.mu.Unlock() } }() @@ -596,10 +593,10 @@ func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) { rpc.timeout = ttl } -// Dispatch sends a body to the destination, and returns a response channel +// dispatch sends a body to the destination, and returns a response channel // that can be used to monitor for responses, or discarded for one-shot // actions. -func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte { +func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte { // Create a channel on which to direct the response // At least in some cases, it's important that this channel // be buffered to avoid deadlock @@ -611,18 +608,13 @@ func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte { // Send the request rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, core.B64enc(body), corrID)) - rpc.channel.Publish( - AmqpExchange, + rpc.connection.publish( rpc.serverQueue, - AmqpMandatory, - AmqpImmediate, - amqp.Publishing{ - CorrelationId: corrID, - ReplyTo: rpc.clientQueue, - Type: method, - Body: body, // XXX-JWS: jws.Sign(privKey, body) - Expiration: "30000", - }) + corrID, + "30000", + rpc.clientQueue, + method, + body) return responseChan } @@ -635,7 +627,7 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0) callStarted := time.Now() select { - case jsonResponse := <-rpc.Dispatch(method, body): + case jsonResponse := <-rpc.dispatch(method, body): var rpcResponse rpcResponse err = json.Unmarshal(jsonResponse, &rpcResponse) if err != nil { diff --git a/rpc/connection.go b/rpc/connection.go new file mode 100644 index 000000000..df9f9aec0 --- /dev/null +++ b/rpc/connection.go @@ -0,0 +1,144 @@ +package rpc + +import ( + "fmt" + "sync" + "time" + + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" + + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock" + "github.com/letsencrypt/boulder/cmd" + "github.com/letsencrypt/boulder/core" + blog "github.com/letsencrypt/boulder/log" +) + +func newAMQPConnector( + queueName string, + retryTimeoutBase time.Duration, + retryTimeoutMax time.Duration, +) *amqpConnector { + return &amqpConnector{ + queueName: queueName, + chMaker: defaultChannelMaker{}, + clk: clock.Default(), + retryTimeoutBase: retryTimeoutBase, + retryTimeoutMax: retryTimeoutMax, + } +} + +// channelMaker encapsulates how to create an AMQP channel. +type channelMaker interface { + makeChannel(conf cmd.Config) (amqpChannel, error) +} + +type defaultChannelMaker struct{} + +func (d defaultChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) { + return AmqpChannel(conf) +} + +// amqpConnector encapsulates an AMQP channel and a subscription to a specific +// queue, plus appropriate locking for its members. It provides reconnect logic, +// and allows publishing via the channel onto an arbitrary queue. +type amqpConnector struct { + mu sync.RWMutex + chMaker channelMaker + channel amqpChannel + queueName string + closeChan chan *amqp.Error + msgs <-chan amqp.Delivery + retryTimeoutBase time.Duration + retryTimeoutMax time.Duration + clk clock.Clock +} + +func (ac *amqpConnector) messages() <-chan amqp.Delivery { + ac.mu.RLock() + defer ac.mu.RUnlock() + return ac.msgs +} + +func (ac *amqpConnector) closeChannel() chan *amqp.Error { + ac.mu.RLock() + defer ac.mu.RUnlock() + return ac.closeChan +} + +// connect attempts to connect to a channel and subscribe to the named queue, +// returning error if it fails. This is used at first startup, where we want to +// fail fast if we can't connect. +func (ac *amqpConnector) connect(config cmd.Config) error { + channel, err := ac.chMaker.makeChannel(config) + if err != nil { + return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err) + } + msgs, err := amqpSubscribe(channel, ac.queueName) + if err != nil { + return fmt.Errorf("queue subscribe failed for %s: %s", ac.queueName, err) + } + closeChan := channel.NotifyClose(make(chan *amqp.Error, 1)) + ac.mu.Lock() + defer ac.mu.Unlock() + ac.channel = channel + ac.msgs = msgs + ac.closeChan = closeChan + return nil +} + +// reconnect attempts repeatedly to connect and subscribe to the named queue. It +// will loop forever until it succeeds. This is used for a running server, where +// we don't want to shut down because we lost our AMQP connection. +func (ac *amqpConnector) reconnect(config cmd.Config, log blog.SyslogWriter) { + for i := 0; ; i++ { + ac.clk.Sleep(core.RetryBackoff(i, ac.retryTimeoutBase, ac.retryTimeoutMax, 2)) + log.Info(fmt.Sprintf(" [!] attempting reconnect for %s", ac.queueName)) + err := ac.connect(config) + if err != nil { + log.Warning(fmt.Sprintf(" [!] %s", err)) + continue + } + break + } + log.Info(fmt.Sprintf(" [!] reconnect success for %s", ac.queueName)) + return +} + +// cancel cancels the AMQP channel. Used for graceful shutdowns. +func (ac *amqpConnector) cancel() { + ac.mu.RLock() + channel := ac.channel + ac.mu.RUnlock() + channel.Cancel(consumerName, false) +} + +// publish publishes a message onto the provided queue. We provide this wrapper +// because it requires locking around the read of ac.channel. +func (ac *amqpConnector) publish(queueName, corrId, expiration, replyTo, msgType string, body []byte) error { + ac.mu.RLock() + channel := ac.channel + ac.mu.RUnlock() + return channel.Publish( + AmqpExchange, + queueName, + AmqpMandatory, + AmqpImmediate, + amqp.Publishing{ + Body: body, + CorrelationId: corrId, + Expiration: expiration, + ReplyTo: replyTo, + Type: msgType, + }) +} + +// amqpChannel defines the subset of amqp.Channel methods that we use in this +// package. +type amqpChannel interface { + Cancel(consumer string, noWait bool) error + Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) + NotifyClose(c chan *amqp.Error) chan *amqp.Error + Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error + QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error + QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) +} diff --git a/rpc/connection_test.go b/rpc/connection_test.go new file mode 100644 index 000000000..89c5926d1 --- /dev/null +++ b/rpc/connection_test.go @@ -0,0 +1,130 @@ +package rpc + +import ( + "errors" + "testing" + "time" + + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock" + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock" + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" + "github.com/letsencrypt/boulder/cmd" + "github.com/letsencrypt/boulder/mocks" +) + +// mockChannelMaker always returns the given amqpChannel +type mockChannelMaker struct { + channel amqpChannel +} + +func (m mockChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) { + return m.channel, nil +} + +func setup(t *testing.T) (*amqpConnector, *MockamqpChannel, func()) { + mockCtrl := gomock.NewController(t) + + mockChannel := NewMockamqpChannel(mockCtrl) + ac := amqpConnector{ + chMaker: mockChannelMaker{ + channel: mockChannel, + }, + queueName: "fooqueue", + retryTimeoutBase: time.Second, + } + return &ac, mockChannel, func() { mockCtrl.Finish() } +} + +func TestConnect(t *testing.T) { + ac, mockChannel, finish := setup(t) + defer finish() + mockChannel.EXPECT().QueueDeclare( + "fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil) + mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil) + mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil) + mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error)) + err := ac.connect(cmd.Config{}) + if err != nil { + t.Fatalf("failed to connect: %s", err) + } + if ac.channel != mockChannel { + t.Errorf("ac.channel was not equal to mockChannel") + } + if ac.messages() == nil { + t.Errorf("ac.msgs was not initialized") + } + if ac.closeChannel() == nil { + t.Errorf("ac.closeChan was not initialized") + } +} + +func TestConnectFail(t *testing.T) { + ac, mockChannel, finish := setup(t) + defer finish() + mockChannel.EXPECT().QueueDeclare( + "fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).Return(amqp.Queue{}, errors.New("fail")) + err := ac.connect(cmd.Config{}) + if err == nil { + t.Fatalf("connect should have errored but did not") + } +} + +func TestReconnect(t *testing.T) { + ac, mockChannel, finish := setup(t) + defer finish() + + // Override the clock so the sleep calls are instantaneous, regardless of what + // the retry calls say. + ac.clk = clock.NewFake() + ac.retryTimeoutBase = time.Second + ac.retryTimeoutMax = time.Second + + mockChannel.EXPECT().QueueDeclare( + "fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).AnyTimes() + mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil).Times(3).Return(errors.New("fail")) + mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil).Return(nil) + mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil) + mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error)) + + log = mocks.UseMockLog() + + ac.reconnect(cmd.Config{}, log) + if ac.channel != mockChannel { + t.Errorf("ac.channel was not equal to mockChannel") + } + if ac.msgs == nil { + t.Errorf("ac.msgs was not initialized") + } + if ac.closeChan == nil { + t.Errorf("ac.closeChan was not initialized") + } +} + +func TestCancel(t *testing.T) { + ac, mockChannel, finish := setup(t) + defer finish() + // Since we're skipping the connect step, fake it by assigning directly to + // channel. + ac.channel = mockChannel + mockChannel.EXPECT().Cancel(consumerName, false) + ac.cancel() +} + +func TestPublish(t *testing.T) { + ac, mockChannel, finish := setup(t) + defer finish() + ac.channel = mockChannel + mockChannel.EXPECT().Publish( + AmqpExchange, + "fooqueue", + AmqpMandatory, + AmqpImmediate, + amqp.Publishing{ + Body: []byte("body"), + CorrelationId: "03c52e", + Expiration: "3000", + ReplyTo: "replyTo", + Type: "testMsg", + }) + ac.publish("fooqueue", "03c52e", "3000", "replyTo", "testMsg", []byte("body")) +} diff --git a/rpc/mock_channel_test.go b/rpc/mock_channel_test.go new file mode 100644 index 000000000..8852102da --- /dev/null +++ b/rpc/mock_channel_test.go @@ -0,0 +1,92 @@ +// Automatically generated by MockGen. DO NOT EDIT! +// Source: rpc/amqp-rpc-connection.go + +package rpc + +import ( + gomock "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock" + amqp "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" +) + +// Mock of amqpChannel interface +type MockamqpChannel struct { + ctrl *gomock.Controller + recorder *_MockamqpChannelRecorder +} + +// Recorder for MockamqpChannel (not exported) +type _MockamqpChannelRecorder struct { + mock *MockamqpChannel +} + +func NewMockamqpChannel(ctrl *gomock.Controller) *MockamqpChannel { + mock := &MockamqpChannel{ctrl: ctrl} + mock.recorder = &_MockamqpChannelRecorder{mock} + return mock +} + +func (_m *MockamqpChannel) EXPECT() *_MockamqpChannelRecorder { + return _m.recorder +} + +func (_m *MockamqpChannel) Cancel(consumer string, noWait bool) error { + ret := _m.ctrl.Call(_m, "Cancel", consumer, noWait) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockamqpChannelRecorder) Cancel(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Cancel", arg0, arg1) +} + +func (_m *MockamqpChannel) Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { + ret := _m.ctrl.Call(_m, "Consume", queue, consumer, autoAck, exclusive, noLocal, noWait, args) + ret0, _ := ret[0].(<-chan amqp.Delivery) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockamqpChannelRecorder) Consume(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Consume", arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + +func (_m *MockamqpChannel) NotifyClose(c chan *amqp.Error) chan *amqp.Error { + ret := _m.ctrl.Call(_m, "NotifyClose", c) + ret0, _ := ret[0].(chan *amqp.Error) + return ret0 +} + +func (_mr *_MockamqpChannelRecorder) NotifyClose(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "NotifyClose", arg0) +} + +func (_m *MockamqpChannel) Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { + ret := _m.ctrl.Call(_m, "Publish", exchange, key, mandatory, immediate, msg) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockamqpChannelRecorder) Publish(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1, arg2, arg3, arg4) +} + +func (_m *MockamqpChannel) QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error { + ret := _m.ctrl.Call(_m, "QueueBind", name, key, exchange, noWait, args) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockamqpChannelRecorder) QueueBind(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "QueueBind", arg0, arg1, arg2, arg3, arg4) +} + +func (_m *MockamqpChannel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error) { + ret := _m.ctrl.Call(_m, "QueueDeclare", name, durable, autoDelete, exclusive, noWait, args) + ret0, _ := ret[0].(amqp.Queue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockamqpChannelRecorder) QueueDeclare(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "QueueDeclare", arg0, arg1, arg2, arg3, arg4, arg5) +} diff --git a/rpc/rpc-interfaces.go b/rpc/rpc-interfaces.go index d7a77d3e2..fbb38589f 100644 --- a/rpc/rpc-interfaces.go +++ b/rpc/rpc-interfaces.go @@ -12,7 +12,6 @@ import ( // Client describes the functions an RPC Client performs type Client interface { SetTimeout(time.Duration) - Dispatch(string, []byte) chan []byte DispatchSync(string, []byte) ([]byte, error) } diff --git a/rpc/testdata/cert.pem b/rpc/testdata/cert.pem new file mode 100644 index 000000000..322872ece --- /dev/null +++ b/rpc/testdata/cert.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC+zCCAeOgAwIBAgIJAJ5ig3+Bc76gMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV +BAMMCWFtcXAgY2VydDAeFw0xNTEwMjcwMTQwMTdaFw0yMDEwMjUwMTQwMTdaMBQx +EjAQBgNVBAMMCWFtcXAgY2VydDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBANsa6RjxW2s1jvtvmYG3BOokEQ9wM5W+56mR5NQ0tbdSgoDde1aEArlWcw9x +0JZkuP3v3rDavygdlt1BoychfaFHDXpyYa7ROKn4iHyn89wysyEW0gMgR3JsC2r5 +5exH90G6gAi8DjGidR3I03IsyeiiyLXGgia8kJ/PB/wv46ll4Cx2PcUNWtMSoum+ +W7ueya4TA9c595pZtoS3JMbQrg83xlPvRL5DMg8/WAcDOMaDgNESoo1bOpFzsh/o +buz8VMfb5Danjf2sWxwLheC9MBRpJUWE9mcik2CdhnCy1guPsvcSgrb6D9+gJW1Q +r2AoCUisB3fggDj497kxxndT1jkCAwEAAaNQME4wHQYDVR0OBBYEFN8xiwv5cXm3 +Gid3eDOwSCBs7vR9MB8GA1UdIwQYMBaAFN8xiwv5cXm3Gid3eDOwSCBs7vR9MAwG +A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAC9OwoViHFVbcrY/EqVsy1D1 +PyFeCD0xADmn8jRtJNm9zTnNnIQxa+TaLcliGqtVMUld0inAbj7/ZjAhjzKgbaC3 +rRZvcSaHlKwLemtswjXYlUdKDd6BITKOEHKw2VTeZxBqeSapYADYSQUr46PzZu44 +nQmqVEaB2mLZIRDiSe2LVQh+WgZ9YKP/k2Wy4G8NJFzXqEycf2vKmWIU6X8hyTvO +SVTd1I/MKtGq+JALwaJTa2UGX80wMKIEEBLBM9cJht1CnceedWROy3ZPcrv9+D49 +B/ZfduX+tVg7BI7sw+m8lzBexrFqYneJLNRwXUaTER66dmfjwb2Y2Qg7MA7pCSo= +-----END CERTIFICATE----- diff --git a/rpc/testdata/key.pem b/rpc/testdata/key.pem new file mode 100644 index 000000000..18d7c5c65 --- /dev/null +++ b/rpc/testdata/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDbGukY8VtrNY77 +b5mBtwTqJBEPcDOVvuepkeTUNLW3UoKA3XtWhAK5VnMPcdCWZLj9796w2r8oHZbd +QaMnIX2hRw16cmGu0Tip+Ih8p/PcMrMhFtIDIEdybAtq+eXsR/dBuoAIvA4xonUd +yNNyLMnoosi1xoImvJCfzwf8L+OpZeAsdj3FDVrTEqLpvlu7nsmuEwPXOfeaWbaE +tyTG0K4PN8ZT70S+QzIPP1gHAzjGg4DREqKNWzqRc7If6G7s/FTH2+Q2p439rFsc +C4XgvTAUaSVFhPZnIpNgnYZwstYLj7L3EoK2+g/foCVtUK9gKAlIrAd34IA4+Pe5 +McZ3U9Y5AgMBAAECggEAMwqC0KexFzmD1Hj79qbPA0hhnQoTSkgNqYfuGa0pw8n5 +0tFFoTLhe5Fy/l8SK2bNXYKaivJ6Y3r5IRPgUQZRQNfPgP3SWaotnBLkoL1zZpF8 +/zFUvRuw6SvYQRR0BmYBaJNsrhw45kElLUoST9+1LXd2cDoNKb71pfTRtGLnkxdV +F84i87+nC4vLU0ZKkWOzMZMIYqtemrC73BC0+mvXte+P6zbWeSiC7tpyxGyj5pMS +WQ4TI0xDfZQRVtIVBXKplgvKb2fZ0DIHv7RJBWPSaajmzLAV3pxnmik+IfMLrxjk +/x0eXf/F/E6Y2Ejq05NudFxbPnGCfEl9Cg4FvEr0LQKBgQDugTds77FAMr/IYCJO +iweo6dC/47EYinrkrdj2kgxWuevjNo5jfpuksj0qA+w1EKImDVIh6dyXWKVwIVAg +58b09RD2C6Mvt+762n6JKbn8LBxczk8fxcWYv7rMRwbMcj5E7luAhBKbF+M4rTWt +5Ko0nI6hgqFn9JPsS3clZ+hK2wKBgQDrLWVv3WpGzp+R/0dGMLQGzgw0mL0u+msP +Q7v4/51i7HlQjDhj0pgsms9HIgmCudaKVXosZ+CEXv6I6LjfCiB7Brv3DUhCTWN/ +2g6PQfCLBuE9rqXbez6XaxiMVQPKphGop88/USKDlc8uzWPPuQ9cg9L0QZifEfCM +7+H4iNxNewKBgAp3aTNCoYirsXbdoSPJPiDPgfWpzE/DY/k9F9RaPGhh6FQkRMNg +/vuPRtfdLDR38mWxF/WdCa0qmrf8/kMzaKu/RWtGv3aMn9QqWnsydZL3bJc2Bori +ZvV5FH16cHXwXYMw4psVMKvVtIb8Muraqg19AVVdIjApr4QjG6tsj+kZAoGBAJZc +PueHq8Q7pQAmM76nsuJK6LjUEtivWLW4u7zWSR0PTfz2ubLw2URjcjTriSMgiA+H +2QX9ICnhxmFoUZKgmeWuh4zL3DAv5HbAxuBG63En4+iY+gfaw0jyOw616Cevh8jK +CZJU4Hk61ez5emA71Jt02PI74kWJpb+mO1a4wglzAoGAHP8Z7QnyISl/q8zfmKhG +jLzE7njHZfvRV/Q2W1x9f2Pu+VfXNAQLoqEH/q8HTw8tm3p84VJPSl1pfyTNrQdw +W3h+siTD9PyNTXZWEtuOEC23BfAqzU6PMbj4rVIb9goYYYwEpkAC7GrrvwFoZcTl +9f6SVG7zUFVrbyaYc6jEbPU= +-----END PRIVATE KEY----- diff --git a/test/amqp-integration-test.py b/test/amqp-integration-test.py index 462bb76ad..edef883dc 100644 --- a/test/amqp-integration-test.py +++ b/test/amqp-integration-test.py @@ -228,6 +228,8 @@ tempdir = tempfile.mkdtemp() if not startservers.start(race_detection=True): die(ExitStatus.Error) run_node_test() +# Simulate a disconnection from RabbitMQ to make sure reconnects work. +startservers.bounce_forward() run_client_tests() if not startservers.check(): die(ExitStatus.Error) diff --git a/test/boulder-config.json b/test/boulder-config.json index 7ae5d3674..f52a34713 100644 --- a/test/boulder-config.json +++ b/test/boulder-config.json @@ -6,7 +6,7 @@ }, "amqp": { - "server": "amqp://guest:guest@localhost:5672", + "server": "amqp://guest:guest@localhost:5673", "insecure": true, "-uncomment_for_AMQPS-tls": { "cacertfile": "/etc/boulder/rabbitmq-cacert.pem", diff --git a/test/startservers.py b/test/startservers.py index c12aeb3d3..1cf22cfbc 100644 --- a/test/startservers.py +++ b/test/startservers.py @@ -58,6 +58,7 @@ def start(race_detection): up explicitly by calling stop(), or automatically atexit. """ global processes + forward() t = ToSServerThread() t.daemon = True t.start() @@ -119,6 +120,19 @@ def start(race_detection): print "All servers running. Hit ^C to kill." return True +def forward(): + """Add a TCP forwarder between Boulder and RabbitMQ to simulate failures.""" + cmd = """exec listenbuddy -listen :5673 -speak localhost:5672""" + p = subprocess.Popen(cmd, shell=True) + p.cmd = cmd + print('started %s with pid %d' % (p.cmd, p.pid)) + global processes + processes.insert(0, p) + +def bounce_forward(): + """Kill all forwarded TCP connections.""" + global processes + processes[0].send_signal(signal.SIGUSR1) def check(): """Return true if all started processes are still alive. diff --git a/test/travis-before-install.sh b/test/travis-before-install.sh index d91befc95..648cf6506 100755 --- a/test/travis-before-install.sh +++ b/test/travis-before-install.sh @@ -19,7 +19,8 @@ travis_retry go get \ github.com/golang/lint/golint \ github.com/mattn/goveralls \ github.com/modocache/gover \ - github.com/jcjones/github-pr-status & + github.com/jcjones/github-pr-status \ + github.com/jsha/listenbuddy & (wget https://github.com/jsha/boulder-tools/raw/master/goose.gz && mkdir $GOPATH/bin &&