Skip to content

Commit

Permalink
Merge pull request #152 from seanmonstar/proxy
Browse files Browse the repository at this point in the history
add a `Proxy` type
  • Loading branch information
seanmonstar authored Jun 22, 2017
2 parents 855e661 + 6cdaff4 commit 478309e
Show file tree
Hide file tree
Showing 8 changed files with 481 additions and 16 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ categories = ["web-programming::http-client"]
bytes = "0.4"
futures = "0.1.14"
hyper = "0.11"
hyper-tls = "0.1"
hyper-tls = "0.1.1"
libflate = "0.1.5"
log = "0.3"
native-tls = "0.1"
serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.5"
tokio-core = "0.1.6"
tokio-io = "0.1"
url = "1.2"

[dev-dependencies]
Expand Down
45 changes: 31 additions & 14 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use tokio_core::reactor::Handle;
use super::body;
use super::request::{self, Request, RequestBuilder};
use super::response::{self, Response};
use connect::Connector;
use into_url::to_uri;
use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers};
use {Certificate, IntoUrl, Method, StatusCode, Url};
use {Certificate, IntoUrl, Method, proxy, Proxy, StatusCode, Url};

static DEFAULT_USER_AGENT: &'static str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -79,6 +81,7 @@ pub struct ClientBuilder {
struct Config {
gzip: bool,
hostname_verification: bool,
proxies: Vec<Proxy>,
redirect_policy: RedirectPolicy,
referer: bool,
timeout: Option<Duration>,
Expand All @@ -97,6 +100,7 @@ impl ClientBuilder {
config: Some(Config {
gzip: true,
hostname_verification: true,
proxies: Vec::new(),
redirect_policy: RedirectPolicy::default(),
referer: true,
timeout: None,
Expand Down Expand Up @@ -127,7 +131,9 @@ impl ClientBuilder {
}
*/

let hyper_client = create_hyper_client(tls, handle);
let proxies = Arc::new(config.proxies);

let hyper_client = create_hyper_client(tls, proxies.clone(), handle);
//let mut hyper_client = create_hyper_client(tls_client);

//hyper_client.set_read_timeout(config.timeout);
Expand All @@ -137,6 +143,7 @@ impl ClientBuilder {
inner: Arc::new(ClientRef {
gzip: config.gzip,
hyper: hyper_client,
proxies: proxies,
redirect_policy: config.redirect_policy,
referer: config.referer,
}),
Expand Down Expand Up @@ -187,6 +194,13 @@ impl ClientBuilder {
self
}

/// Add a `Proxy` to the list of proxies the `Client` will use.
#[inline]
pub fn proxy(&mut self, proxy: Proxy) -> &mut ClientBuilder {
self.config_mut().proxies.push(proxy);
self
}

/// Set a `RedirectPolicy` for this client.
///
/// Default will follow redirects up to a maximum of 10.
Expand Down Expand Up @@ -226,14 +240,11 @@ impl ClientBuilder {
}
}

type HyperClient = ::hyper::Client<::hyper_tls::HttpsConnector<::hyper::client::HttpConnector>>;
type HyperClient = ::hyper::Client<Connector>;

fn create_hyper_client(tls: TlsConnector, handle: &Handle) -> HyperClient {
let mut http = ::hyper::client::HttpConnector::new(4, handle);
http.enforce_http(false);
let https = ::hyper_tls::HttpsConnector::from((http, tls));
fn create_hyper_client(tls: TlsConnector, proxies: Arc<Vec<Proxy>>, handle: &Handle) -> HyperClient {
::hyper::Client::configure()
.connector(https)
.connector(Connector::new(tls, proxies, handle))
.build(handle)
}

Expand Down Expand Up @@ -363,14 +374,19 @@ impl Client {
headers.set(AcceptEncoding(vec![qitem(Encoding::Gzip)]));
}

let mut req = ::hyper::Request::new(method.clone(), url_to_uri(&url));
let uri = to_uri(&url);
let mut req = ::hyper::Request::new(method.clone(), uri.clone());
*req.headers_mut() = headers.clone();
let body = body.and_then(|body| {
let (resuable, body) = body::into_hyper(body);
req.set_body(body);
resuable
});

if proxy::is_proxied(&self.inner.proxies, &uri) {
req.set_proxy(true);
}

let in_flight = self.inner.hyper.request(req);

Pending {
Expand Down Expand Up @@ -408,6 +424,7 @@ impl fmt::Debug for ClientBuilder {
struct ClientRef {
gzip: bool,
hyper: HyperClient,
proxies: Arc<Vec<Proxy>>,
redirect_policy: RedirectPolicy,
referer: bool,
}
Expand Down Expand Up @@ -473,14 +490,18 @@ impl Future for Pending {

remove_sensitive_headers(&mut self.headers, &self.url, &self.urls);
debug!("redirecting to {:?} '{}'", self.method, self.url);
let uri = to_uri(&self.url);
let mut req = ::hyper::Request::new(
self.method.clone(),
url_to_uri(&self.url)
uri.clone()
);
*req.headers_mut() = self.headers.clone();
if let Some(ref body) = self.body {
req.set_body(body.clone());
}
if proxy::is_proxied(&self.client.proxies, &uri) {
req.set_proxy(true);
}
self.in_flight = self.client.hyper.request(req);
continue;
},
Expand Down Expand Up @@ -525,10 +546,6 @@ fn make_referer(next: &Url, previous: &Url) -> Option<Referer> {
Some(Referer::new(referer.into_string()))
}

fn url_to_uri(url: &Url) -> ::hyper::Uri {
url.as_str().parse().expect("a parsed Url should always be a valid Uri")
}

// pub(crate)

pub fn take_builder(builder: &mut ClientBuilder) -> ClientBuilder {
Expand Down
9 changes: 8 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::sync::{mpsc, oneshot};

use request::{self, Request, RequestBuilder};
use response::{self, Response};
use {async_impl, Certificate, Method, IntoUrl, RedirectPolicy, wait};
use {async_impl, Certificate, Method, IntoUrl, Proxy, RedirectPolicy, wait};

/// A `Client` to make Requests with.
///
Expand Down Expand Up @@ -141,6 +141,13 @@ impl ClientBuilder {
self
}

/// Add a `Proxy` to the list of proxies the `Client` will use.
#[inline]
pub fn proxy(&mut self, proxy: Proxy) -> &mut ClientBuilder {
self.inner.proxy(proxy);
self
}

/// Set a `RedirectPolicy` for this client.
///
/// Default will follow redirects up to a maximum of 10.
Expand Down
200 changes: 200 additions & 0 deletions src/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use bytes::{BufMut, IntoBuf};
use futures::{Async, Future, Poll};
use hyper::client::{HttpConnector, Service};
use hyper::Uri;
use hyper_tls::{/*HttpsConnecting,*/ HttpsConnector, MaybeHttpsStream};
use native_tls::TlsConnector;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};

use std::io::{self, Cursor};
use std::sync::Arc;

use {proxy, Proxy};

// pub(crate)

pub struct Connector {
https: HttpsConnector<HttpConnector>,
proxies: Arc<Vec<Proxy>>,
}

impl Connector {
pub fn new(tls: TlsConnector, proxies: Arc<Vec<Proxy>>, handle: &Handle) -> Connector {
let mut http = HttpConnector::new(4, handle);
http.enforce_http(false);
let https = HttpsConnector::from((http, tls));

Connector {
https: https,
proxies: proxies,
}
}
}

impl Service for Connector {
type Request = Uri;
type Response = Conn;
type Error = io::Error;
type Future = Connecting;

fn call(&self, uri: Uri) -> Self::Future {
for prox in self.proxies.iter() {
if let Some(puri) = proxy::proxies(prox, &uri) {
if uri.scheme() == Some("https") {
let host = uri.authority().unwrap().to_owned();
return Box::new(self.https.call(puri).and_then(|conn| {
tunnel(conn, host)
}));
}
return Box::new(self.https.call(puri));
}
}
Box::new(self.https.call(uri))
}
}

pub type Conn = MaybeHttpsStream<<HttpConnector as Service>::Response>;
pub type Connecting = Box<Future<Item=Conn, Error=io::Error>>;

fn tunnel<T>(conn: T, host: String) -> Tunnel<T> {
let buf = format!("\
CONNECT {0} HTTP/1.1\r\n\
Host: {0}\r\n\
\r\n\
", host).into_bytes();

Tunnel {
buf: buf.into_buf(),
conn: Some(conn),
state: TunnelState::Writing,
}
}

struct Tunnel<T> {
buf: Cursor<Vec<u8>>,
conn: Option<T>,
state: TunnelState,
}

enum TunnelState {
Writing,
Reading
}

impl<T> Future for Tunnel<T>
where T: AsyncRead + AsyncWrite {
type Item = T;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let TunnelState::Writing = self.state {
let n = try_ready!(self.conn.as_mut().unwrap().write_buf(&mut self.buf));
if !self.buf.has_remaining_mut() {
self.state = TunnelState::Reading;
self.buf.get_mut().truncate(0);
} else if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected eof while tunneling"));
}
} else {
let n = try_ready!(self.conn.as_mut().unwrap().read_buf(&mut self.buf.get_mut()));
let read = &self.buf.get_ref()[..];
if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected eof while tunneling"));
} else if read.len() > 12 {
if read.starts_with(b"HTTP/1.1 200") {
if read.ends_with(b"\r\n\r\n") {
return Ok(Async::Ready(self.conn.take().unwrap()));
}
// else read more
} else {
return Err(io::Error::new(io::ErrorKind::Other, "unsuccessful tunnel"));
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use super::tunnel;


macro_rules! mock_tunnel {
() => ({
mock_tunnel!(b"\
HTTP/1.1 200 OK\r\n\
\r\n\
")
});
($write:expr) => ({
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let connect_expected = format!("\
CONNECT {0} HTTP/1.1\r\n\
Host: {0}\r\n\
\r\n\
", addr).into_bytes();

thread::spawn(move || {
let (mut sock, _) = listener.accept().unwrap();
let mut buf = [0u8; 4096];
let n = sock.read(&mut buf).unwrap();
assert_eq!(&buf[..n], &connect_expected[..]);

sock.write_all($write).unwrap();
});
addr
})
}

#[test]
fn test_tunnel() {
let addr = mock_tunnel!();

let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.to_string();
let work = work.and_then(|tcp| {
tunnel(tcp, host)
});

core.run(work).unwrap();
}

#[test]
fn test_tunnel_eof() {
let addr = mock_tunnel!(b"HTTP/1.1 200 OK");

let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.to_string();
let work = work.and_then(|tcp| {
tunnel(tcp, host)
});

core.run(work).unwrap_err();
}

#[test]
fn test_tunnel_bad_response() {
let addr = mock_tunnel!(b"foo bar baz hallo");

let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.to_string();
let work = work.and_then(|tcp| {
tunnel(tcp, host)
});

core.run(work).unwrap_err();
}
}
4 changes: 4 additions & 0 deletions src/into_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ impl<'a> PolyfillTryInto for &'a String {
Url::parse(self)
}
}

pub fn to_uri(url: &Url) -> ::hyper::Uri {
url.as_str().parse().expect("a parsed Url should always be a valid Uri")
}
Loading

0 comments on commit 478309e

Please sign in to comment.