Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Wrap the TcpStream in a BufferedStream.
Browse files Browse the repository at this point in the history
As it stood, the header-reading part of a client used a BufferedReader
which was then discarded, although it had probably consumed at least the
start of the request body.

I had been using TcpStream, BufferedReader<TcpStream> and
BufferedWriter<TcpStream> in different places. This was entirely
unworkable, so I've replaced them with immediately wrapping the
TcpStream in a BufferedStream and using it everywhere. This makes things
sane for now.

Alas, this also introduces a rather severe regression owing to an
as-yet-unidentified Rust bug, for the call to
http::client::response::ResponseReader::construct is causing a segfault.
That is, execution is failing after the caller has begun calling the
function, but before construct has begun to execute. This renders the
client inoperable for the moment.

--HG--
extra : amend_source : 9263d96068f59080326179877528b84946587e85
  • Loading branch information
chris-morgan committed Aug 10, 2013
1 parent f4ea90c commit f782362
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 99 deletions.
139 changes: 67 additions & 72 deletions src/libhttp/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,76 @@
/// Memory buffers for the benefit of `std::rt::io::net` which has slow read/write.
use std::rt::io::{Reader, Writer};
use std::rt::io::{Reader, Writer, Stream};
use std::rt::io::net::tcp::TcpStream;
use std::cast::transmute_mut;
use std::cmp::min;
use std::ptr;

pub type BufTcpStream = BufferedStream<TcpStream>;

// 64KB chunks (moderately arbitrary)
static READ_BUF_SIZE: uint = 0x10000;
static WRITE_BUF_SIZE: uint = 0x10000;
// TODO: consider removing constants and giving a buffer size in the constructor

struct BufferedReader<'self, T> {
wrapped: &'self mut T,
buffer: [u8, ..READ_BUF_SIZE],
struct BufferedStream<T> {
wrapped: T,
read_buffer: [u8, ..READ_BUF_SIZE],
// The current position in the buffer
pos: uint,
read_pos: uint,
// The last valid position in the reader
max: uint,
read_max: uint,
write_buffer: [u8, ..WRITE_BUF_SIZE],
write_len: uint,

/// Some things being written may not like flush() being called yet (e.g. explicitly fail!())
/// The BufferedReader may need to be flushed for good control, but let it provide for such
/// cases by not calling the wrapped object's flush method in turn.
call_wrapped_flush: bool,
}

