Add the NoEndpoints message to the Destination API (#564)

Have the controller tell the client whether the service exists, not
just what are available. This way we can implement fallback logic to
alternate service discovery mechanisms for ambigious names.

Signed-off-by: Brian Smith <brian@briansmith.org>
Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
Brian Smith 2018-03-27 10:45:41 -10:00 committed by GitHub
parent e7aa3d4105
commit 7dc21f9588
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 304 additions and 157 deletions

View File

@ -281,6 +281,17 @@ func (listener endpointListener) Update(add []common.TcpAddress, remove []common
}
}
func (listener endpointListener) NoEndpoints(exists bool) {
update := &pb.Update{
Update: &pb.Update_NoEndpoints{
NoEndpoints: &pb.NoEndpoints{
Exists: exists,
},
},
}
listener.stream.Send(update)
}
func toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet {
addrs := make([]*pb.WeightedAddr, 0)
for i := range endpoints {

View File

@ -131,9 +131,7 @@ func (m *HttpMethod) String() string { return proto.CompactTextString
func (*HttpMethod) ProtoMessage() {}
func (*HttpMethod) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type isHttpMethod_Type interface {
isHttpMethod_Type()
}
type isHttpMethod_Type interface{ isHttpMethod_Type() }
type HttpMethod_Registered_ struct {
Registered HttpMethod_Registered `protobuf:"varint,1,opt,name=registered,enum=conduit.common.HttpMethod_Registered,oneof"`
@ -243,9 +241,7 @@ func (m *Scheme) String() string { return proto.CompactTextString(m)
func (*Scheme) ProtoMessage() {}
func (*Scheme) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type isScheme_Type interface {
isScheme_Type()
}
type isScheme_Type interface{ isScheme_Type() }
type Scheme_Registered_ struct {
Registered Scheme_Registered `protobuf:"varint,1,opt,name=registered,enum=conduit.common.Scheme_Registered,oneof"`
@ -355,9 +351,7 @@ func (m *IPAddress) String() string { return proto.CompactTextString(
func (*IPAddress) ProtoMessage() {}
func (*IPAddress) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type isIPAddress_Ip interface {
isIPAddress_Ip()
}
type isIPAddress_Ip interface{ isIPAddress_Ip() }
type IPAddress_Ipv4 struct {
Ipv4 uint32 `protobuf:"fixed32,1,opt,name=ipv4,oneof"`
@ -543,9 +537,7 @@ func (m *Eos) String() string { return proto.CompactTextString(m) }
func (*Eos) ProtoMessage() {}
func (*Eos) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
type isEos_End interface {
isEos_End()
}
type isEos_End interface{ isEos_End() }
type Eos_GrpcStatusCode struct {
GrpcStatusCode uint32 `protobuf:"varint,1,opt,name=grpc_status_code,json=grpcStatusCode,oneof"`
@ -655,9 +647,7 @@ func (m *TapEvent) String() string { return proto.CompactTextString(m
func (*TapEvent) ProtoMessage() {}
func (*TapEvent) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
type isTapEvent_Event interface {
isTapEvent_Event()
}
type isTapEvent_Event interface{ isTapEvent_Event() }
type TapEvent_Http_ struct {
Http *TapEvent_Http `protobuf:"bytes,3,opt,name=http,oneof"`
@ -761,9 +751,7 @@ func (m *TapEvent_Http) String() string { return proto.CompactTextStr
func (*TapEvent_Http) ProtoMessage() {}
func (*TapEvent_Http) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7, 0} }
type isTapEvent_Http_Event interface {
isTapEvent_Http_Event()
}
type isTapEvent_Http_Event interface{ isTapEvent_Http_Event() }
type TapEvent_Http_RequestInit_ struct {
RequestInit *TapEvent_Http_RequestInit `protobuf:"bytes,1,opt,name=request_init,json=requestInit,oneof"`

View File

@ -38,10 +38,15 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type QueryRequest struct {
Query string `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
StartMs int64 `protobuf:"varint,2,opt,name=start_ms,json=startMs" json:"start_ms,omitempty"`
EndMs int64 `protobuf:"varint,3,opt,name=end_ms,json=endMs" json:"end_ms,omitempty"`
Step string `protobuf:"bytes,4,opt,name=step" json:"step,omitempty"`
// required
Query string `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
// required for timeseries queries
StartMs int64 `protobuf:"varint,2,opt,name=start_ms,json=startMs" json:"start_ms,omitempty"`
// required for timeseries queries
// optional for single data point, but if unset, results will have non-deterministic timestamps
EndMs int64 `protobuf:"varint,3,opt,name=end_ms,json=endMs" json:"end_ms,omitempty"`
// required for timeseries queries
Step string `protobuf:"bytes,4,opt,name=step" json:"step,omitempty"`
}
func (m *QueryRequest) Reset() { *m = QueryRequest{} }

View File

@ -12,6 +12,7 @@ It has these top-level messages:
AddrSet
WeightedAddrSet
WeightedAddr
NoEndpoints
*/
package conduit_proxy_destination
@ -40,6 +41,7 @@ type Update struct {
// Types that are valid to be assigned to Update:
// *Update_Add
// *Update_Remove
// *Update_NoEndpoints
Update isUpdate_Update `protobuf_oneof:"update"`
}
@ -48,9 +50,7 @@ func (m *Update) String() string { return proto.CompactTextString(m)
func (*Update) ProtoMessage() {}
func (*Update) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type isUpdate_Update interface {
isUpdate_Update()
}
type isUpdate_Update interface{ isUpdate_Update() }
type Update_Add struct {
Add *WeightedAddrSet `protobuf:"bytes,1,opt,name=add,oneof"`
@ -58,9 +58,13 @@ type Update_Add struct {
type Update_Remove struct {
Remove *AddrSet `protobuf:"bytes,2,opt,name=remove,oneof"`
}
type Update_NoEndpoints struct {
NoEndpoints *NoEndpoints `protobuf:"bytes,3,opt,name=no_endpoints,json=noEndpoints,oneof"`
}
func (*Update_Add) isUpdate_Update() {}
func (*Update_Remove) isUpdate_Update() {}
func (*Update_Add) isUpdate_Update() {}
func (*Update_Remove) isUpdate_Update() {}
func (*Update_NoEndpoints) isUpdate_Update() {}
func (m *Update) GetUpdate() isUpdate_Update {
if m != nil {
@ -83,11 +87,19 @@ func (m *Update) GetRemove() *AddrSet {
return nil
}
func (m *Update) GetNoEndpoints() *NoEndpoints {
if x, ok := m.GetUpdate().(*Update_NoEndpoints); ok {
return x.NoEndpoints
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*Update) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Update_OneofMarshaler, _Update_OneofUnmarshaler, _Update_OneofSizer, []interface{}{
(*Update_Add)(nil),
(*Update_Remove)(nil),
(*Update_NoEndpoints)(nil),
}
}
@ -105,6 +117,11 @@ func _Update_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
if err := b.EncodeMessage(x.Remove); err != nil {
return err
}
case *Update_NoEndpoints:
b.EncodeVarint(3<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.NoEndpoints); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("Update.Update has unexpected type %T", x)
@ -131,6 +148,14 @@ func _Update_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer)
err := b.DecodeMessage(msg)
m.Update = &Update_Remove{msg}
return true, err
case 3: // update.no_endpoints
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(NoEndpoints)
err := b.DecodeMessage(msg)
m.Update = &Update_NoEndpoints{msg}
return true, err
default:
return false, nil
}
@ -150,6 +175,11 @@ func _Update_OneofSizer(msg proto.Message) (n int) {
n += proto.SizeVarint(2<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case *Update_NoEndpoints:
s := proto.Size(x.NoEndpoints)
n += proto.SizeVarint(3<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
@ -213,11 +243,28 @@ func (m *WeightedAddr) GetWeight() uint32 {
return 0
}
type NoEndpoints struct {
Exists bool `protobuf:"varint,1,opt,name=exists" json:"exists,omitempty"`
}
func (m *NoEndpoints) Reset() { *m = NoEndpoints{} }
func (m *NoEndpoints) String() string { return proto.CompactTextString(m) }
func (*NoEndpoints) ProtoMessage() {}
func (*NoEndpoints) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *NoEndpoints) GetExists() bool {
if m != nil {
return m.Exists
}
return false
}
func init() {
proto.RegisterType((*Update)(nil), "conduit.proxy.destination.Update")
proto.RegisterType((*AddrSet)(nil), "conduit.proxy.destination.AddrSet")
proto.RegisterType((*WeightedAddrSet)(nil), "conduit.proxy.destination.WeightedAddrSet")
proto.RegisterType((*WeightedAddr)(nil), "conduit.proxy.destination.WeightedAddr")
proto.RegisterType((*NoEndpoints)(nil), "conduit.proxy.destination.NoEndpoints")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -326,23 +373,26 @@ var _Destination_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("proxy/destination/destination.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 281 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0x4b, 0x4b, 0xc4, 0x30,
0x14, 0x85, 0x27, 0x56, 0xa3, 0xdc, 0x2a, 0x42, 0x04, 0xa9, 0x75, 0x53, 0xe3, 0xc2, 0xe2, 0x22,
0x33, 0xd4, 0xa5, 0x0f, 0x50, 0x04, 0x75, 0x27, 0xf5, 0xb9, 0xad, 0xbd, 0x41, 0xbb, 0x68, 0x53,
0xd2, 0x8c, 0x8f, 0x7f, 0xe1, 0x4f, 0x96, 0xa6, 0xd1, 0xa9, 0x03, 0x16, 0x57, 0x79, 0xdc, 0x73,
0xbe, 0x9c, 0x9b, 0x0b, 0xbb, 0xb5, 0x56, 0xef, 0x1f, 0x63, 0x94, 0x8d, 0x29, 0xaa, 0xcc, 0x14,
0xaa, 0xea, 0xef, 0x45, 0xad, 0x95, 0x51, 0x6c, 0x2b, 0x57, 0x15, 0x4e, 0x0b, 0x23, 0xac, 0x58,
0xf4, 0x04, 0xe1, 0x46, 0xae, 0xca, 0x52, 0x55, 0xe3, 0x6e, 0xe9, 0xf4, 0xfc, 0x93, 0x00, 0xbd,
0xab, 0x31, 0x33, 0x92, 0x9d, 0x80, 0x97, 0x21, 0x06, 0x24, 0x22, 0xb1, 0x9f, 0xec, 0x8b, 0x3f,
0x41, 0xe2, 0x41, 0x16, 0xcf, 0x2f, 0x46, 0xe2, 0x29, 0xa2, 0xbe, 0x91, 0xe6, 0x72, 0x94, 0xb6,
0x46, 0x76, 0x04, 0x54, 0xcb, 0x52, 0xbd, 0xca, 0x60, 0xc1, 0x22, 0xf8, 0x00, 0x62, 0x66, 0x75,
0x9e, 0xb3, 0x15, 0xa0, 0x53, 0x9b, 0x83, 0x1f, 0xc2, 0xb2, 0x2b, 0xb3, 0x09, 0x2c, 0x65, 0x88,
0xba, 0x09, 0x48, 0xe4, 0xc5, 0x7e, 0x12, 0xfe, 0x10, 0x5d, 0x0f, 0xb7, 0x79, 0xdd, 0x4a, 0x65,
0xd3, 0xa4, 0x9d, 0x90, 0x5f, 0xc3, 0xfa, 0x5c, 0x3c, 0x76, 0xfc, 0x1b, 0xb2, 0xf7, 0xcf, 0xce,
0xbe, 0x89, 0xf7, 0xb0, 0xda, 0xbf, 0x66, 0x02, 0x16, 0xdb, 0x82, 0xfb, 0xa7, 0xa1, 0x48, 0x56,
0xc7, 0x36, 0x81, 0xbe, 0x59, 0x7f, 0xe0, 0x45, 0x24, 0x5e, 0x4b, 0xdd, 0x29, 0x79, 0x04, 0xff,
0x7c, 0xf6, 0x34, 0xbb, 0x02, 0xef, 0x42, 0x1a, 0xb6, 0x3d, 0xcf, 0xeb, 0x69, 0xc2, 0x9d, 0x81,
0xe8, 0xdd, 0x10, 0xf9, 0x68, 0x42, 0x9e, 0xa8, 0x1d, 0xed, 0xc1, 0x57, 0x00, 0x00, 0x00, 0xff,
0xff, 0xf8, 0xcd, 0x94, 0x88, 0x31, 0x02, 0x00, 0x00,
// 327 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x5d, 0x4b, 0xf3, 0x30,
0x14, 0xc7, 0xd7, 0xa7, 0x8f, 0x75, 0x9c, 0x4e, 0x84, 0x08, 0x52, 0xeb, 0xcd, 0x8c, 0xa8, 0xc3,
0x8b, 0x6c, 0xd4, 0x4b, 0x5f, 0x40, 0x51, 0x9c, 0x08, 0x22, 0xf5, 0xf5, 0x4e, 0x6a, 0x13, 0x34,
0x17, 0x4d, 0x4a, 0x93, 0xe9, 0xfc, 0xb0, 0x7e, 0x17, 0x69, 0x9a, 0xd9, 0x3a, 0xb0, 0x78, 0xd5,
0x9e, 0xf6, 0x77, 0x7e, 0xc9, 0xf9, 0x73, 0x60, 0x33, 0x2f, 0xe4, 0xf4, 0x63, 0x48, 0x99, 0xd2,
0x5c, 0x24, 0x9a, 0x4b, 0xd1, 0x7c, 0x27, 0x79, 0x21, 0xb5, 0x44, 0x6b, 0xa9, 0x14, 0x74, 0xc2,
0x35, 0x31, 0x30, 0x69, 0x00, 0xe1, 0x4a, 0x2a, 0xb3, 0x4c, 0x8a, 0x61, 0xf5, 0xa8, 0x78, 0xfc,
0xe9, 0x80, 0x77, 0x97, 0xd3, 0x44, 0x33, 0x74, 0x04, 0x6e, 0x42, 0x69, 0xe0, 0xf4, 0x9d, 0x81,
0x1f, 0xed, 0x92, 0x5f, 0x45, 0xe4, 0x81, 0xf1, 0x97, 0x57, 0xcd, 0xe8, 0x31, 0xa5, 0xc5, 0x0d,
0xd3, 0xe3, 0x4e, 0x5c, 0x36, 0xa2, 0x03, 0xf0, 0x0a, 0x96, 0xc9, 0x37, 0x16, 0xfc, 0x33, 0x0a,
0xdc, 0xa2, 0xa8, 0x5b, 0x6d, 0x0f, 0xba, 0x84, 0x9e, 0x90, 0x4f, 0x4c, 0xd0, 0x5c, 0x72, 0xa1,
0x55, 0xe0, 0x1a, 0xc7, 0x76, 0x8b, 0xe3, 0x4a, 0x9e, 0xcd, 0xe8, 0x71, 0x27, 0xf6, 0x45, 0x5d,
0x9e, 0x74, 0xc1, 0x9b, 0x98, 0xa1, 0xf0, 0x3e, 0x2c, 0xda, 0xb3, 0xd0, 0x08, 0x16, 0x12, 0x4a,
0x0b, 0x15, 0x38, 0x7d, 0x77, 0xe0, 0x47, 0xe1, 0xb7, 0xda, 0x06, 0x72, 0x9b, 0xe6, 0x25, 0xca,
0x94, 0x8a, 0x2b, 0x10, 0x5f, 0xc3, 0xf2, 0xdc, 0xac, 0xe8, 0xf0, 0xa7, 0x64, 0xe7, 0x8f, 0x31,
0xcd, 0x8c, 0xf7, 0xd0, 0x6b, 0x7e, 0x46, 0x04, 0xfe, 0x97, 0x3f, 0x6c, 0xe8, 0x6d, 0x57, 0x32,
0x1c, 0x5a, 0x05, 0xef, 0xdd, 0xf4, 0x9b, 0x7c, 0x96, 0x62, 0x5b, 0xe1, 0x2d, 0xf0, 0x1b, 0x71,
0x94, 0x18, 0x9b, 0x72, 0xa5, 0x95, 0x11, 0x77, 0x63, 0x5b, 0x45, 0x8f, 0xe0, 0x9f, 0xd6, 0x37,
0x44, 0x17, 0xe0, 0x9e, 0x33, 0x8d, 0xd6, 0xe7, 0x8f, 0x6d, 0x30, 0xe1, 0x46, 0xcb, 0x84, 0xd5,
0xe2, 0xe0, 0xce, 0xc8, 0x79, 0xf6, 0xcc, 0x3a, 0xed, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0x8d,
0x6e, 0xee, 0x2b, 0xa5, 0x02, 0x00, 0x00,
}

View File

@ -75,9 +75,7 @@ func (m *ObserveRequest_Match) String() string { return proto.Compact
func (*ObserveRequest_Match) ProtoMessage() {}
func (*ObserveRequest_Match) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
type isObserveRequest_Match_Match interface {
isObserveRequest_Match_Match()
}
type isObserveRequest_Match_Match interface{ isObserveRequest_Match_Match() }
type ObserveRequest_Match_All struct {
All *ObserveRequest_Match_Seq `protobuf:"bytes,1,opt,name=all,oneof"`
@ -332,9 +330,7 @@ func (m *ObserveRequest_Match_Tcp) String() string { return proto.Com
func (*ObserveRequest_Match_Tcp) ProtoMessage() {}
func (*ObserveRequest_Match_Tcp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0, 1} }
type isObserveRequest_Match_Tcp_Match interface {
isObserveRequest_Match_Tcp_Match()
}
type isObserveRequest_Match_Tcp_Match interface{ isObserveRequest_Match_Tcp_Match() }
type ObserveRequest_Match_Tcp_Netmask_ struct {
Netmask *ObserveRequest_Match_Tcp_Netmask `protobuf:"bytes,1,opt,name=netmask,oneof"`
@ -511,9 +507,7 @@ func (m *ObserveRequest_Match_Http) String() string { return proto.Co
func (*ObserveRequest_Match_Http) ProtoMessage() {}
func (*ObserveRequest_Match_Http) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0, 2} }
type isObserveRequest_Match_Http_Match interface {
isObserveRequest_Match_Http_Match()
}
type isObserveRequest_Match_Http_Match interface{ isObserveRequest_Match_Http_Match() }
type ObserveRequest_Match_Http_Scheme struct {
Scheme *conduit_common.Scheme `protobuf:"bytes,1,opt,name=scheme,oneof"`
@ -694,9 +688,7 @@ func (*ObserveRequest_Match_Http_StringMatch) Descriptor() ([]byte, []int) {
return fileDescriptor0, []int{0, 0, 2, 0}
}
type isObserveRequest_Match_Http_StringMatch_Match interface {
isObserveRequest_Match_Http_StringMatch_Match()
}
type isObserveRequest_Match_Http_StringMatch_Match interface{ isObserveRequest_Match_Http_StringMatch_Match() }
type ObserveRequest_Match_Http_StringMatch_Exact struct {
Exact string `protobuf:"bytes,1,opt,name=exact,oneof"`

View File

@ -419,9 +419,7 @@ func (m *EosCtx) String() string { return proto.CompactTextString(m)
func (*EosCtx) ProtoMessage() {}
func (*EosCtx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
type isEosCtx_End interface {
isEosCtx_End()
}
type isEosCtx_End interface{ isEosCtx_End() }
type EosCtx_GrpcStatusCode struct {
GrpcStatusCode uint32 `protobuf:"varint,1,opt,name=grpc_status_code,json=grpcStatusCode,oneof"`

View File

@ -206,9 +206,7 @@ func (m *MetricValue) String() string { return proto.CompactTextStrin
func (*MetricValue) ProtoMessage() {}
func (*MetricValue) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type isMetricValue_Value interface {
isMetricValue_Value()
}
type isMetricValue_Value interface{ isMetricValue_Value() }
type MetricValue_Counter struct {
Counter int64 `protobuf:"varint,1,opt,name=counter,oneof"`
@ -637,9 +635,7 @@ func (m *TapRequest) String() string { return proto.CompactTextString
func (*TapRequest) ProtoMessage() {}
func (*TapRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
type isTapRequest_Target interface {
isTapRequest_Target()
}
type isTapRequest_Target interface{ isTapRequest_Target() }
type TapRequest_Pod struct {
Pod string `protobuf:"bytes,1,opt,name=pod,oneof"`

View File

@ -22,6 +22,7 @@ const (
type EndpointsListener interface {
Update(add []common.TcpAddress, remove []common.TcpAddress)
NoEndpoints(exists bool)
}
/// EndpointsWatcher ///
@ -93,7 +94,8 @@ func (e *EndpointsWatcher) Subscribe(service string, port uint32, listener Endpo
(*svc)[port] = svcPort
}
svcPort.subscribe(listener)
_, exists, _ := e.GetService(service)
svcPort.subscribe(exists, listener)
return nil
}
@ -119,7 +121,7 @@ func (e *EndpointsWatcher) Unsubscribe(service string, port uint32, listener End
}
func (e *EndpointsWatcher) GetService(service string) (*v1.Service, bool, error) {
obj, exists, err := (*e.serviceInformer.store).GetByKey(service)
obj, exists, err := e.serviceInformer.store.GetByKey(service)
if err != nil || !exists {
return nil, exists, err
}
@ -131,7 +133,7 @@ func (e *EndpointsWatcher) GetService(service string) (*v1.Service, bool, error)
// Watches a Kubernetes resource type
type informer struct {
informer cache.Controller
store *cache.Store
store cache.Store
stopCh chan struct{}
}
@ -215,7 +217,7 @@ func newEndpointInformer(clientset *kubernetes.Clientset, servicePorts *map[stri
return informer{
informer: inf,
store: &store,
store: store,
stopCh: stopCh,
}
}
@ -245,22 +247,6 @@ func newServiceInformer(clientset *kubernetes.Clientset, servicePorts *map[strin
}
mutex.RUnlock()
},
DeleteFunc: func(obj interface{}) {
service := obj.(*v1.Service)
if service.Namespace == kubeSystem {
return
}
id := service.Namespace + "/" + service.Name
mutex.RLock()
svc, ok := (*servicePorts)[id]
if ok {
for _, sp := range *svc {
sp.deleteService()
}
}
mutex.RUnlock()
},
UpdateFunc: func(oldObj, newObj interface{}) {
service := newObj.(*v1.Service)
if service.Namespace == kubeSystem {
@ -284,7 +270,7 @@ func newServiceInformer(clientset *kubernetes.Clientset, servicePorts *map[strin
return informer{
informer: inf,
store: &store,
store: store,
stopCh: stopCh,
}
}
@ -312,7 +298,7 @@ type servicePort struct {
func newServicePort(service string, port uint32, e *EndpointsWatcher) (*servicePort, error) {
endpoints := &v1.Endpoints{}
obj, exists, err := (*e.endpointInformer.store).GetByKey(service)
obj, exists, err := e.endpointInformer.store.GetByKey(service)
if err != nil {
return nil, err
}
@ -322,7 +308,7 @@ func newServicePort(service string, port uint32, e *EndpointsWatcher) (*serviceP
// Use the service port as the target port by default.
targetPort := intstr.FromInt(int(port))
obj, exists, err = (*e.serviceInformer.store).GetByKey(service)
obj, exists, err = e.serviceInformer.store.GetByKey(service)
if err != nil {
return nil, err
}
@ -354,16 +340,8 @@ func (sp *servicePort) updateEndpoints(newEndpoints *v1.Endpoints) {
defer sp.mutex.Unlock()
newAddresses := addresses(newEndpoints, sp.targetPort)
log.Debugf("Updating %s:%d to %s", sp.service, sp.port, util.AddressesToString(newAddresses))
add, remove := util.DiffAddresses(sp.addresses, newAddresses)
for _, listener := range sp.listeners {
listener.Update(add, remove)
}
sp.updateAddresses(newAddresses)
sp.endpoints = newEndpoints
sp.addresses = newAddresses
}
func (sp *servicePort) deleteEndpoints() {
@ -373,7 +351,7 @@ func (sp *servicePort) deleteEndpoints() {
log.Debugf("Deleting %s:%d", sp.service, sp.port)
for _, listener := range sp.listeners {
listener.Update(nil, sp.addresses)
listener.NoEndpoints(false)
}
sp.endpoints = &v1.Endpoints{}
sp.addresses = []common.TcpAddress{}
@ -395,43 +373,38 @@ func (sp *servicePort) updateService(newService *v1.Service) {
}
if newTargetPort != sp.targetPort {
newAddresses := addresses(sp.endpoints, newTargetPort)
sp.updateAddresses(newAddresses)
sp.targetPort = newTargetPort
}
}
log.Debugf("Updating %s:%d to %s", sp.service, sp.port, util.AddressesToString(newAddresses))
func (sp *servicePort) updateAddresses(newAddresses []common.TcpAddress) {
log.Debugf("Updating %s:%d to %s", sp.service, sp.port, util.AddressesToString(newAddresses))
if len(newAddresses) == 0 {
for _, listener := range sp.listeners {
listener.NoEndpoints(true)
}
} else {
add, remove := util.DiffAddresses(sp.addresses, newAddresses)
for _, listener := range sp.listeners {
listener.Update(add, remove)
}
sp.targetPort = newTargetPort
sp.addresses = newAddresses
}
sp.addresses = newAddresses
}
func (sp *servicePort) deleteService() {
sp.mutex.Lock()
defer sp.mutex.Unlock()
newTargetPort := intstr.FromInt(int(sp.port))
if newTargetPort != sp.targetPort {
newAddresses := addresses(sp.endpoints, newTargetPort)
log.Debugf("Updating %s:%d to %s", sp.service, sp.port, util.AddressesToString(newAddresses))
add, remove := util.DiffAddresses(sp.addresses, newAddresses)
for _, listener := range sp.listeners {
listener.Update(add, remove)
}
sp.targetPort = newTargetPort
sp.addresses = newAddresses
}
}
func (sp *servicePort) subscribe(listener EndpointsListener) {
func (sp *servicePort) subscribe(exists bool, listener EndpointsListener) {
sp.mutex.Lock()
defer sp.mutex.Unlock()
sp.listeners = append(sp.listeners, listener)
listener.Update(sp.addresses, nil)
if !exists {
listener.NoEndpoints(false)
} else if len(sp.addresses) == 0 {
listener.NoEndpoints(true)
} else {
listener.Update(sp.addresses, nil)
}
}
// true iff the listener was found and removed

View File

@ -9,6 +9,7 @@ import (
"github.com/runconduit/conduit/controller/destination"
common "github.com/runconduit/conduit/controller/gen/common"
pb "github.com/runconduit/conduit/controller/gen/proxy/destination"
"github.com/runconduit/conduit/controller/util"
log "github.com/sirupsen/logrus"
)
@ -19,6 +20,7 @@ func main() {
rand.Seed(time.Now().UnixNano())
addr := flag.String("addr", ":8089", "address of proxy api")
path := flag.String("path", "strest-server.default.svc.cluster.local:8888", "destination path")
flag.Parse()
client, conn, err := destination.NewClient(*addr)
@ -29,7 +31,7 @@ func main() {
req := &common.Destination{
Scheme: "k8s",
Path: "strest-server.default.svc.cluster.local:8888",
Path: *path,
}
rsp, err := client.Get(context.Background(), req)
@ -45,19 +47,24 @@ func main() {
if err != nil {
log.Fatal(err.Error())
}
if add := update.GetAdd(); add != nil {
switch updateType := update.Update.(type) {
case *pb.Update_Add:
log.Println("Add:")
for _, addr := range add.Addrs {
for _, addr := range updateType.Add.Addrs {
log.Printf("- %s:%d", util.IPToString(addr.Addr.GetIp()), addr.Addr.Port)
}
log.Println()
}
if remove := update.GetRemove(); remove != nil {
case *pb.Update_Remove:
log.Println("Remove:")
for _, addr := range remove.Addrs {
for _, addr := range updateType.Remove.Addrs {
log.Printf("- %s:%d", util.IPToString(addr.GetIp()), addr.Port)
}
log.Println()
case *pb.Update_NoEndpoints:
log.Println("NoEndpoints:")
log.Printf("- exists:%t", updateType.NoEndpoints.Exists)
log.Println()
}
}
}

View File

@ -10,12 +10,40 @@ import "common/common.proto";
// weighted set of addresses and address metadata. Can be implemented with DNS
// or lookups against other service discovery backends.
//
// If the service does not exist then the controller must send
// `no_endpoints{exists: false}` ASAP when a client subscribes or when the
// service stops existing. If the service exists and has endpoints available
// then the controller must send `add` that lists all (or at least a large
// number) of the endpoints for the service. If and only if the service exists
// but does not have any endpoints available then the controller SHOULD send
// `no_endpoints{exists: true}` when a client subscribes. In other words, the
// `no_endpoints` message must only be sent when there are *no*endpoints for
// the service.
//
// The controller is expected to send an Update every time there is a
// change in service discovery. The controller is also expected to send an
// update at least once every ADDRESS_UPDATE_INTERVAL to indicate that the
// controller is still healthy. If no service discovery updates have taken
// place, the controller can simply send an empty `add`. The controller may
// determine the value of ADDRESS_UPDATE_INTERVAL.
//
// The client MUST be prepared to receive messages in any order and the client
// MUST be able to cope with the presence or absence of redundant messages.
//
// `no_endpoints` followed by an `add` is *not* equivalent to just sending the
// `add` regardless of the value of the `exists` field in the `no_endpoints`
// message. `remove` followed by a `no_endpoints` message is equivalent to
// sending just the `no_endpoints` message, and a `remove` that removes the
// last endpoint is equivalent to a `no_endpoints{exists: true}` message.
//
// When the client gets disconnected from the controller and reconnects, the
// client may use stale results from its previous subscription until, and only
// until, it receives the first message. This is why the controller must send
// a message at the start of a subscription. This is also why the controller
// must not send a `no_endpoints` message before an `add` message; the client
// would clear its cached messages between the time it receives the
// `no_endpoints` message and the time it receives the `add` message, which is
// not the desired behavior.
service Destination {
// Given a destination, return all addresses in that destination as a long-
@ -25,8 +53,19 @@ service Destination {
message Update {
oneof update {
// A new set of endpoints are available for the service. The set might be
// empty.
WeightedAddrSet add = 1;
// Some endpoints have been removed from the service.
AddrSet remove = 2;
// `no_endpoints{exists: false}` indicates that the service does not exist
// and the client MAY try an alternate service discovery method (e.g. DNS).
//
// `no_endpoints(exists: true)` indicates that the service does exist and
// the client MUST NOT fall back to an alternate service discovery method.
NoEndpoints no_endpoints = 3;
}
}
@ -42,3 +81,7 @@ message WeightedAddr {
common.TcpAddress addr = 1;
uint32 weight = 3;
}
message NoEndpoints {
bool exists = 1;
}

View File

@ -2,6 +2,7 @@ use std::collections::{HashSet, VecDeque};
use std::collections::hash_map::{Entry, HashMap};
use std::net::SocketAddr;
use std::fmt;
use std::mem;
use futures::{Async, Future, Poll, Stream};
use futures::sync::mpsc;
@ -54,12 +55,24 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
}
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: HashSet<SocketAddr>,
addrs: Exists<HashSet<SocketAddr>>,
needs_reconnect: bool,
rx: UpdateRx<T>,
txs: Vec<mpsc::UnboundedSender<Update>>,
}
enum Exists<T> {
Unknown, // Unknown if the item exists or not
Yes(T),
No, // Affirmatively known to not exist.
}
impl<T> Exists<T> {
fn take(&mut self) -> Exists<T> {
mem::replace(self, Exists::Unknown)
}
}
/// Receiver for destination set updates.
///
/// The destination RPC returns a `ResponseFuture` whose item is a
@ -255,8 +268,14 @@ where
let set = occ.get_mut();
// we may already know of some addresses here, so push
// them onto the new watch first
for &addr in &set.addrs {
let _ = tx.unbounded_send(Update::Insert(addr));
match set.addrs {
Exists::Yes(ref mut addrs) => {
for &addr in addrs.iter() {
tx.unbounded_send(Update::Insert(addr))
.expect("unbounded_send does not fail");
}
},
Exists::No | Exists::Unknown => (),
}
set.txs.push(tx);
}
@ -272,7 +291,7 @@ where
let stream = UpdateRx::Waiting(response);
vac.insert(DestinationSet {
addrs: HashSet::new(),
addrs: Exists::Unknown,
needs_reconnect: false,
rx: stream,
txs: vec![tx],
@ -322,28 +341,17 @@ where
match set.rx.poll() {
Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) => for addr in a_set.addrs {
if let Some(addr) = addr.addr.and_then(pb_to_sock_addr) {
if set.addrs.insert(addr) {
trace!("update {:?} for {:?}", addr, auth);
// retain is used to drop any senders that are dead
set.txs.retain(|tx| {
tx.unbounded_send(Update::Insert(addr)).is_ok()
});
}
}
},
Some(PbUpdate2::Remove(r_set)) => for addr in r_set.addrs {
if let Some(addr) = pb_to_sock_addr(addr) {
if set.addrs.remove(&addr) {
trace!("remove {:?} for {:?}", addr, auth);
// retain is used to drop any senders that are dead
set.txs.retain(|tx| {
tx.unbounded_send(Update::Remove(addr)).is_ok()
});
}
}
},
Some(PbUpdate2::Add(a_set)) =>
set.add(
auth,
a_set.addrs.iter().filter_map(
|addr| addr.addr.clone().and_then(pb_to_sock_addr))),
Some(PbUpdate2::Remove(r_set)) =>
set.remove(
auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))),
Some(PbUpdate2::NoEndpoints(no_endpoints)) =>
set.no_endpoints(auth, no_endpoints.exists),
None => (),
},
Ok(Async::Ready(None)) => {
@ -369,6 +377,82 @@ where
}
}
// ===== impl DestinationSet =====
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn add<Addrs>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: Addrs)
where Addrs: Iterator<Item = SocketAddr>
{
let mut addrs = match self.addrs.take() {
Exists::Yes(addrs) => addrs,
Exists::Unknown | Exists::No => {
trace!("adding entries for {:?} that wasn't known to exist. Now assuming it does.",
authority_for_logging);
HashSet::new()
},
};
for addr in addrs_to_add {
if addrs.insert(addr) {
trace!("update {:?} for {:?}", addr, authority_for_logging);
// retain is used to drop any senders that are dead
self.txs.retain(|tx| {
tx.unbounded_send(Update::Insert(addr)).is_ok()
});
}
}
self.addrs = Exists::Yes(addrs);
}
fn remove<Addrs>(&mut self, authority_for_logging: &FullyQualifiedAuthority,
addrs_to_remove: Addrs)
where Addrs: Iterator<Item = SocketAddr>
{
let addrs = match self.addrs.take() {
Exists::Yes(mut addrs) => {
for addr in addrs_to_remove {
if addrs.remove(&addr) {
self.notify_of_removal(addr, authority_for_logging)
}
}
addrs
},
Exists::Unknown | Exists::No => {
trace!("remove addresses for {:?} that wasn't known to exist. Now assuming it does.",
authority_for_logging);
HashSet::new()
},
};
self.addrs = Exists::Yes(addrs)
}
fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) {
trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
if exists { "exist"} else { "not exist"});
match self.addrs.take() {
Exists::Yes(addrs) => {
for addr in addrs {
self.notify_of_removal(addr, authority_for_logging)
}
},
Exists::No | Exists::Unknown => {},
}
self.addrs = if exists {
Exists::Yes(HashSet::new())
} else {
Exists::No
};
}
fn notify_of_removal(&mut self, addr: SocketAddr,
authority_for_logging: &FullyQualifiedAuthority) {
trace!("remove {:?} for {:?}", addr, authority_for_logging);
// retain is used to drop any senders that are dead
self.txs.retain(|tx| {
tx.unbounded_send(Update::Remove(addr)).is_ok()
});
}
}
// ===== impl Bind =====
impl<F, S, E> Bind for F