Merge pull request #1033 from letsencrypt/amqp-reconnect
Add reconnect loop to AMQP RPC
This commit is contained in:
commit
8a1e97a4ae
|
@ -96,6 +96,10 @@
|
|||
"Comment": "v1.2-88-ga197e5d",
|
||||
"Rev": "a197e5d40516f2e9f74dcee085a5f2d4604e94df"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/golang/mock/gomock",
|
||||
"Rev": "06883d979f10cc178f2716846215c8cf90f9e363"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/jmhodges/clock",
|
||||
"Rev": "3c4ebd218625c9364c33db6d39c276d80c3090c6"
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
// Copyright 2010 Google Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gomock
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Call represents an expected call to a mock.
|
||||
type Call struct {
|
||||
t TestReporter // for triggering test failures on invalid call setup
|
||||
|
||||
receiver interface{} // the receiver of the method call
|
||||
method string // the name of the method
|
||||
args []Matcher // the args
|
||||
rets []interface{} // the return values (if any)
|
||||
|
||||
preReqs []*Call // prerequisite calls
|
||||
|
||||
// Expectations
|
||||
minCalls, maxCalls int
|
||||
|
||||
numCalls int // actual number made
|
||||
|
||||
// Actions
|
||||
doFunc reflect.Value
|
||||
setArgs map[int]reflect.Value
|
||||
}
|
||||
|
||||
func (c *Call) AnyTimes() *Call {
|
||||
c.minCalls, c.maxCalls = 0, 1e8 // close enough to infinity
|
||||
return c
|
||||
}
|
||||
|
||||
// Do declares the action to run when the call is matched.
|
||||
// It takes an interface{} argument to support n-arity functions.
|
||||
func (c *Call) Do(f interface{}) *Call {
|
||||
// TODO: Check arity and types here, rather than dying badly elsewhere.
|
||||
c.doFunc = reflect.ValueOf(f)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Call) Return(rets ...interface{}) *Call {
|
||||
mt := c.methodType()
|
||||
if len(rets) != mt.NumOut() {
|
||||
c.t.Fatalf("wrong number of arguments to Return for %T.%v: got %d, want %d",
|
||||
c.receiver, c.method, len(rets), mt.NumOut())
|
||||
}
|
||||
for i, ret := range rets {
|
||||
if got, want := reflect.TypeOf(ret), mt.Out(i); got == want {
|
||||
// Identical types; nothing to do.
|
||||
} else if got == nil {
|
||||
// Nil needs special handling.
|
||||
switch want.Kind() {
|
||||
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
|
||||
// ok
|
||||
default:
|
||||
c.t.Fatalf("argument %d to Return for %T.%v is nil, but %v is not nillable",
|
||||
i, c.receiver, c.method, want)
|
||||
}
|
||||
} else if got.AssignableTo(want) {
|
||||
// Assignable type relation. Make the assignment now so that the generated code
|
||||
// can return the values with a type assertion.
|
||||
v := reflect.New(want).Elem()
|
||||
v.Set(reflect.ValueOf(ret))
|
||||
rets[i] = v.Interface()
|
||||
} else {
|
||||
c.t.Fatalf("wrong type of argument %d to Return for %T.%v: %v is not assignable to %v",
|
||||
i, c.receiver, c.method, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
c.rets = rets
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Call) Times(n int) *Call {
|
||||
c.minCalls, c.maxCalls = n, n
|
||||
return c
|
||||
}
|
||||
|
||||
// SetArg declares an action that will set the nth argument's value,
|
||||
// indirected through a pointer.
|
||||
func (c *Call) SetArg(n int, value interface{}) *Call {
|
||||
if c.setArgs == nil {
|
||||
c.setArgs = make(map[int]reflect.Value)
|
||||
}
|
||||
mt := c.methodType()
|
||||
// TODO: This will break on variadic methods.
|
||||
// We will need to check those at invocation time.
|
||||
if n < 0 || n >= mt.NumIn() {
|
||||
c.t.Fatalf("SetArg(%d, ...) called for a method with %d args", n, mt.NumIn())
|
||||
}
|
||||
// Permit setting argument through an interface.
|
||||
// In the interface case, we don't (nay, can't) check the type here.
|
||||
at := mt.In(n)
|
||||
switch at.Kind() {
|
||||
case reflect.Ptr:
|
||||
dt := at.Elem()
|
||||
if vt := reflect.TypeOf(value); !vt.AssignableTo(dt) {
|
||||
c.t.Fatalf("SetArg(%d, ...) argument is a %v, not assignable to %v", n, vt, dt)
|
||||
}
|
||||
case reflect.Interface:
|
||||
// nothing to do
|
||||
default:
|
||||
c.t.Fatalf("SetArg(%d, ...) referring to argument of non-pointer non-interface type %v", n, at)
|
||||
}
|
||||
c.setArgs[n] = reflect.ValueOf(value)
|
||||
return c
|
||||
}
|
||||
|
||||
// isPreReq returns true if other is a direct or indirect prerequisite to c.
|
||||
func (c *Call) isPreReq(other *Call) bool {
|
||||
for _, preReq := range c.preReqs {
|
||||
if other == preReq || preReq.isPreReq(other) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// After declares that the call may only match after preReq has been exhausted.
|
||||
func (c *Call) After(preReq *Call) *Call {
|
||||
if preReq.isPreReq(c) {
|
||||
msg := fmt.Sprintf(
|
||||
"Loop in call order: %v is a prerequisite to %v (possibly indirectly).",
|
||||
c, preReq,
|
||||
)
|
||||
panic(msg)
|
||||
}
|
||||
|
||||
c.preReqs = append(c.preReqs, preReq)
|
||||
return c
|
||||
}
|
||||
|
||||
// Returns true iff the minimum number of calls have been made.
|
||||
func (c *Call) satisfied() bool {
|
||||
return c.numCalls >= c.minCalls
|
||||
}
|
||||
|
||||
// Returns true iff the maximum number of calls have been made.
|
||||
func (c *Call) exhausted() bool {
|
||||
return c.numCalls >= c.maxCalls
|
||||
}
|
||||
|
||||
func (c *Call) String() string {
|
||||
args := make([]string, len(c.args))
|
||||
for i, arg := range c.args {
|
||||
args[i] = arg.String()
|
||||
}
|
||||
arguments := strings.Join(args, ", ")
|
||||
return fmt.Sprintf("%T.%v(%s)", c.receiver, c.method, arguments)
|
||||
}
|
||||
|
||||
// Tests if the given call matches the expected call.
|
||||
func (c *Call) matches(args []interface{}) bool {
|
||||
if len(args) != len(c.args) {
|
||||
return false
|
||||
}
|
||||
for i, m := range c.args {
|
||||
if !m.Matches(args[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Check that all prerequisite calls have been satisfied.
|
||||
for _, preReqCall := range c.preReqs {
|
||||
if !preReqCall.satisfied() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// dropPrereqs tells the expected Call to not re-check prerequite calls any
|
||||
// longer, and to return its current set.
|
||||
func (c *Call) dropPrereqs() (preReqs []*Call) {
|
||||
preReqs = c.preReqs
|
||||
c.preReqs = nil
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Call) call(args []interface{}) (rets []interface{}, action func()) {
|
||||
c.numCalls++
|
||||
|
||||
// Actions
|
||||
if c.doFunc.IsValid() {
|
||||
doArgs := make([]reflect.Value, len(args))
|
||||
ft := c.doFunc.Type()
|
||||
for i := 0; i < ft.NumIn(); i++ {
|
||||
if args[i] != nil {
|
||||
doArgs[i] = reflect.ValueOf(args[i])
|
||||
} else {
|
||||
// Use the zero value for the arg.
|
||||
doArgs[i] = reflect.Zero(ft.In(i))
|
||||
}
|
||||
}
|
||||
action = func() { c.doFunc.Call(doArgs) }
|
||||
}
|
||||
for n, v := range c.setArgs {
|
||||
reflect.ValueOf(args[n]).Elem().Set(v)
|
||||
}
|
||||
|
||||
rets = c.rets
|
||||
if rets == nil {
|
||||
// Synthesize the zero value for each of the return args' types.
|
||||
mt := c.methodType()
|
||||
rets = make([]interface{}, mt.NumOut())
|
||||
for i := 0; i < mt.NumOut(); i++ {
|
||||
rets[i] = reflect.Zero(mt.Out(i)).Interface()
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Call) methodType() reflect.Type {
|
||||
recv := reflect.ValueOf(c.receiver)
|
||||
for i := 0; i < recv.Type().NumMethod(); i++ {
|
||||
if recv.Type().Method(i).Name == c.method {
|
||||
return recv.Method(i).Type()
|
||||
}
|
||||
}
|
||||
panic(fmt.Sprintf("gomock: failed finding method %s on %T", c.method, c.receiver))
|
||||
}
|
||||
|
||||
// InOrder declares that the given calls should occur in order.
|
||||
func InOrder(calls ...*Call) {
|
||||
for i := 1; i < len(calls); i++ {
|
||||
calls[i].After(calls[i-1])
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
// Copyright 2011 Google Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gomock
|
||||
|
||||
// callSet represents a set of expected calls, indexed by receiver and method
|
||||
// name.
|
||||
type callSet map[interface{}]map[string][]*Call
|
||||
|
||||
// Add adds a new expected call.
|
||||
func (cs callSet) Add(call *Call) {
|
||||
methodMap, ok := cs[call.receiver]
|
||||
if !ok {
|
||||
methodMap = make(map[string][]*Call)
|
||||
cs[call.receiver] = methodMap
|
||||
}
|
||||
methodMap[call.method] = append(methodMap[call.method], call)
|
||||
}
|
||||
|
||||
// Remove removes an expected call.
|
||||
func (cs callSet) Remove(call *Call) {
|
||||
methodMap, ok := cs[call.receiver]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
sl := methodMap[call.method]
|
||||
for i, c := range sl {
|
||||
if c == call {
|
||||
// quick removal; we don't need to maintain call order
|
||||
if len(sl) > 1 {
|
||||
sl[i] = sl[len(sl)-1]
|
||||
}
|
||||
methodMap[call.method] = sl[:len(sl)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FindMatch searches for a matching call. Returns nil if no call matched.
|
||||
func (cs callSet) FindMatch(receiver interface{}, method string, args []interface{}) *Call {
|
||||
methodMap, ok := cs[receiver]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
calls, ok := methodMap[method]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Search through the unordered set of calls expected on a method on a
|
||||
// receiver.
|
||||
for _, call := range calls {
|
||||
// A call should not normally still be here if exhausted,
|
||||
// but it can happen if, for instance, .Times(0) was used.
|
||||
// Pretend the call doesn't match.
|
||||
if call.exhausted() {
|
||||
continue
|
||||
}
|
||||
if call.matches(args) {
|
||||
return call
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
167
Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go
generated
vendored
Normal file
167
Godeps/_workspace/src/github.com/golang/mock/gomock/controller.go
generated
vendored
Normal file
|
@ -0,0 +1,167 @@
|
|||
// Copyright 2010 Google Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// GoMock - a mock framework for Go.
|
||||
//
|
||||
// Standard usage:
|
||||
// (1) Define an interface that you wish to mock.
|
||||
// type MyInterface interface {
|
||||
// SomeMethod(x int64, y string)
|
||||
// }
|
||||
// (2) Use mockgen to generate a mock from the interface.
|
||||
// (3) Use the mock in a test:
|
||||
// func TestMyThing(t *testing.T) {
|
||||
// mockCtrl := gomock.NewController(t)
|
||||
// defer mockCtrl.Finish()
|
||||
//
|
||||
// mockObj := something.NewMockMyInterface(mockCtrl)
|
||||
// mockObj.EXPECT().SomeMethod(4, "blah")
|
||||
// // pass mockObj to a real object and play with it.
|
||||
// }
|
||||
//
|
||||
// By default, expected calls are not enforced to run in any particular order.
|
||||
// Call order dependency can be enforced by use of InOrder and/or Call.After.
|
||||
// Call.After can create more varied call order dependencies, but InOrder is
|
||||
// often more convenient.
|
||||
//
|
||||
// The following examples create equivalent call order dependencies.
|
||||
//
|
||||
// Example of using Call.After to chain expected call order:
|
||||
//
|
||||
// firstCall := mockObj.EXPECT().SomeMethod(1, "first")
|
||||
// secondCall := mockObj.EXPECT().SomeMethod(2, "second").After(firstCall)
|
||||
// mockObj.EXPECT().SomeMethod(3, "third").After(secondCall)
|
||||
//
|
||||
// Example of using InOrder to declare expected call order:
|
||||
//
|
||||
// gomock.InOrder(
|
||||
// mockObj.EXPECT().SomeMethod(1, "first"),
|
||||
// mockObj.EXPECT().SomeMethod(2, "second"),
|
||||
// mockObj.EXPECT().SomeMethod(3, "third"),
|
||||
// )
|
||||
//
|
||||
// TODO:
|
||||
// - Handle different argument/return types (e.g. ..., chan, map, interface).
|
||||
package gomock
|
||||
|
||||
import "sync"
|
||||
|
||||
// A TestReporter is something that can be used to report test failures.
|
||||
// It is satisfied by the standard library's *testing.T.
|
||||
type TestReporter interface {
|
||||
Errorf(format string, args ...interface{})
|
||||
Fatalf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
// A Controller represents the top-level control of a mock ecosystem.
|
||||
// It defines the scope and lifetime of mock objects, as well as their expectations.
|
||||
// It is safe to call Controller's methods from multiple goroutines.
|
||||
type Controller struct {
|
||||
mu sync.Mutex
|
||||
t TestReporter
|
||||
expectedCalls callSet
|
||||
}
|
||||
|
||||
func NewController(t TestReporter) *Controller {
|
||||
return &Controller{
|
||||
t: t,
|
||||
expectedCalls: make(callSet),
|
||||
}
|
||||
}
|
||||
|
||||
func (ctrl *Controller) RecordCall(receiver interface{}, method string, args ...interface{}) *Call {
|
||||
// TODO: check arity, types.
|
||||
margs := make([]Matcher, len(args))
|
||||
for i, arg := range args {
|
||||
if m, ok := arg.(Matcher); ok {
|
||||
margs[i] = m
|
||||
} else if arg == nil {
|
||||
// Handle nil specially so that passing a nil interface value
|
||||
// will match the typed nils of concrete args.
|
||||
margs[i] = Nil()
|
||||
} else {
|
||||
margs[i] = Eq(arg)
|
||||
}
|
||||
}
|
||||
|
||||
ctrl.mu.Lock()
|
||||
defer ctrl.mu.Unlock()
|
||||
|
||||
call := &Call{t: ctrl.t, receiver: receiver, method: method, args: margs, minCalls: 1, maxCalls: 1}
|
||||
|
||||
ctrl.expectedCalls.Add(call)
|
||||
return call
|
||||
}
|
||||
|
||||
func (ctrl *Controller) Call(receiver interface{}, method string, args ...interface{}) []interface{} {
|
||||
ctrl.mu.Lock()
|
||||
defer ctrl.mu.Unlock()
|
||||
|
||||
expected := ctrl.expectedCalls.FindMatch(receiver, method, args)
|
||||
if expected == nil {
|
||||
ctrl.t.Fatalf("no matching expected call: %T.%v(%v)", receiver, method, args)
|
||||
}
|
||||
|
||||
// Two things happen here:
|
||||
// * the matching call no longer needs to check prerequite calls,
|
||||
// * and the prerequite calls are no longer expected, so remove them.
|
||||
preReqCalls := expected.dropPrereqs()
|
||||
for _, preReqCall := range preReqCalls {
|
||||
ctrl.expectedCalls.Remove(preReqCall)
|
||||
}
|
||||
|
||||
rets, action := expected.call(args)
|
||||
if expected.exhausted() {
|
||||
ctrl.expectedCalls.Remove(expected)
|
||||
}
|
||||
|
||||
// Don't hold the lock while doing the call's action (if any)
|
||||
// so that actions may execute concurrently.
|
||||
// We use the deferred Unlock to capture any panics that happen above;
|
||||
// here we add a deferred Lock to balance it.
|
||||
ctrl.mu.Unlock()
|
||||
defer ctrl.mu.Lock()
|
||||
if action != nil {
|
||||
action()
|
||||
}
|
||||
|
||||
return rets
|
||||
}
|
||||
|
||||
func (ctrl *Controller) Finish() {
|
||||
ctrl.mu.Lock()
|
||||
defer ctrl.mu.Unlock()
|
||||
|
||||
// If we're currently panicking, probably because this is a deferred call,
|
||||
// pass through the panic.
|
||||
if err := recover(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Check that all remaining expected calls are satisfied.
|
||||
failures := false
|
||||
for _, methodMap := range ctrl.expectedCalls {
|
||||
for _, calls := range methodMap {
|
||||
for _, call := range calls {
|
||||
if !call.satisfied() {
|
||||
ctrl.t.Errorf("missing call(s) to %v", call)
|
||||
failures = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if failures {
|
||||
ctrl.t.Fatalf("aborting test due to missing call(s)")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
// Copyright 2010 Google Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gomock
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// A Matcher is a representation of a class of values.
|
||||
// It is used to represent the valid or expected arguments to a mocked method.
|
||||
type Matcher interface {
|
||||
// Matches returns whether y is a match.
|
||||
Matches(x interface{}) bool
|
||||
|
||||
// String describes what the matcher matches.
|
||||
String() string
|
||||
}
|
||||
|
||||
type anyMatcher struct{}
|
||||
|
||||
func (anyMatcher) Matches(x interface{}) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (anyMatcher) String() string {
|
||||
return "is anything"
|
||||
}
|
||||
|
||||
type eqMatcher struct {
|
||||
x interface{}
|
||||
}
|
||||
|
||||
func (e eqMatcher) Matches(x interface{}) bool {
|
||||
return reflect.DeepEqual(e.x, x)
|
||||
}
|
||||
|
||||
func (e eqMatcher) String() string {
|
||||
return fmt.Sprintf("is equal to %v", e.x)
|
||||
}
|
||||
|
||||
type nilMatcher struct{}
|
||||
|
||||
func (nilMatcher) Matches(x interface{}) bool {
|
||||
if x == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
v := reflect.ValueOf(x)
|
||||
switch v.Kind() {
|
||||
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
|
||||
reflect.Ptr, reflect.Slice:
|
||||
return v.IsNil()
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (nilMatcher) String() string {
|
||||
return "is nil"
|
||||
}
|
||||
|
||||
type notMatcher struct {
|
||||
m Matcher
|
||||
}
|
||||
|
||||
func (n notMatcher) Matches(x interface{}) bool {
|
||||
return !n.m.Matches(x)
|
||||
}
|
||||
|
||||
func (n notMatcher) String() string {
|
||||
// TODO: Improve this if we add a NotString method to the Matcher interface.
|
||||
return "not(" + n.m.String() + ")"
|
||||
}
|
||||
|
||||
// Constructors
|
||||
func Any() Matcher { return anyMatcher{} }
|
||||
func Eq(x interface{}) Matcher { return eqMatcher{x} }
|
||||
func Nil() Matcher { return nilMatcher{} }
|
||||
func Not(x interface{}) Matcher {
|
||||
if m, ok := x.(Matcher); ok {
|
||||
return notMatcher{m}
|
||||
}
|
||||
return notMatcher{Eq(x)}
|
||||
}
|
49
Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go
generated
vendored
Normal file
49
Godeps/_workspace/src/github.com/golang/mock/gomock/mock_matcher/mock_matcher.go
generated
vendored
Normal file
|
@ -0,0 +1,49 @@
|
|||
// Automatically generated by MockGen. DO NOT EDIT!
|
||||
// Source: github.com/golang/mock/gomock (interfaces: Matcher)
|
||||
|
||||
package mock_gomock
|
||||
|
||||
import (
|
||||
gomock "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
// Mock of Matcher interface
|
||||
type MockMatcher struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *_MockMatcherRecorder
|
||||
}
|
||||
|
||||
// Recorder for MockMatcher (not exported)
|
||||
type _MockMatcherRecorder struct {
|
||||
mock *MockMatcher
|
||||
}
|
||||
|
||||
func NewMockMatcher(ctrl *gomock.Controller) *MockMatcher {
|
||||
mock := &MockMatcher{ctrl: ctrl}
|
||||
mock.recorder = &_MockMatcherRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
func (_m *MockMatcher) EXPECT() *_MockMatcherRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
func (_m *MockMatcher) Matches(_param0 interface{}) bool {
|
||||
ret := _m.ctrl.Call(_m, "Matches", _param0)
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockMatcherRecorder) Matches(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "Matches", arg0)
|
||||
}
|
||||
|
||||
func (_m *MockMatcher) String() string {
|
||||
ret := _m.ctrl.Call(_m, "String")
|
||||
ret0, _ := ret[0].(string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockMatcherRecorder) String() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "String")
|
||||
}
|
|
@ -48,10 +48,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.
|
|||
cmd.FailOnError(err, "Could not connect to Syslog")
|
||||
blog.SetAuditLogger(auditlogger)
|
||||
|
||||
ch, err := rpc.AmqpChannel(c)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
|
||||
raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, ch, stats)
|
||||
raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
|
||||
|
@ -60,7 +57,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.
|
|||
dbMap, err := sa.NewDbMap(c.Revoker.DBConnect)
|
||||
cmd.FailOnError(err, "Couldn't setup database connection")
|
||||
|
||||
saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch, stats)
|
||||
saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
|
|
|
@ -45,24 +45,22 @@ func main() {
|
|||
|
||||
go cmd.ProfileCmd("CA", stats)
|
||||
|
||||
connectionHandler := func(srv *rpc.AmqpRPCServer) {
|
||||
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Failed to create SA client")
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Failed to create SA client")
|
||||
|
||||
pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
pubc, err := rpc.NewPublisherClient(pubRPC)
|
||||
cmd.FailOnError(err, "Failed to create Publisher client")
|
||||
pubc, err := rpc.NewPublisherClient(pubRPC)
|
||||
cmd.FailOnError(err, "Failed to create Publisher client")
|
||||
|
||||
cai.Publisher = &pubc
|
||||
cai.SA = &sac
|
||||
}
|
||||
cai.Publisher = &pubc
|
||||
cai.SA = &sac
|
||||
|
||||
cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, connectionHandler, c.CA.MaxConcurrentRPCServerRequests)
|
||||
cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, c.CA.MaxConcurrentRPCServerRequests, c)
|
||||
cmd.FailOnError(err, "Unable to create CA RPC server")
|
||||
rpc.NewCertificateAuthorityServer(cas, cai)
|
||||
|
||||
|
|
|
@ -36,17 +36,15 @@ func main() {
|
|||
go cmd.DebugServer(c.Publisher.DebugAddr)
|
||||
go cmd.ProfileCmd("Publisher", stats)
|
||||
|
||||
connectionHandler := func(srv *rpc.AmqpRPCServer) {
|
||||
saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create SA RPC client")
|
||||
saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create SA RPC client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
|
||||
pubi.SA = &sac
|
||||
}
|
||||
pubi.SA = &sac
|
||||
|
||||
pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, connectionHandler, c.Publisher.MaxConcurrentRPCServerRequests)
|
||||
pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, c.Publisher.MaxConcurrentRPCServerRequests, c)
|
||||
cmd.FailOnError(err, "Unable to create Publisher RPC server")
|
||||
rpc.NewPublisherServer(pubs, &pubi)
|
||||
|
||||
|
|
|
@ -59,31 +59,29 @@ func main() {
|
|||
|
||||
go cmd.ProfileCmd("RA", stats)
|
||||
|
||||
connectionHandler := func(srv *rpc.AmqpRPCServer) {
|
||||
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
vac, err := rpc.NewValidationAuthorityClient(vaRPC)
|
||||
cmd.FailOnError(err, "Unable to create VA client")
|
||||
vac, err := rpc.NewValidationAuthorityClient(vaRPC)
|
||||
cmd.FailOnError(err, "Unable to create VA client")
|
||||
|
||||
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
|
||||
rai.VA = &vac
|
||||
rai.CA = &cac
|
||||
rai.SA = &sac
|
||||
}
|
||||
rai.VA = &vac
|
||||
rai.CA = &cac
|
||||
rai.SA = &sac
|
||||
|
||||
ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, connectionHandler, c.RA.MaxConcurrentRPCServerRequests)
|
||||
ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, c.RA.MaxConcurrentRPCServerRequests, c)
|
||||
cmd.FailOnError(err, "Unable to create RA RPC server")
|
||||
rpc.NewRegistrationAuthorityServer(ras, &rai)
|
||||
|
||||
|
|
|
@ -41,9 +41,7 @@ func main() {
|
|||
|
||||
go cmd.ProfileCmd("SA", stats)
|
||||
|
||||
connectionHandler := func(*rpc.AmqpRPCServer) {}
|
||||
|
||||
sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, connectionHandler, c.SA.MaxConcurrentRPCServerRequests)
|
||||
sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, c.SA.MaxConcurrentRPCServerRequests, c)
|
||||
cmd.FailOnError(err, "Unable to create SA RPC server")
|
||||
rpc.NewStorageAuthorityServer(sas, sai)
|
||||
|
||||
|
|
|
@ -62,17 +62,15 @@ func main() {
|
|||
}
|
||||
vai.UserAgent = c.VA.UserAgent
|
||||
|
||||
connectionHandler := func(srv *rpc.AmqpRPCServer) {
|
||||
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
|
||||
cmd.FailOnError(err, "Unable to create RA client")
|
||||
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
|
||||
cmd.FailOnError(err, "Unable to create RA client")
|
||||
|
||||
vai.RA = &rac
|
||||
}
|
||||
vai.RA = &rac
|
||||
|
||||
vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, connectionHandler, c.VA.MaxConcurrentRPCServerRequests)
|
||||
vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, c.VA.MaxConcurrentRPCServerRequests, c)
|
||||
cmd.FailOnError(err, "Unable to create VA RPC server")
|
||||
rpc.NewValidationAuthorityServer(vas, vai)
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/facebookgo/httpdown"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
blog "github.com/letsencrypt/boulder/log"
|
||||
|
@ -23,17 +22,11 @@ import (
|
|||
"github.com/letsencrypt/boulder/wfe"
|
||||
)
|
||||
|
||||
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) {
|
||||
ch, err := rpc.AmqpChannel(c)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
logger.Info(" [!] Connected to AMQP")
|
||||
|
||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||
|
||||
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch, stats)
|
||||
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient) {
|
||||
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch, stats)
|
||||
saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
|
||||
|
@ -42,7 +35,7 @@ func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc
|
|||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
|
||||
return rac, sac, closeChan
|
||||
return rac, sac
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -78,7 +71,7 @@ func main() {
|
|||
|
||||
wfe, err := wfe.NewWebFrontEndImpl(stats, clock.Default())
|
||||
cmd.FailOnError(err, "Unable to create WFE")
|
||||
rac, sac, closeChan := setupWFE(c, auditlogger, stats)
|
||||
rac, sac := setupWFE(c, auditlogger, stats)
|
||||
wfe.RA = &rac
|
||||
wfe.SA = &sac
|
||||
wfe.SubscriberAgreementURL = c.SubscriberAgreementURL
|
||||
|
@ -104,21 +97,6 @@ func main() {
|
|||
|
||||
go cmd.ProfileCmd("WFE", stats)
|
||||
|
||||
go func() {
|
||||
// sit around and reconnect to AMQP if the channel
|
||||
// drops for some reason and repopulate the wfe object
|
||||
// with new RA and SA rpc clients.
|
||||
for {
|
||||
for err := range closeChan {
|
||||
auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err))
|
||||
time.Sleep(time.Second * 5)
|
||||
rac, sac, closeChan = setupWFE(c, auditlogger, stats)
|
||||
wfe.RA = &rac
|
||||
wfe.SA = &sac
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Set up paths
|
||||
wfe.BaseURL = c.Common.BaseURL
|
||||
h, err := wfe.Handler()
|
||||
|
|
|
@ -245,10 +245,7 @@ func main() {
|
|||
dbMap, err := sa.NewDbMap(c.Mailer.DBConnect)
|
||||
cmd.FailOnError(err, "Could not connect to database")
|
||||
|
||||
ch, err := rpc.AmqpChannel(c)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
|
||||
saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch, stats)
|
||||
saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
|
|
|
@ -9,12 +9,10 @@ import (
|
|||
"crypto/x509"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
gorp "github.com/letsencrypt/boulder/Godeps/_workspace/src/gopkg.in/gorp.v1"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
|
@ -464,32 +462,26 @@ func setupClients(c cmd.Config, stats statsd.Statter) (
|
|||
core.CertificateAuthority,
|
||||
core.Publisher,
|
||||
core.StorageAuthority,
|
||||
chan *amqp.Error,
|
||||
) {
|
||||
ch, err := rpc.AmqpChannel(c)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
|
||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||
|
||||
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch, stats)
|
||||
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
|
||||
pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, ch, stats)
|
||||
pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
pubc, err := rpc.NewPublisherClient(pubRPC)
|
||||
cmd.FailOnError(err, "Unable to create Publisher client")
|
||||
|
||||
saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, ch, stats)
|
||||
saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, c, stats)
|
||||
cmd.FailOnError(err, "Unable to create RPC client")
|
||||
|
||||
sac, err := rpc.NewStorageAuthorityClient(saRPC)
|
||||
cmd.FailOnError(err, "Unable to create Publisher client")
|
||||
|
||||
return cac, pubc, sac, closeChan
|
||||
return cac, pubc, sac
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -516,7 +508,7 @@ func main() {
|
|||
dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect)
|
||||
cmd.FailOnError(err, "Could not connect to database")
|
||||
|
||||
cac, pubc, sac, closeChan := setupClients(c, stats)
|
||||
cac, pubc, sac := setupClients(c, stats)
|
||||
|
||||
updater, err := newUpdater(
|
||||
stats,
|
||||
|
@ -530,6 +522,8 @@ func main() {
|
|||
len(c.Common.CT.Logs),
|
||||
)
|
||||
|
||||
cmd.FailOnError(err, "Failed to create updater")
|
||||
|
||||
for _, l := range updater.loops {
|
||||
go func(loop *looper) {
|
||||
err = loop.loop()
|
||||
|
@ -539,15 +533,8 @@ func main() {
|
|||
}(l)
|
||||
}
|
||||
|
||||
cmd.FailOnError(err, "Failed to create updater")
|
||||
|
||||
// TODO(): When the channel falls over so do we for now, if the AMQP channel
|
||||
// has already closed there is no real cleanup we can do. This is due to
|
||||
// really needing to change the underlying AMQP Server/Client reconnection
|
||||
// logic.
|
||||
err = <-closeChan
|
||||
auditlogger.AuditErr(fmt.Errorf(" [!] AMQP Channel closed, exiting: [%s]", err))
|
||||
os.Exit(1)
|
||||
// Sleep forever (until signaled)
|
||||
select {}
|
||||
}
|
||||
|
||||
app.Run()
|
||||
|
|
22
cmd/shell.go
22
cmd/shell.go
|
@ -58,15 +58,19 @@ type Config struct {
|
|||
|
||||
// General
|
||||
AMQP struct {
|
||||
Server string
|
||||
Insecure bool
|
||||
RA Queue
|
||||
VA Queue
|
||||
SA Queue
|
||||
CA Queue
|
||||
OCSP Queue
|
||||
Publisher Queue
|
||||
TLS *TLSConfig
|
||||
Server string
|
||||
Insecure bool
|
||||
RA Queue
|
||||
VA Queue
|
||||
SA Queue
|
||||
CA Queue
|
||||
OCSP Queue
|
||||
Publisher Queue
|
||||
TLS *TLSConfig
|
||||
ReconnectTimeouts struct {
|
||||
Base ConfigDuration
|
||||
Max ConfigDuration
|
||||
}
|
||||
}
|
||||
|
||||
WFE struct {
|
||||
|
|
256
rpc/amqp-rpc.go
256
rpc/amqp-rpc.go
|
@ -52,12 +52,13 @@ const (
|
|||
AmqpAutoAck = true
|
||||
AmqpMandatory = false
|
||||
AmqpImmediate = false
|
||||
consumerName = "boulder"
|
||||
)
|
||||
|
||||
// AMQPDeclareExchange attempts to declare the configured AMQP exchange,
|
||||
// returning silently if already declared, erroring if nonexistant and
|
||||
// unable to create.
|
||||
func AMQPDeclareExchange(conn *amqp.Connection) error {
|
||||
func amqpDeclareExchange(conn *amqp.Connection) error {
|
||||
var err error
|
||||
var ch *amqp.Channel
|
||||
log := blog.GetAuditLogger()
|
||||
|
@ -77,7 +78,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error {
|
|||
AmqpNoWait,
|
||||
nil)
|
||||
if err != nil {
|
||||
log.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, attempting to create. (err=%s)", AmqpExchange, err))
|
||||
log.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, creating.", AmqpExchange))
|
||||
|
||||
// Channel is invalid at this point, so recreate
|
||||
ch.Close()
|
||||
|
@ -100,6 +101,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error {
|
|||
ch.Close()
|
||||
return err
|
||||
}
|
||||
log.Info(fmt.Sprintf("Created exchange %s.", AmqpExchange))
|
||||
}
|
||||
|
||||
ch.Close()
|
||||
|
@ -107,7 +109,7 @@ func AMQPDeclareExchange(conn *amqp.Connection) error {
|
|||
}
|
||||
|
||||
// A simplified way to declare and subscribe to an AMQP queue
|
||||
func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) {
|
||||
func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
|
||||
var err error
|
||||
|
||||
_, err = ch.QueueDeclare(
|
||||
|
@ -118,8 +120,7 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog
|
|||
AmqpNoWait,
|
||||
nil)
|
||||
if err != nil {
|
||||
log.Crit(fmt.Sprintf("Could not declare queue: %s", err))
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("could not declare queue: %s", err)
|
||||
}
|
||||
|
||||
routingKey := name
|
||||
|
@ -131,13 +132,12 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog
|
|||
false,
|
||||
nil)
|
||||
if err != nil {
|
||||
log.Crit(fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", name, name, routingKey))
|
||||
err = fmt.Errorf(
|
||||
"Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.",
|
||||
name, name, routingKey)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// A consumer name is used so that the specific consumer can be cancelled later
|
||||
// if signalled. If no name is used a UID is used which cannot be retrieved (as
|
||||
// far as I can tell).
|
||||
msgs, err := ch.Consume(
|
||||
name,
|
||||
consumerName,
|
||||
|
@ -147,11 +147,10 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog
|
|||
AmqpNoWait,
|
||||
nil)
|
||||
if err != nil {
|
||||
log.Crit(fmt.Sprintf("Could not subscribe to queue: %s", err))
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("Could not subscribe to queue: %s", err)
|
||||
}
|
||||
|
||||
return msgs, err
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// AmqpRPCServer listens on a specified queue within an AMQP channel.
|
||||
|
@ -162,14 +161,12 @@ func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog
|
|||
// method to add specific actions.
|
||||
type AmqpRPCServer struct {
|
||||
serverQueue string
|
||||
Channel *amqp.Channel
|
||||
connection *amqpConnector
|
||||
log *blog.AuditLogger
|
||||
dispatchTable map[string]func([]byte) ([]byte, error)
|
||||
connectionHandler func(*AmqpRPCServer)
|
||||
consumerName string
|
||||
connected bool
|
||||
done bool
|
||||
dMu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
currentGoroutines int64
|
||||
maxConcurrentRPCServerRequests int64
|
||||
tooManyRequestsResponse []byte
|
||||
|
@ -177,20 +174,23 @@ type AmqpRPCServer struct {
|
|||
|
||||
// NewAmqpRPCServer creates a new RPC server for the given queue and will begin
|
||||
// consuming requests from the queue. To start the server you must call Start().
|
||||
func NewAmqpRPCServer(serverQueue string, handler func(*AmqpRPCServer), maxConcurrentRPCServerRequests int64) (*AmqpRPCServer, error) {
|
||||
func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64, c cmd.Config) (*AmqpRPCServer, error) {
|
||||
log := blog.GetAuditLogger()
|
||||
b := make([]byte, 4)
|
||||
_, err := rand.Read(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration
|
||||
if reconnectBase == 0 {
|
||||
reconnectBase = 20 * time.Millisecond
|
||||
}
|
||||
consumerName := fmt.Sprintf("%s.%x", serverQueue, b)
|
||||
reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration
|
||||
if reconnectMax == 0 {
|
||||
reconnectMax = time.Minute
|
||||
}
|
||||
|
||||
return &AmqpRPCServer{
|
||||
serverQueue: serverQueue,
|
||||
connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax),
|
||||
log: log,
|
||||
dispatchTable: make(map[string]func([]byte) ([]byte, error)),
|
||||
connectionHandler: handler,
|
||||
consumerName: consumerName,
|
||||
maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests,
|
||||
}, nil
|
||||
}
|
||||
|
@ -349,7 +349,7 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = AMQPDeclareExchange(conn)
|
||||
err = amqpDeclareExchange(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -380,31 +380,23 @@ func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) {
|
|||
rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] %s failed, replying: %s (%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.Error.Value, response.Error.Type, msg.CorrelationId))
|
||||
}
|
||||
rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId))
|
||||
rpc.Channel.Publish(
|
||||
AmqpExchange,
|
||||
rpc.connection.publish(
|
||||
msg.ReplyTo,
|
||||
AmqpMandatory,
|
||||
AmqpImmediate,
|
||||
amqp.Publishing{
|
||||
CorrelationId: msg.CorrelationId,
|
||||
Type: msg.Type,
|
||||
Body: jsonResponse, // XXX-JWS: jws.Sign(privKey, body)
|
||||
Expiration: "30000",
|
||||
})
|
||||
msg.CorrelationId,
|
||||
"30000",
|
||||
"",
|
||||
msg.Type,
|
||||
jsonResponse)
|
||||
}
|
||||
|
||||
func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) {
|
||||
rpc.Channel.Publish(
|
||||
AmqpExchange,
|
||||
func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) error {
|
||||
return rpc.connection.publish(
|
||||
msg.ReplyTo,
|
||||
AmqpMandatory,
|
||||
AmqpImmediate,
|
||||
amqp.Publishing{
|
||||
CorrelationId: msg.CorrelationId,
|
||||
Type: msg.Type,
|
||||
Body: rpc.tooManyRequestsResponse,
|
||||
Expiration: "1000",
|
||||
})
|
||||
msg.CorrelationId,
|
||||
"1000",
|
||||
"",
|
||||
msg.Type,
|
||||
rpc.tooManyRequestsResponse)
|
||||
}
|
||||
|
||||
// Start starts the AMQP-RPC server and handles reconnections, this will block
|
||||
|
@ -420,56 +412,45 @@ func (rpc *AmqpRPCServer) Start(c cmd.Config) error {
|
|||
}
|
||||
rpc.tooManyRequestsResponse = tooManyRequestsResponse
|
||||
|
||||
err = rpc.connection.connect(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpc.mu.Lock()
|
||||
rpc.connected = true
|
||||
rpc.mu.Unlock()
|
||||
|
||||
go rpc.catchSignals()
|
||||
|
||||
for {
|
||||
rpc.dMu.Lock()
|
||||
if rpc.done {
|
||||
rpc.dMu.Unlock()
|
||||
break
|
||||
}
|
||||
rpc.dMu.Unlock()
|
||||
var err error
|
||||
rpc.Channel, err = AmqpChannel(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpc.connectionHandler(rpc)
|
||||
|
||||
msgs, err := amqpSubscribe(rpc.Channel, rpc.serverQueue, rpc.consumerName, rpc.log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpc.connected = true
|
||||
rpc.log.Info(" [!] Connected to AMQP")
|
||||
|
||||
closeChan := rpc.Channel.NotifyClose(make(chan *amqp.Error, 1))
|
||||
for blocking := true; blocking; {
|
||||
select {
|
||||
case msg, ok := <-msgs:
|
||||
if ok {
|
||||
if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests {
|
||||
rpc.replyTooManyRequests(msg)
|
||||
break // this breaks the select, not the for
|
||||
}
|
||||
go func() {
|
||||
atomic.AddInt64(&rpc.currentGoroutines, 1)
|
||||
defer atomic.AddInt64(&rpc.currentGoroutines, -1)
|
||||
rpc.processMessage(msg)
|
||||
}()
|
||||
} else {
|
||||
// chan has been closed by rpc.channel.Cancel
|
||||
select {
|
||||
case msg, ok := <-rpc.connection.messages():
|
||||
if ok {
|
||||
if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests {
|
||||
rpc.replyTooManyRequests(msg)
|
||||
break // this breaks the select, not the for
|
||||
}
|
||||
go func() {
|
||||
atomic.AddInt64(&rpc.currentGoroutines, 1)
|
||||
defer atomic.AddInt64(&rpc.currentGoroutines, -1)
|
||||
rpc.processMessage(msg)
|
||||
}()
|
||||
} else {
|
||||
rpc.mu.RLock()
|
||||
if rpc.done {
|
||||
// chan has been closed by rpc.connection.Cancel
|
||||
rpc.log.Info(" [!] Finished processing messages")
|
||||
rpc.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
case err = <-closeChan:
|
||||
rpc.connected = false
|
||||
rpc.log.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err))
|
||||
time.Sleep(time.Second * 5)
|
||||
blocking = false
|
||||
rpc.mu.RUnlock()
|
||||
rpc.log.Info(" [!] Got channel close, but no signal to shut down. Continuing.")
|
||||
}
|
||||
case err = <-rpc.connection.closeChannel():
|
||||
rpc.log.Info(fmt.Sprintf(" [!] Server channel closed: %s", rpc.serverQueue))
|
||||
rpc.connection.reconnect(c, rpc.log)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var signalToName = map[os.Signal]string{
|
||||
|
@ -494,39 +475,36 @@ func (rpc *AmqpRPCServer) catchSignals() {
|
|||
// continue blocking until it has processed any messages that have already been
|
||||
// retrieved.
|
||||
func (rpc *AmqpRPCServer) Stop() {
|
||||
rpc.mu.Lock()
|
||||
rpc.done = true
|
||||
rpc.mu.Unlock()
|
||||
if rpc.connected {
|
||||
rpc.log.Info(" [!] Shutting down RPC server, stopping new deliveries and processing remaining messages")
|
||||
rpc.Channel.Cancel(rpc.consumerName, false)
|
||||
rpc.connection.cancel()
|
||||
} else {
|
||||
rpc.log.Info("[!] Shutting down RPC server, nothing to clean up")
|
||||
rpc.dMu.Lock()
|
||||
rpc.done = true
|
||||
rpc.dMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// AmqpRPCCLient is an AMQP-RPC client that sends requests to a specific server
|
||||
// queue, and uses a dedicated response queue for responses.
|
||||
//
|
||||
// To implement specific functionality, using code uses the Dispatch()
|
||||
// To implement specific functionality, using code uses the DispatchSync()
|
||||
// method to send a method name and body, and get back a response. So
|
||||
// you end up with wrapper methods of the form:
|
||||
//
|
||||
// ```
|
||||
// request = /* serialize request to []byte */
|
||||
// response = <-AmqpRPCCLient.Dispatch(method, request)
|
||||
// response = AmqpRPCCLient.Dispatch(method, request)
|
||||
// return /* deserialized response */
|
||||
// ```
|
||||
//
|
||||
// Callers that don't care about the response can just call Dispatch()
|
||||
// and ignore the return value.
|
||||
//
|
||||
// DispatchSync will manage the channel for you, and also enforce a
|
||||
// timeout on the transaction (default 60 seconds)
|
||||
// timeout on the transaction.
|
||||
type AmqpRPCCLient struct {
|
||||
serverQueue string
|
||||
clientQueue string
|
||||
channel *amqp.Channel
|
||||
connection *amqpConnector
|
||||
timeout time.Duration
|
||||
log *blog.AuditLogger
|
||||
|
||||
|
@ -537,7 +515,7 @@ type AmqpRPCCLient struct {
|
|||
}
|
||||
|
||||
// NewAmqpRPCClient constructs an RPC client using AMQP
|
||||
func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) {
|
||||
func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -550,40 +528,59 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Chann
|
|||
}
|
||||
clientQueue := fmt.Sprintf("%s.%s.%x", clientQueuePrefix, hostname, randID)
|
||||
|
||||
reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration
|
||||
if reconnectBase == 0 {
|
||||
reconnectBase = 20 * time.Millisecond
|
||||
}
|
||||
reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration
|
||||
if reconnectMax == 0 {
|
||||
reconnectMax = time.Minute
|
||||
}
|
||||
|
||||
rpc = &AmqpRPCCLient{
|
||||
serverQueue: serverQueue,
|
||||
clientQueue: clientQueue,
|
||||
channel: channel,
|
||||
connection: newAMQPConnector(clientQueue, reconnectBase, reconnectMax),
|
||||
pending: make(map[string]chan []byte),
|
||||
timeout: 10 * time.Second,
|
||||
log: blog.GetAuditLogger(),
|
||||
stats: stats,
|
||||
}
|
||||
|
||||
// Subscribe to the response queue and dispatch
|
||||
msgs, err := amqpSubscribe(rpc.channel, clientQueue, "", rpc.log)
|
||||
err = rpc.connection.connect(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for msg := range msgs {
|
||||
// XXX-JWS: jws.Sign(privKey, body)
|
||||
corrID := msg.CorrelationId
|
||||
rpc.mu.RLock()
|
||||
responseChan, present := rpc.pending[corrID]
|
||||
rpc.mu.RUnlock()
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-rpc.connection.messages():
|
||||
if ok {
|
||||
corrID := msg.CorrelationId
|
||||
rpc.mu.RLock()
|
||||
responseChan, present := rpc.pending[corrID]
|
||||
rpc.mu.RUnlock()
|
||||
|
||||
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
|
||||
if !present {
|
||||
// AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e
|
||||
rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
|
||||
continue
|
||||
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
|
||||
if !present {
|
||||
// AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e
|
||||
rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
|
||||
continue
|
||||
}
|
||||
responseChan <- msg.Body
|
||||
rpc.mu.Lock()
|
||||
delete(rpc.pending, corrID)
|
||||
rpc.mu.Unlock()
|
||||
} else {
|
||||
// chan has been closed by rpc.connection.Cancel
|
||||
rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed: %s", rpc.clientQueue))
|
||||
continue
|
||||
}
|
||||
case err = <-rpc.connection.closeChannel():
|
||||
rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed : %s", rpc.clientQueue))
|
||||
rpc.connection.reconnect(c, rpc.log)
|
||||
}
|
||||
responseChan <- msg.Body
|
||||
rpc.mu.Lock()
|
||||
delete(rpc.pending, corrID)
|
||||
rpc.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -596,10 +593,10 @@ func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) {
|
|||
rpc.timeout = ttl
|
||||
}
|
||||
|
||||
// Dispatch sends a body to the destination, and returns a response channel
|
||||
// dispatch sends a body to the destination, and returns a response channel
|
||||
// that can be used to monitor for responses, or discarded for one-shot
|
||||
// actions.
|
||||
func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte {
|
||||
func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte {
|
||||
// Create a channel on which to direct the response
|
||||
// At least in some cases, it's important that this channel
|
||||
// be buffered to avoid deadlock
|
||||
|
@ -611,18 +608,13 @@ func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte {
|
|||
|
||||
// Send the request
|
||||
rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, core.B64enc(body), corrID))
|
||||
rpc.channel.Publish(
|
||||
AmqpExchange,
|
||||
rpc.connection.publish(
|
||||
rpc.serverQueue,
|
||||
AmqpMandatory,
|
||||
AmqpImmediate,
|
||||
amqp.Publishing{
|
||||
CorrelationId: corrID,
|
||||
ReplyTo: rpc.clientQueue,
|
||||
Type: method,
|
||||
Body: body, // XXX-JWS: jws.Sign(privKey, body)
|
||||
Expiration: "30000",
|
||||
})
|
||||
corrID,
|
||||
"30000",
|
||||
rpc.clientQueue,
|
||||
method,
|
||||
body)
|
||||
|
||||
return responseChan
|
||||
}
|
||||
|
@ -635,7 +627,7 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
|
|||
defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0)
|
||||
callStarted := time.Now()
|
||||
select {
|
||||
case jsonResponse := <-rpc.Dispatch(method, body):
|
||||
case jsonResponse := <-rpc.dispatch(method, body):
|
||||
var rpcResponse rpcResponse
|
||||
err = json.Unmarshal(jsonResponse, &rpcResponse)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
"github.com/letsencrypt/boulder/core"
|
||||
blog "github.com/letsencrypt/boulder/log"
|
||||
)
|
||||
|
||||
func newAMQPConnector(
|
||||
queueName string,
|
||||
retryTimeoutBase time.Duration,
|
||||
retryTimeoutMax time.Duration,
|
||||
) *amqpConnector {
|
||||
return &amqpConnector{
|
||||
queueName: queueName,
|
||||
chMaker: defaultChannelMaker{},
|
||||
clk: clock.Default(),
|
||||
retryTimeoutBase: retryTimeoutBase,
|
||||
retryTimeoutMax: retryTimeoutMax,
|
||||
}
|
||||
}
|
||||
|
||||
// channelMaker encapsulates how to create an AMQP channel.
|
||||
type channelMaker interface {
|
||||
makeChannel(conf cmd.Config) (amqpChannel, error)
|
||||
}
|
||||
|
||||
type defaultChannelMaker struct{}
|
||||
|
||||
func (d defaultChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) {
|
||||
return AmqpChannel(conf)
|
||||
}
|
||||
|
||||
// amqpConnector encapsulates an AMQP channel and a subscription to a specific
|
||||
// queue, plus appropriate locking for its members. It provides reconnect logic,
|
||||
// and allows publishing via the channel onto an arbitrary queue.
|
||||
type amqpConnector struct {
|
||||
mu sync.RWMutex
|
||||
chMaker channelMaker
|
||||
channel amqpChannel
|
||||
queueName string
|
||||
closeChan chan *amqp.Error
|
||||
msgs <-chan amqp.Delivery
|
||||
retryTimeoutBase time.Duration
|
||||
retryTimeoutMax time.Duration
|
||||
clk clock.Clock
|
||||
}
|
||||
|
||||
func (ac *amqpConnector) messages() <-chan amqp.Delivery {
|
||||
ac.mu.RLock()
|
||||
defer ac.mu.RUnlock()
|
||||
return ac.msgs
|
||||
}
|
||||
|
||||
func (ac *amqpConnector) closeChannel() chan *amqp.Error {
|
||||
ac.mu.RLock()
|
||||
defer ac.mu.RUnlock()
|
||||
return ac.closeChan
|
||||
}
|
||||
|
||||
// connect attempts to connect to a channel and subscribe to the named queue,
|
||||
// returning error if it fails. This is used at first startup, where we want to
|
||||
// fail fast if we can't connect.
|
||||
func (ac *amqpConnector) connect(config cmd.Config) error {
|
||||
channel, err := ac.chMaker.makeChannel(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err)
|
||||
}
|
||||
msgs, err := amqpSubscribe(channel, ac.queueName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("queue subscribe failed for %s: %s", ac.queueName, err)
|
||||
}
|
||||
closeChan := channel.NotifyClose(make(chan *amqp.Error, 1))
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
ac.channel = channel
|
||||
ac.msgs = msgs
|
||||
ac.closeChan = closeChan
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconnect attempts repeatedly to connect and subscribe to the named queue. It
|
||||
// will loop forever until it succeeds. This is used for a running server, where
|
||||
// we don't want to shut down because we lost our AMQP connection.
|
||||
func (ac *amqpConnector) reconnect(config cmd.Config, log blog.SyslogWriter) {
|
||||
for i := 0; ; i++ {
|
||||
ac.clk.Sleep(core.RetryBackoff(i, ac.retryTimeoutBase, ac.retryTimeoutMax, 2))
|
||||
log.Info(fmt.Sprintf(" [!] attempting reconnect for %s", ac.queueName))
|
||||
err := ac.connect(config)
|
||||
if err != nil {
|
||||
log.Warning(fmt.Sprintf(" [!] %s", err))
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
log.Info(fmt.Sprintf(" [!] reconnect success for %s", ac.queueName))
|
||||
return
|
||||
}
|
||||
|
||||
// cancel cancels the AMQP channel. Used for graceful shutdowns.
|
||||
func (ac *amqpConnector) cancel() {
|
||||
ac.mu.RLock()
|
||||
channel := ac.channel
|
||||
ac.mu.RUnlock()
|
||||
channel.Cancel(consumerName, false)
|
||||
}
|
||||
|
||||
// publish publishes a message onto the provided queue. We provide this wrapper
|
||||
// because it requires locking around the read of ac.channel.
|
||||
func (ac *amqpConnector) publish(queueName, corrId, expiration, replyTo, msgType string, body []byte) error {
|
||||
ac.mu.RLock()
|
||||
channel := ac.channel
|
||||
ac.mu.RUnlock()
|
||||
return channel.Publish(
|
||||
AmqpExchange,
|
||||
queueName,
|
||||
AmqpMandatory,
|
||||
AmqpImmediate,
|
||||
amqp.Publishing{
|
||||
Body: body,
|
||||
CorrelationId: corrId,
|
||||
Expiration: expiration,
|
||||
ReplyTo: replyTo,
|
||||
Type: msgType,
|
||||
})
|
||||
}
|
||||
|
||||
// amqpChannel defines the subset of amqp.Channel methods that we use in this
|
||||
// package.
|
||||
type amqpChannel interface {
|
||||
Cancel(consumer string, noWait bool) error
|
||||
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
|
||||
NotifyClose(c chan *amqp.Error) chan *amqp.Error
|
||||
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
|
||||
QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
|
||||
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
"github.com/letsencrypt/boulder/mocks"
|
||||
)
|
||||
|
||||
// mockChannelMaker always returns the given amqpChannel
|
||||
type mockChannelMaker struct {
|
||||
channel amqpChannel
|
||||
}
|
||||
|
||||
func (m mockChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) {
|
||||
return m.channel, nil
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*amqpConnector, *MockamqpChannel, func()) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
|
||||
mockChannel := NewMockamqpChannel(mockCtrl)
|
||||
ac := amqpConnector{
|
||||
chMaker: mockChannelMaker{
|
||||
channel: mockChannel,
|
||||
},
|
||||
queueName: "fooqueue",
|
||||
retryTimeoutBase: time.Second,
|
||||
}
|
||||
return &ac, mockChannel, func() { mockCtrl.Finish() }
|
||||
}
|
||||
|
||||
func TestConnect(t *testing.T) {
|
||||
ac, mockChannel, finish := setup(t)
|
||||
defer finish()
|
||||
mockChannel.EXPECT().QueueDeclare(
|
||||
"fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil)
|
||||
mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil)
|
||||
mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil)
|
||||
mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error))
|
||||
err := ac.connect(cmd.Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect: %s", err)
|
||||
}
|
||||
if ac.channel != mockChannel {
|
||||
t.Errorf("ac.channel was not equal to mockChannel")
|
||||
}
|
||||
if ac.messages() == nil {
|
||||
t.Errorf("ac.msgs was not initialized")
|
||||
}
|
||||
if ac.closeChannel() == nil {
|
||||
t.Errorf("ac.closeChan was not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectFail(t *testing.T) {
|
||||
ac, mockChannel, finish := setup(t)
|
||||
defer finish()
|
||||
mockChannel.EXPECT().QueueDeclare(
|
||||
"fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).Return(amqp.Queue{}, errors.New("fail"))
|
||||
err := ac.connect(cmd.Config{})
|
||||
if err == nil {
|
||||
t.Fatalf("connect should have errored but did not")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReconnect(t *testing.T) {
|
||||
ac, mockChannel, finish := setup(t)
|
||||
defer finish()
|
||||
|
||||
// Override the clock so the sleep calls are instantaneous, regardless of what
|
||||
// the retry calls say.
|
||||
ac.clk = clock.NewFake()
|
||||
ac.retryTimeoutBase = time.Second
|
||||
ac.retryTimeoutMax = time.Second
|
||||
|
||||
mockChannel.EXPECT().QueueDeclare(
|
||||
"fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).AnyTimes()
|
||||
mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil).Times(3).Return(errors.New("fail"))
|
||||
mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil).Return(nil)
|
||||
mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil)
|
||||
mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error))
|
||||
|
||||
log = mocks.UseMockLog()
|
||||
|
||||
ac.reconnect(cmd.Config{}, log)
|
||||
if ac.channel != mockChannel {
|
||||
t.Errorf("ac.channel was not equal to mockChannel")
|
||||
}
|
||||
if ac.msgs == nil {
|
||||
t.Errorf("ac.msgs was not initialized")
|
||||
}
|
||||
if ac.closeChan == nil {
|
||||
t.Errorf("ac.closeChan was not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancel(t *testing.T) {
|
||||
ac, mockChannel, finish := setup(t)
|
||||
defer finish()
|
||||
// Since we're skipping the connect step, fake it by assigning directly to
|
||||
// channel.
|
||||
ac.channel = mockChannel
|
||||
mockChannel.EXPECT().Cancel(consumerName, false)
|
||||
ac.cancel()
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
ac, mockChannel, finish := setup(t)
|
||||
defer finish()
|
||||
ac.channel = mockChannel
|
||||
mockChannel.EXPECT().Publish(
|
||||
AmqpExchange,
|
||||
"fooqueue",
|
||||
AmqpMandatory,
|
||||
AmqpImmediate,
|
||||
amqp.Publishing{
|
||||
Body: []byte("body"),
|
||||
CorrelationId: "03c52e",
|
||||
Expiration: "3000",
|
||||
ReplyTo: "replyTo",
|
||||
Type: "testMsg",
|
||||
})
|
||||
ac.publish("fooqueue", "03c52e", "3000", "replyTo", "testMsg", []byte("body"))
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
// Automatically generated by MockGen. DO NOT EDIT!
|
||||
// Source: rpc/amqp-rpc-connection.go
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
||||
gomock "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/golang/mock/gomock"
|
||||
amqp "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
// Mock of amqpChannel interface
|
||||
type MockamqpChannel struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *_MockamqpChannelRecorder
|
||||
}
|
||||
|
||||
// Recorder for MockamqpChannel (not exported)
|
||||
type _MockamqpChannelRecorder struct {
|
||||
mock *MockamqpChannel
|
||||
}
|
||||
|
||||
func NewMockamqpChannel(ctrl *gomock.Controller) *MockamqpChannel {
|
||||
mock := &MockamqpChannel{ctrl: ctrl}
|
||||
mock.recorder = &_MockamqpChannelRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) EXPECT() *_MockamqpChannelRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) Cancel(consumer string, noWait bool) error {
|
||||
ret := _m.ctrl.Call(_m, "Cancel", consumer, noWait)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) Cancel(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "Cancel", arg0, arg1)
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
|
||||
ret := _m.ctrl.Call(_m, "Consume", queue, consumer, autoAck, exclusive, noLocal, noWait, args)
|
||||
ret0, _ := ret[0].(<-chan amqp.Delivery)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) Consume(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "Consume", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) NotifyClose(c chan *amqp.Error) chan *amqp.Error {
|
||||
ret := _m.ctrl.Call(_m, "NotifyClose", c)
|
||||
ret0, _ := ret[0].(chan *amqp.Error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) NotifyClose(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "NotifyClose", arg0)
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error {
|
||||
ret := _m.ctrl.Call(_m, "Publish", exchange, key, mandatory, immediate, msg)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) Publish(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1, arg2, arg3, arg4)
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error {
|
||||
ret := _m.ctrl.Call(_m, "QueueBind", name, key, exchange, noWait, args)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) QueueBind(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "QueueBind", arg0, arg1, arg2, arg3, arg4)
|
||||
}
|
||||
|
||||
func (_m *MockamqpChannel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error) {
|
||||
ret := _m.ctrl.Call(_m, "QueueDeclare", name, durable, autoDelete, exclusive, noWait, args)
|
||||
ret0, _ := ret[0].(amqp.Queue)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
func (_mr *_MockamqpChannelRecorder) QueueDeclare(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "QueueDeclare", arg0, arg1, arg2, arg3, arg4, arg5)
|
||||
}
|
|
@ -12,7 +12,6 @@ import (
|
|||
// Client describes the functions an RPC Client performs
|
||||
type Client interface {
|
||||
SetTimeout(time.Duration)
|
||||
Dispatch(string, []byte) chan []byte
|
||||
DispatchSync(string, []byte) ([]byte, error)
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIC+zCCAeOgAwIBAgIJAJ5ig3+Bc76gMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
|
||||
BAMMCWFtcXAgY2VydDAeFw0xNTEwMjcwMTQwMTdaFw0yMDEwMjUwMTQwMTdaMBQx
|
||||
EjAQBgNVBAMMCWFtcXAgY2VydDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
|
||||
ggEBANsa6RjxW2s1jvtvmYG3BOokEQ9wM5W+56mR5NQ0tbdSgoDde1aEArlWcw9x
|
||||
0JZkuP3v3rDavygdlt1BoychfaFHDXpyYa7ROKn4iHyn89wysyEW0gMgR3JsC2r5
|
||||
5exH90G6gAi8DjGidR3I03IsyeiiyLXGgia8kJ/PB/wv46ll4Cx2PcUNWtMSoum+
|
||||
W7ueya4TA9c595pZtoS3JMbQrg83xlPvRL5DMg8/WAcDOMaDgNESoo1bOpFzsh/o
|
||||
buz8VMfb5Danjf2sWxwLheC9MBRpJUWE9mcik2CdhnCy1guPsvcSgrb6D9+gJW1Q
|
||||
r2AoCUisB3fggDj497kxxndT1jkCAwEAAaNQME4wHQYDVR0OBBYEFN8xiwv5cXm3
|
||||
Gid3eDOwSCBs7vR9MB8GA1UdIwQYMBaAFN8xiwv5cXm3Gid3eDOwSCBs7vR9MAwG
|
||||
A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAC9OwoViHFVbcrY/EqVsy1D1
|
||||
PyFeCD0xADmn8jRtJNm9zTnNnIQxa+TaLcliGqtVMUld0inAbj7/ZjAhjzKgbaC3
|
||||
rRZvcSaHlKwLemtswjXYlUdKDd6BITKOEHKw2VTeZxBqeSapYADYSQUr46PzZu44
|
||||
nQmqVEaB2mLZIRDiSe2LVQh+WgZ9YKP/k2Wy4G8NJFzXqEycf2vKmWIU6X8hyTvO
|
||||
SVTd1I/MKtGq+JALwaJTa2UGX80wMKIEEBLBM9cJht1CnceedWROy3ZPcrv9+D49
|
||||
B/ZfduX+tVg7BI7sw+m8lzBexrFqYneJLNRwXUaTER66dmfjwb2Y2Qg7MA7pCSo=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,28 @@
|
|||
-----BEGIN PRIVATE KEY-----
|
||||
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDbGukY8VtrNY77
|
||||
b5mBtwTqJBEPcDOVvuepkeTUNLW3UoKA3XtWhAK5VnMPcdCWZLj9796w2r8oHZbd
|
||||
QaMnIX2hRw16cmGu0Tip+Ih8p/PcMrMhFtIDIEdybAtq+eXsR/dBuoAIvA4xonUd
|
||||
yNNyLMnoosi1xoImvJCfzwf8L+OpZeAsdj3FDVrTEqLpvlu7nsmuEwPXOfeaWbaE
|
||||
tyTG0K4PN8ZT70S+QzIPP1gHAzjGg4DREqKNWzqRc7If6G7s/FTH2+Q2p439rFsc
|
||||
C4XgvTAUaSVFhPZnIpNgnYZwstYLj7L3EoK2+g/foCVtUK9gKAlIrAd34IA4+Pe5
|
||||
McZ3U9Y5AgMBAAECggEAMwqC0KexFzmD1Hj79qbPA0hhnQoTSkgNqYfuGa0pw8n5
|
||||
0tFFoTLhe5Fy/l8SK2bNXYKaivJ6Y3r5IRPgUQZRQNfPgP3SWaotnBLkoL1zZpF8
|
||||
/zFUvRuw6SvYQRR0BmYBaJNsrhw45kElLUoST9+1LXd2cDoNKb71pfTRtGLnkxdV
|
||||
F84i87+nC4vLU0ZKkWOzMZMIYqtemrC73BC0+mvXte+P6zbWeSiC7tpyxGyj5pMS
|
||||
WQ4TI0xDfZQRVtIVBXKplgvKb2fZ0DIHv7RJBWPSaajmzLAV3pxnmik+IfMLrxjk
|
||||
/x0eXf/F/E6Y2Ejq05NudFxbPnGCfEl9Cg4FvEr0LQKBgQDugTds77FAMr/IYCJO
|
||||
iweo6dC/47EYinrkrdj2kgxWuevjNo5jfpuksj0qA+w1EKImDVIh6dyXWKVwIVAg
|
||||
58b09RD2C6Mvt+762n6JKbn8LBxczk8fxcWYv7rMRwbMcj5E7luAhBKbF+M4rTWt
|
||||
5Ko0nI6hgqFn9JPsS3clZ+hK2wKBgQDrLWVv3WpGzp+R/0dGMLQGzgw0mL0u+msP
|
||||
Q7v4/51i7HlQjDhj0pgsms9HIgmCudaKVXosZ+CEXv6I6LjfCiB7Brv3DUhCTWN/
|
||||
2g6PQfCLBuE9rqXbez6XaxiMVQPKphGop88/USKDlc8uzWPPuQ9cg9L0QZifEfCM
|
||||
7+H4iNxNewKBgAp3aTNCoYirsXbdoSPJPiDPgfWpzE/DY/k9F9RaPGhh6FQkRMNg
|
||||
/vuPRtfdLDR38mWxF/WdCa0qmrf8/kMzaKu/RWtGv3aMn9QqWnsydZL3bJc2Bori
|
||||
ZvV5FH16cHXwXYMw4psVMKvVtIb8Muraqg19AVVdIjApr4QjG6tsj+kZAoGBAJZc
|
||||
PueHq8Q7pQAmM76nsuJK6LjUEtivWLW4u7zWSR0PTfz2ubLw2URjcjTriSMgiA+H
|
||||
2QX9ICnhxmFoUZKgmeWuh4zL3DAv5HbAxuBG63En4+iY+gfaw0jyOw616Cevh8jK
|
||||
CZJU4Hk61ez5emA71Jt02PI74kWJpb+mO1a4wglzAoGAHP8Z7QnyISl/q8zfmKhG
|
||||
jLzE7njHZfvRV/Q2W1x9f2Pu+VfXNAQLoqEH/q8HTw8tm3p84VJPSl1pfyTNrQdw
|
||||
W3h+siTD9PyNTXZWEtuOEC23BfAqzU6PMbj4rVIb9goYYYwEpkAC7GrrvwFoZcTl
|
||||
9f6SVG7zUFVrbyaYc6jEbPU=
|
||||
-----END PRIVATE KEY-----
|
|
@ -228,6 +228,8 @@ tempdir = tempfile.mkdtemp()
|
|||
if not startservers.start(race_detection=True):
|
||||
die(ExitStatus.Error)
|
||||
run_node_test()
|
||||
# Simulate a disconnection from RabbitMQ to make sure reconnects work.
|
||||
startservers.bounce_forward()
|
||||
run_client_tests()
|
||||
if not startservers.check():
|
||||
die(ExitStatus.Error)
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
},
|
||||
|
||||
"amqp": {
|
||||
"server": "amqp://guest:guest@localhost:5672",
|
||||
"server": "amqp://guest:guest@localhost:5673",
|
||||
"insecure": true,
|
||||
"-uncomment_for_AMQPS-tls": {
|
||||
"cacertfile": "/etc/boulder/rabbitmq-cacert.pem",
|
||||
|
|
|
@ -58,6 +58,7 @@ def start(race_detection):
|
|||
up explicitly by calling stop(), or automatically atexit.
|
||||
"""
|
||||
global processes
|
||||
forward()
|
||||
t = ToSServerThread()
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
@ -119,6 +120,19 @@ def start(race_detection):
|
|||
print "All servers running. Hit ^C to kill."
|
||||
return True
|
||||
|
||||
def forward():
|
||||
"""Add a TCP forwarder between Boulder and RabbitMQ to simulate failures."""
|
||||
cmd = """exec listenbuddy -listen :5673 -speak localhost:5672"""
|
||||
p = subprocess.Popen(cmd, shell=True)
|
||||
p.cmd = cmd
|
||||
print('started %s with pid %d' % (p.cmd, p.pid))
|
||||
global processes
|
||||
processes.insert(0, p)
|
||||
|
||||
def bounce_forward():
|
||||
"""Kill all forwarded TCP connections."""
|
||||
global processes
|
||||
processes[0].send_signal(signal.SIGUSR1)
|
||||
|
||||
def check():
|
||||
"""Return true if all started processes are still alive.
|
||||
|
|
|
@ -19,7 +19,8 @@ travis_retry go get \
|
|||
github.com/golang/lint/golint \
|
||||
github.com/mattn/goveralls \
|
||||
github.com/modocache/gover \
|
||||
github.com/jcjones/github-pr-status &
|
||||
github.com/jcjones/github-pr-status \
|
||||
github.com/jsha/listenbuddy &
|
||||
|
||||
(wget https://github.com/jsha/boulder-tools/raw/master/goose.gz &&
|
||||
mkdir $GOPATH/bin &&
|
||||
|
|
Loading…
Reference in New Issue