impl<'self, T: Reader> BufferedReader<'self, T> {
pub fn new<'a>(reader: &'a mut T/*, buffer_size: uint*/) -> BufferedReader<'a, T> {
BufferedReader {
wrapped: reader,
buffer: [0u8, ..READ_BUF_SIZE], //[0u8, ..buffer_size],
pos: 0u,
max: 0u,
impl<T: Reader + Writer /*Stream*/> BufferedStream<T> {
pub fn new(stream: T, call_wrapped_flush: bool) -> BufferedStream<T> {
BufferedStream {
wrapped: stream,
read_buffer: [0u8, ..READ_BUF_SIZE],
read_pos: 0u,
read_max: 0u,
write_buffer: [0u8, ..WRITE_BUF_SIZE],
write_len: 0u,
call_wrapped_flush: call_wrapped_flush,
}
}
}

impl<T: Stream> Stream for BufferedStream<T>;

impl<T: Reader> BufferedStream<T> {
/// Poke a single byte back so it will be read next. For this to make sense, you must have just
/// read that byte. If `self.pos` is 0 and `self.max` is not 0 (i.e. if the buffer is just
/// filled
/// Very great caution must be used in calling this as it will fail if `self.pos` is 0.
pub fn poke_byte(&mut self, byte: u8) {
match (self.pos, self.max) {
(0, 0) => self.max = 1,
match (self.read_pos, self.read_max) {
(0, 0) => self.read_max = 1,
(0, _) => fail!("poke called when buffer is full"),
(_, _) => self.pos -= 1,
(_, _) => self.read_pos -= 1,
}
self.buffer[self.pos] = byte;
self.read_buffer[self.read_pos] = byte;
}

#[inline]
fn fill_buffer(&mut self) -> bool {
assert_eq!(self.pos, self.max);
match self.wrapped.read(self.buffer) {
assert_eq!(self.read_pos, self.read_max);
match self.wrapped.read(self.read_buffer) {
None => {
self.pos = 0;
self.max = 0;
self.read_pos = 0;
self.read_max = 0;
false
},
Some(i) => {
self.pos = 0;
self.max = i;
self.read_pos = 0;
self.read_max = i;
true
},
}
Expand All @@ -63,69 +80,46 @@ impl<'self, T: Reader> BufferedReader<'self, T> {
/// (which just uses `read()`)
#[inline]
pub fn read_byte(&mut self) -> Option<u8> {
if self.pos == self.max && !self.fill_buffer() {
if self.read_pos == self.read_max && !self.fill_buffer() {
// Run out of buffered content, no more to come
return None;
}
self.pos += 1;
Some(self.buffer[self.pos - 1])
self.read_pos += 1;
Some(self.read_buffer[self.read_pos - 1])
}
}

impl<'self, T: Reader> Reader for ~BufferedReader<'self, T> {
impl<T: Reader> Reader for BufferedStream<T> {
/// Read at most N bytes into `buf`, where N is the minimum of `buf.len()` and the buffer size.
///
/// At present, this makes no attempt to fill its buffer proactively, instead waiting until you
/// ask.
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
if self.pos == self.max && !self.fill_buffer() {
if self.read_pos == self.read_max && !self.fill_buffer() {
// Run out of buffered content, no more to come
return None;
}
let size = min(self.max - self.pos, buf.len());
let size = min(self.read_max - self.read_pos, buf.len());
unsafe {
do buf.as_mut_buf |p_dst, _len_dst| {
do self.buffer.as_imm_buf |p_src, _len_src| {
do self.read_buffer.as_imm_buf |p_src, _len_src| {
// Note that copy_memory works on bytes; good, u8 is byte-sized
ptr::copy_memory(p_dst, ptr::offset(p_src, self.pos as int), size)
ptr::copy_memory(p_dst, ptr::offset(p_src, self.read_pos as int), size)
}
}
}
self.pos += size;
self.read_pos += size;
Some(size)
}

/// Return whether the Reader has reached the end of the stream AND exhausted its buffer.
fn eof(&mut self) -> bool {
self.pos == self.max && self.wrapped.eof()
}
}

struct BufferedWriter<'self, T> {
wrapped: &'self mut T,
buffer: [u8, ..WRITE_BUF_SIZE],
buflen: uint,

/// Some things being written may not like flush() being called yet (e.g. explicitly fail!())
/// The BufferedReader may need to be flushed for good control, but let it provide for such
/// cases by not calling the wrapped object's flush method in turn.
call_wrapped_flush: bool,
}

impl<'self, T: Writer> BufferedWriter<'self, T> {
pub fn new<'a>(writer: &'a mut T, call_wrapped_flush: bool/*, buffer_size: uint*/)
-> BufferedWriter<'a, T> {
BufferedWriter {
wrapped: writer,
buffer: [0u8, ..WRITE_BUF_SIZE], //[0u8, ..buffer_size],
buflen: 0u,
call_wrapped_flush: call_wrapped_flush,
}
self.read_pos == self.read_max && self.wrapped.eof()
}
}

#[unsafe_destructor]
impl<'self, T: Writer> Drop for BufferedWriter<'self, T> {
impl<T: Writer> Drop for BufferedStream<T> {
fn drop(&self) {
// Clearly wouldn't be a good idea to finish without flushing!

Expand All @@ -135,39 +129,40 @@ impl<'self, T: Writer> Drop for BufferedWriter<'self, T> {
}
}

impl<'self, T: Writer> Writer for BufferedWriter<'self, T> {
impl<T: Writer> Writer for BufferedStream<T> {
fn write(&mut self, buf: &[u8]) {
if buf.len() + self.buflen > self.buffer.len() {
if buf.len() + self.write_len > self.write_buffer.len() {
// This is the lazy approach which may involve two writes where it's really not
// warranted. Maybe deal with that later.
if self.buflen > 0 {
self.wrapped.write(self.buffer.slice_to(self.buflen));
self.buflen = 0;
if self.write_len > 0 {
self.wrapped.write(self.write_buffer.slice_to(self.write_len));
self.write_len = 0;
}
self.wrapped.write(buf);
self.buflen = 0;
self.write_len = 0;
} else {
// Safely copy buf onto the "end" of self.buffer
unsafe {
do buf.as_imm_buf |p_src, len_src| {
do self.buffer.as_mut_buf |p_dst, _len_dst| {
do self.write_buffer.as_mut_buf |p_dst, _len_dst| {
// Note that copy_memory works on bytes; good, u8 is byte-sized
ptr::copy_memory(ptr::mut_offset(p_dst, self.buflen as int), p_src, len_src)
ptr::copy_memory(ptr::mut_offset(p_dst, self.write_len as int),
p_src, len_src)
}
}
}
self.buflen += buf.len();
if self.buflen == self.buffer.len() {
self.wrapped.write(self.buffer);
self.buflen = 0;
self.write_len += buf.len();
if self.write_len == self.write_buffer.len() {
self.wrapped.write(self.write_buffer);
self.write_len = 0;
}
}
}

fn flush(&mut self) {
if self.buflen > 0 {
self.wrapped.write(self.buffer.slice_to(self.buflen));
self.buflen = 0;
if self.write_len > 0 {
self.wrapped.write(self.write_buffer.slice_to(self.write_len));
self.write_len = 0;
}
if self.call_wrapped_flush {
self.wrapped.flush();
Expand Down
8 changes: 3 additions & 5 deletions src/libhttp/client/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::rt;
use std::rt::io::Writer;
use std::rt::io::net::ip::SocketAddr;
use std::rt::io::net::tcp::TcpStream;
use buffer::{BufTcpStream, BufferedStream};
use headers::Headers;
use headers::host::Host;

Expand All @@ -29,8 +30,7 @@ use client::response::ResponseReader;

pub struct RequestWriter {
// The place to write to (typically a TCP stream, rt::io::net::tcp::TcpStream)
//priv writer: Option<BufferedWriter<'self, TcpStream>>,
priv stream: Option<TcpStream>,
priv stream: Option<BufTcpStream>,
priv headers_written: bool,

/// The originating IP address of the request.
Expand Down Expand Up @@ -99,13 +99,11 @@ impl RequestWriter {

self.stream = match self.remote_addr {
Some(addr) => match TcpStream::connect(addr) {
Some(stream) => Some(stream),
Some(stream) => Some(BufferedStream::new(stream, false)),
None => return false,
},
None => fail!("connect() called before remote_addr was set"),
};
// Desired: BufferedWriter::new(stream, false), but lifetime woes make that not possible
// with how it's structured at present.
true
}

Expand Down
7 changes: 3 additions & 4 deletions src/libhttp/client/response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use extra::treemap::TreeMap;
use std::rt::io::Reader;
use std::rt::io::extensions::ReaderUtil;
use std::rt::io::net::tcp::TcpStream;
use std::rt::io::{io_error, OtherIoError, IoError};
use std::rt;
use client::request::RequestWriter;
Expand All @@ -10,10 +9,11 @@ use common::read_http_version;
use headers::{Headers, normalise_header_name};
use status::Status;

use buffer::BufTcpStream;
use server::request::{RequestBuffer, EndOfFile, EndOfHeaders, MalformedHeader};

struct ResponseReader {
priv stream: TcpStream,
priv stream: BufTcpStream,

/// The request which this is a response to
request: RequestWriter,
Expand All @@ -38,7 +38,7 @@ fn bad_response_err() -> IoError {
}

impl ResponseReader {
pub fn construct(mut stream: TcpStream, request: RequestWriter)
pub fn construct(mut stream: BufTcpStream, request: RequestWriter)
-> Result<ResponseReader, RequestWriter> {
// TODO: raise condition at the points where Err is returned
//let mut b = [0u8, ..4096];
Expand Down Expand Up @@ -106,7 +106,6 @@ impl ResponseReader {
// between a request and response.
let headers = {
let mut buffer = RequestBuffer::new(&mut stream);

let mut headers = ~TreeMap::new();
loop {
match buffer.read_header_line() {
Expand Down
2 changes: 1 addition & 1 deletion src/libhttp/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::rt::io::extensions::ReaderUtil;

#[inline]
pub fn read_http_version<T: Reader>(reader: &mut T, next_u8: u8) -> Option<(uint, uint)> {
// XXX: by doing this, I've stopped the more efficient BufferedReader.read_byte from being used
// XXX: by doing this, I've stopped the more efficient BufferedStream.read_byte from being used
// HTTP/%u.%u
match reader.read_byte() {
Some(b) if b == 'h' as u8 || b == 'H' as u8 => (),
Expand Down
11 changes: 7 additions & 4 deletions src/libhttp/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use extra::time::precise_time_ns;

use std::rt::io::net::tcp::TcpListener;

use buffer::BufferedStream;

pub use self::request::{RequestBuffer, Request};
pub use self::response::ResponseWriter;

Expand Down Expand Up @@ -80,13 +82,14 @@ impl<T: Send + Clone + Server> ServerUtil for T {
let child_self = self.clone();
do spawn_supervised {
let mut time_start = time_start;
let mut stream = ~stream.take();
let mut stream = BufferedStream::new(stream.take(),
/* TcpStream.flush() fails! */ false);
debug!("accepted connection, got %?", stream);
loop { // A keep-alive loop, condition at end
let time_spawned = precise_time_ns();
let (request, err_status) = Request::get(~RequestBuffer::new(stream));
let (request, err_status) = Request::load(&mut stream);
let time_request_made = precise_time_ns();
let mut response = ~ResponseWriter::new(stream, request);
let mut response = ~ResponseWriter::new(&mut stream, request);
let time_response_made = precise_time_ns();
match err_status {
Ok(()) => {
Expand All @@ -104,7 +107,7 @@ impl<T: Send + Clone + Server> ServerUtil for T {
},
}
// This should not be necessary, but is, because of the Drop bug
// apparent in BufferedWriter. When that is fixed up, then it *may* be
// apparent in BufferedStream. When that is fixed up, then it *may* be
// suitable to remove flush() from here. I say "may" as it would mean
// that time_finished might not include writing all the response (a
// non-trivial time interval).
Expand Down
Loading

0 comments on commit f782362

Please sign in to comment.