package destination import ( "fmt" "net" "strconv" "strings" common "github.com/runconduit/conduit/controller/gen/common" pb "github.com/runconduit/conduit/controller/gen/proxy/destination" "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/util" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) type ( server struct { endpoints *k8s.EndpointsWatcher } ) // The Destination service serves service discovery information to the proxy. // This implementation supports the "k8s" destination scheme and expects // destination paths to be of the form: // ..svc.cluster.local: // // If the port is omitted, 80 is used as a default. If the namespace is // omitted, "default" is used as a default.append // // Addresses for the given destination are fetched from the Kubernetes Endpoints // API. func NewServer(addr, kubeconfig string, done chan struct{}) (*grpc.Server, net.Listener, error) { clientSet, err := k8s.NewClientSet(kubeconfig) if err != nil { return nil, nil, err } endpoints := k8s.NewEndpointsWatcher(clientSet) go endpoints.Run() srv := &server{ endpoints: endpoints, } lis, err := net.Listen("tcp", addr) if err != nil { return nil, nil, err } s := util.NewGrpcServer() pb.RegisterDestinationServer(s, srv) go func() { <-done endpoints.Stop() }() return s, lis, nil } func (s *server) Get(dest *common.Destination, stream pb.Destination_GetServer) error { log.Debugf("Get %v", dest) if dest.Scheme != "k8s" { err := fmt.Errorf("Unsupported scheme %v", dest.Scheme) log.Error(err) return err } hostPort := strings.Split(dest.Path, ":") if len(hostPort) > 2 { err := fmt.Errorf("Invalid destination %s", dest.Path) log.Error(err) return err } host := hostPort[0] port := 80 if len(hostPort) == 2 { var err error port, err = strconv.Atoi(hostPort[1]) if err != nil { err := fmt.Errorf("Invalid port %s", hostPort[1]) log.Error(err) return err } } // service.namespace.svc.cluster.local domains := strings.Split(host, ".") if len(domains) < 2 { err := fmt.Errorf("not a service: %s", host) log.Error(err) return err } service := domains[0] namespace := domains[1] id := namespace + "/" + service listener := endpointListener{stream: stream} s.endpoints.Subscribe(id, uint32(port), listener) <-stream.Context().Done() s.endpoints.Unsubscribe(id, uint32(port), listener) return nil } type endpointListener struct { stream pb.Destination_GetServer } func (listener endpointListener) Update(add []common.TcpAddress, remove []common.TcpAddress) { if len(add) > 0 { update := &pb.Update{ Update: &pb.Update_Add{ Add: toWeightedAddrSet(add), }, } listener.stream.Send(update) } if len(remove) > 0 { update := &pb.Update{ Update: &pb.Update_Remove{ Remove: toAddrSet(remove), }, } listener.stream.Send(update) } } func toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet { addrs := make([]*pb.WeightedAddr, 0) for i := range endpoints { addrs = append(addrs, &pb.WeightedAddr{ Addr: &endpoints[i], Weight: 1, }) } return &pb.WeightedAddrSet{Addrs: addrs} } func toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet { addrs := make([]*common.TcpAddress, 0) for i := range endpoints { addrs = append(addrs, &endpoints[i]) } return &pb.AddrSet{Addrs: addrs} }