#![cfg_attr(feature = "cargo-clippy", allow(clone_on_ref_ptr))] #![cfg_attr(feature = "cargo-clippy", allow(new_without_default_derive))] #![deny(warnings)] extern crate bytes; extern crate conduit_proxy_controller_grpc; extern crate convert; extern crate env_logger; extern crate deflate; #[macro_use] extern crate futures; extern crate futures_mpsc_lossy; extern crate futures_watch; extern crate h2; extern crate http; extern crate httparse; extern crate hyper; extern crate ipnet; #[cfg(target_os = "linux")] extern crate libc; #[macro_use] extern crate log; #[cfg_attr(test, macro_use)] extern crate indexmap; extern crate prost; extern crate prost_types; #[cfg(test)] #[macro_use] extern crate quickcheck; extern crate rand; extern crate tokio_connect; extern crate tokio_core; extern crate tokio_io; extern crate tower_balance; extern crate tower_buffer; extern crate tower_discover; extern crate tower_grpc; extern crate tower_h2; extern crate tower_reconnect; extern crate tower_service; extern crate conduit_proxy_router; extern crate tower_util; extern crate tower_in_flight_limit; extern crate trust_dns_resolver; use futures::*; use std::error::Error; use std::io; use std::net::SocketAddr; use std::sync::Arc; use std::thread; use std::time::Duration; use indexmap::IndexSet; use tokio_core::reactor::{Core, Handle}; use tower_service::NewService; use tower_fn::*; use conduit_proxy_router::{Recognize, Router, Error as RouteError}; pub mod app; mod bind; pub mod config; mod connection; pub mod control; pub mod ctx; mod dns; mod drain; mod inbound; mod logging; mod map_err; mod outbound; pub mod telemetry; mod transparency; mod transport; pub mod timeout; mod tower_fn; // TODO: move to tower-fn use bind::Bind; use connection::BoundPort; use inbound::Inbound; use map_err::MapErr; use transparency::{HttpBody, Server}; pub use transport::{GetOriginalDst, SoOriginalDst}; use outbound::Outbound; /// Runs a sidecar proxy. /// /// The proxy binds two listeners: /// /// - a private socket (TCP or UNIX) for outbound requests to other instances; /// - and a public socket (TCP and optionally TLS) for inbound requests from other /// instances. /// /// The public listener forwards requests to a local socket (TCP or UNIX). /// /// The private listener routes requests to service-discovery-aware load-balancer. /// pub struct Main { config: config::Config, control_listener: BoundPort, inbound_listener: BoundPort, outbound_listener: BoundPort, metrics_listener: BoundPort, get_original_dst: G, reactor: Core, } impl Main where G: GetOriginalDst + Clone + 'static, { pub fn new(config: config::Config, get_original_dst: G) -> Self { let control_listener = BoundPort::new(config.control_listener.addr) .expect("controller listener bind"); let inbound_listener = BoundPort::new(config.public_listener.addr) .expect("public listener bind"); let outbound_listener = BoundPort::new(config.private_listener.addr) .expect("private listener bind"); let reactor = Core::new().expect("reactor"); let metrics_listener = BoundPort::new(config.metrics_listener.addr) .expect("metrics listener bind"); Main { config, control_listener, inbound_listener, outbound_listener, metrics_listener, get_original_dst, reactor, } } pub fn control_addr(&self) -> SocketAddr { self.control_listener.local_addr() } pub fn inbound_addr(&self) -> SocketAddr { self.inbound_listener.local_addr() } pub fn outbound_addr(&self) -> SocketAddr { self.outbound_listener.local_addr() } pub fn handle(&self) -> Handle { self.reactor.handle() } pub fn metrics_addr(&self) -> SocketAddr { self.metrics_listener.local_addr() } pub fn run_until(self, shutdown_signal: F) where F: Future, { let process_ctx = ctx::Process::new(&self.config); let Main { config, control_listener, inbound_listener, outbound_listener, metrics_listener, get_original_dst, reactor: mut core, } = self; let control_host_and_port = config.control_host_and_port.clone(); info!("using controller at {:?}", control_host_and_port); info!("routing on {:?}", outbound_listener.local_addr()); info!( "proxying on {:?} to {:?}", inbound_listener.local_addr(), config.private_forward ); info!( "serving Prometheus metrics on {:?}", metrics_listener.local_addr(), ); info!( "protocol detection disabled for inbound ports {:?}", config.inbound_ports_disable_protocol_detection, ); info!( "protocol detection disabled for outbound ports {:?}", config.outbound_ports_disable_protocol_detection, ); let (sensors, telemetry) = telemetry::new( &process_ctx, config.event_buffer_capacity, config.metrics_retain_idle, ); let dns_config = dns::Config::from_system_config() .unwrap_or_else(|e| { // TODO: Make DNS configuration infallible. panic!("invalid DNS configuration: {:?}", e); }); let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone()); let executor = core.handle(); let (drain_tx, drain_rx) = drain::channel(); let bind = Bind::new(executor.clone()).with_sensors(sensors.clone()); // Setup the public listener. This will listen on a publicly accessible // address and listen for inbound connections that should be forwarded // to the managed application (private destination). let inbound = { let ctx = ctx::Proxy::inbound(&process_ctx); let bind = bind.clone().with_ctx(ctx.clone()); let default_addr = config.private_forward.map(|a| a.into()); let fut = serve( inbound_listener, Inbound::new(default_addr, bind), config.inbound_router_capacity, config.private_connect_timeout, config.inbound_ports_disable_protocol_detection, ctx, sensors.clone(), get_original_dst.clone(), drain_rx.clone(), &executor, ); ::logging::context_future("inbound", fut) }; // Setup the private listener. This will listen on a locally accessible // address and listen for outbound requests that should be routed // to a remote service (public destination). let outbound = { let ctx = ctx::Proxy::outbound(&process_ctx); let bind = bind.clone().with_ctx(ctx.clone()); let outgoing = Outbound::new(bind, control, config.bind_timeout); let fut = serve( outbound_listener, outgoing, config.outbound_router_capacity, config.public_connect_timeout, config.outbound_ports_disable_protocol_detection, ctx, sensors, get_original_dst, drain_rx, &executor, ); ::logging::context_future("outbound", fut) }; trace!("running"); let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>(); { thread::Builder::new() .name("controller-client".into()) .spawn(move || { use conduit_proxy_controller_grpc::tap::server::TapServer; let mut core = Core::new().expect("initialize controller core"); let executor = core.handle(); let (taps, observe) = control::Observe::new(100); let new_service = TapServer::new(observe); let server = serve_control( control_listener, new_service, &executor, ); let telemetry = telemetry .make_control(&taps, &executor) .expect("bad news in telemetry town"); let metrics_server = telemetry .serve_metrics(metrics_listener); let client = control_bg.bind( control_host_and_port, dns_config, &executor ); let fut = client.join4( server.map_err(|_| {}), telemetry, metrics_server.map_err(|_| {}), ).map(|_| {}); executor.spawn(::logging::context_future("controller-client", fut)); let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(())); core.run(shutdown).expect("controller api"); }) .expect("initialize controller api thread"); } let fut = inbound .join(outbound) .map(|_| ()) .map_err(|err| error!("main error: {:?}", err)); core.handle().spawn(fut); let shutdown_signal = shutdown_signal.and_then(move |()| { debug!("shutdown signaled"); drain_tx.drain() }); core.run(shutdown_signal).expect("executor"); debug!("shutdown complete"); } } fn serve( bound_port: BoundPort, recognize: R, router_capacity: usize, tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, proxy_ctx: Arc, sensors: telemetry::Sensors, get_orig_dst: G, drain_rx: drain::Watch, executor: &Handle, ) -> Box + 'static> where B: tower_h2::Body + Default + 'static, E: Error + 'static, F: Error + 'static, R: Recognize< Request = http::Request, Response = http::Response>, Error = E, RouteError = F, > + 'static, G: GetOriginalDst + 'static, { let router = Router::new(recognize, router_capacity); let stack = Arc::new(NewServiceFn::new(move || { // Clone the router handle let router = router.clone(); // Map errors to appropriate response error codes. let map_err = MapErr::new(router, |e| { match e { RouteError::Route(r) => { error!(" turning route error: {} into 500", r); http::StatusCode::INTERNAL_SERVER_ERROR } RouteError::Inner(i) => { error!("turning {} into 500", i); http::StatusCode::INTERNAL_SERVER_ERROR } RouteError::NotRecognized => { error!("turning route not recognized error into 500"); http::StatusCode::INTERNAL_SERVER_ERROR } RouteError::NoCapacity(capacity) => { // TODO For H2 streams, we should probably signal a protocol-level // capacity change. error!("router at capacity ({}); returning a 503", capacity); http::StatusCode::SERVICE_UNAVAILABLE } } }); // Install the request open timestamp module at the very top // of the stack, in order to take the timestamp as close as // possible to the beginning of the request's lifetime. telemetry::sensor::http::TimestampRequestOpen::new(map_err) })); let listen_addr = bound_port.local_addr(); let server = Server::new( listen_addr, proxy_ctx, sensors, get_orig_dst, stack, tcp_connect_timeout, disable_protocol_detection_ports, drain_rx.clone(), executor.clone(), ); let accept = bound_port.listen_and_fold( executor, (), move |(), (connection, remote_addr)| { server.serve(connection, remote_addr); Ok(()) }, ); let accept_until = Cancelable { future: accept, canceled: false, }; // As soon as we get a shutdown signal, the listener // is canceled immediately. Box::new(drain_rx.watch(accept_until, |accept| { accept.canceled = true; })) } /// Can cancel a future by setting a flag. /// /// Used to 'watch' the accept futures, and close the listeners /// as soon as the shutdown signal starts. struct Cancelable { future: F, canceled: bool, } impl Future for Cancelable where F: Future, { type Item = (); type Error = F::Error; fn poll(&mut self) -> Poll { if self.canceled { Ok(().into()) } else { self.future.poll() } } } fn serve_control( bound_port: BoundPort, new_service: N, executor: &Handle, ) -> Box + 'static> where B: tower_h2::Body + 'static, N: NewService, Response = http::Response> + 'static, { let h2_builder = h2::server::Builder::default(); let server = tower_h2::Server::new(new_service, h2_builder, executor.clone()); bound_port.listen_and_fold( executor, (server, executor.clone()), move |(server, executor), (session, _)| { let s = server.serve(session).map_err(|_| ()); executor.spawn(::logging::context_future("serve_control", s)); future::ok((server, executor)) }, ) }