Provide access to inner Write for parquet writers #5471

Merged
Changes from all commits
26 changes: 24 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -270,16 +270,38 @@ impl<W: Write + Send> ArrowWriter<W> {
        self.writer.append_key_value_metadata(kv_metadata)
    }

+    /// Returns a reference to the underlying writer.
+    pub fn inner(&self) -> &W {
+        self.writer.inner()
+    }
+
+    /// Returns a mutable reference to the underlying writer.
+    ///
+    /// It is inadvisable to directly write to the underlying writer, as doing
+    /// so will likely result in a corrupt parquet file
+    pub fn inner_mut(&mut self) -> &mut W {
+        self.writer.inner_mut()
+    }
+
+    /// Flushes any outstanding data and returns the underlying writer.
+    pub fn into_inner(mut self) -> Result<W> {
+        self.flush()?;
+        self.writer.into_inner()
+    }

    /// Close and finalize the underlying Parquet writer
-    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+    ///
Contributor:

Suggested change:

    ///
    /// returning the completed [`crate::format::FileMetaData`] for the written file.

Contributor Author:

I think this is sufficiently obvious.

Contributor:

My rationale for this suggestion was to make it easier for someone to quickly determine "what is the difference between the very similarly sounding finish() and close() methods?"

+    /// Unlike [`Self::close`] this does not consume self
+    ///
+    /// Attempting to write after calling finish will result in an error
+    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
-        self.writer.close()
+        self.writer.finish()
    }

+    /// Close and finalize the underlying Parquet writer
Contributor:

Suggested change:

    /// Close and finalize the underlying Parquet writer, consuming self
    ///
    /// Returns the metadata for the newly created Parquet file.

Contributor Author:

Again, I think this is sufficiently obvious.

+    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+        self.finish()
+    }
}
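To make the new surface concrete, here is a minimal usage sketch (the sample data and the `std::mem::take` step are illustrative assumptions, not code from this PR) showing how `finish()` differs from `close()`:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn example() -> parquet::errors::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

    let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None)?;
    writer.write(&batch)?;

    // finish() writes the footer and returns the metadata, but unlike
    // close() it does not consume the writer.
    let metadata = writer.finish()?;
    assert_eq!(metadata.num_rows, 3);

    // The new accessors reach the underlying writer; here that is the
    // Vec<u8> now holding the complete encoded file.
    let bytes = std::mem::take(writer.inner_mut());
    assert!(!bytes.is_empty());

    // Finishing twice is rejected rather than silently corrupting the file.
    assert!(writer.finish().is_err());
    Ok(())
}
```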

78 changes: 17 additions & 61 deletions parquet/src/arrow/async_writer/mod.rs
@@ -51,8 +51,6 @@
//! # }
//! ```

-use std::{io::Write, sync::Arc};
-
use crate::{
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
@@ -94,14 +92,11 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
/// ```
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
-    sync_writer: ArrowWriter<SharedBuffer>,
+    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,

-    /// The inner buffer shared by the `sync_writer` and the `async_writer`
-    shared_buffer: SharedBuffer,
-
    /// Trigger forced flushing once buffer size reaches this value
    buffer_size: usize,
}
@@ -135,14 +130,15 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
        buffer_size: usize,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
-        let shared_buffer = SharedBuffer::new(buffer_size);
-        let sync_writer =
-            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;
+        let sync_writer = ArrowWriter::try_new_with_options(
+            Vec::with_capacity(buffer_size),
+            arrow_schema,
+            options,
+        )?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
-            shared_buffer,
            buffer_size,
        })
    }
@@ -173,18 +169,13 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
    /// checked and flush if at least half full
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.sync_writer.write(batch)?;
-        Self::try_flush(
-            &mut self.shared_buffer,
-            &mut self.async_writer,
-            self.buffer_size,
-        )
-        .await
+        self.try_flush(false).await
    }

    /// Flushes all buffered rows into a new row group
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
-        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
+        self.try_flush(false).await?;

        Ok(())
    }
Expand All @@ -200,34 +191,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
    ///
    /// All the data in the inner buffer will be force flushed.
    pub async fn close(mut self) -> Result<FileMetaData> {
-        let metadata = self.sync_writer.close()?;
+        let metadata = self.sync_writer.finish()?;

        // Force to flush the remaining data.
-        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
+        self.try_flush(true).await?;
        self.async_writer.shutdown().await?;

        Ok(metadata)
    }

-    /// Flush the data in the [`SharedBuffer`] into the `async_writer` if its size
-    /// exceeds the threshold.
-    async fn try_flush(
-        shared_buffer: &mut SharedBuffer,
-        async_writer: &mut W,
-        buffer_size: usize,
-    ) -> Result<()> {
-        let mut buffer = shared_buffer.buffer.try_lock().unwrap();
-        if buffer.is_empty() || buffer.len() < buffer_size {
+    /// Flush the buffered data into the `async_writer`
+    async fn try_flush(&mut self, force: bool) -> Result<()> {
+        let buffer = self.sync_writer.inner_mut();
+        if !force && (buffer.is_empty() || buffer.len() < self.buffer_size) {
            // no need to flush
            return Ok(());
        }

-        async_writer
+        self.async_writer
            .write_all(buffer.as_slice())
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

-        async_writer
+        self.async_writer
            .flush()
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;
@@ -239,42 +225,12 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
    }
}
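With `SharedBuffer` gone (removed below), the sync writer now encodes directly into a plain `Vec<u8>`, and `try_flush` drains that buffer into the async writer, either once it exceeds `buffer_size` or unconditionally when `force` is set. Caller-facing usage is unchanged; a sketch, assuming `try_new` takes the same (writer, schema, buffer size, properties) shape as the `try_new_with_options` above and that a tokio runtime is available:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::{arrow::async_writer::AsyncArrowWriter, errors::ParquetError};

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

    // Any AsyncWrite + Unpin + Send target works; a tokio file is the
    // simplest stand-in here.
    let file = tokio::fs::File::create("/tmp/example.parquet")
        .await
        .map_err(|e| ParquetError::External(Box::new(e)))?;

    // 10 MiB threshold: encoded bytes accumulate in the writer's internal
    // Vec<u8> and are copied out to `file` once the Vec grows past it.
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), 10 * 1024 * 1024, None)?;
    writer.write(&batch).await?;
    writer.close().await?; // finish(), force flush, then shutdown
    Ok(())
}
```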

-/// A buffer with interior mutability shared by the [`ArrowWriter`] and
-/// [`AsyncArrowWriter`].
-#[derive(Clone)]
-struct SharedBuffer {
-    /// The inner buffer for reading and writing
-    ///
-    /// The lock is used to obtain internal mutability, so no worry about the
-    /// lock contention.
-    buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
-}
-
-impl SharedBuffer {
-    pub fn new(capacity: usize) -> Self {
-        Self {
-            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
-        }
-    }
-}
-
-impl Write for SharedBuffer {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::write(&mut *buffer, buf)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::flush(&mut *buffer)
-    }
-}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
+    use std::sync::Arc;
    use tokio::pin;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
53 changes: 50 additions & 3 deletions parquet/src/file/writer.rs
@@ -61,6 +61,19 @@ impl<W: Write> TrackedWrite<W> {
        self.bytes_written
    }

+    /// Returns a reference to the underlying writer.
+    pub fn inner(&self) -> &W {
+        self.inner.get_ref()
+    }
+
+    /// Returns a mutable reference to the underlying writer.
+    ///
+    /// It is inadvisable to directly write to the underlying writer, as doing
+    /// so will likely result in data corruption
+    pub fn inner_mut(&mut self) -> &mut W {
+        self.inner.get_mut()
+    }
+
    /// Returns the underlying writer.
    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
@@ -137,6 +150,7 @@ pub struct SerializedFileWriter<W: Write> {
    row_group_index: usize,
    // kv_metadatas will be appended to `props` when `write_metadata`
    kv_metadatas: Vec<KeyValue>,
+    finished: bool,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
@@ -167,6 +181,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
+            finished: false,
        })
    }

@@ -210,13 +225,23 @@ impl<W: Write + Send> SerializedFileWriter<W> {
        &self.row_groups
    }

-    /// Closes and finalises file writer, returning the file metadata.
-    pub fn close(mut self) -> Result<parquet::FileMetaData> {
+    /// Close and finalize the underlying Parquet writer
+    ///
+    /// Unlike [`Self::close`] this does not consume self
+    ///
+    /// Attempting to write after calling finish will result in an error
+    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
+        self.buf.flush()?;
        Ok(metadata)
    }
+
+    /// Closes and finalises file writer, returning the file metadata.
+    pub fn close(mut self) -> Result<parquet::FileMetaData> {
+        self.finish()
+    }
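The same `finish()`/`close()` split is mirrored here on the low-level writer. A sketch of the non-consuming path (the schema and values are illustrative):

```rust
use std::sync::Arc;

use parquet::{
    data_type::Int32Type,
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    schema::parser::parse_message_type,
};

fn example() -> parquet::errors::Result<()> {
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
    let props = Arc::new(WriterProperties::builder().build());

    let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;
    let mut rg = writer.next_row_group()?;
    let mut col = rg.next_column()?.expect("one column");
    col.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
    col.close()?;
    rg.close()?;

    // finish() writes the footer in place; the writer still owns its
    // buffer, so the encoded file stays reachable through inner().
    let metadata = writer.finish()?;
    assert_eq!(metadata.row_groups.len(), 1);
    assert!(!writer.inner().is_empty());

    // The new `finished` flag turns any further writing into an error.
    assert!(writer.next_row_group().is_err());
    Ok(())
}
```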

    /// Writes magic bytes at the beginning of the file.
    fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(&PARQUET_MAGIC)?;
@@ -303,6 +328,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {

    /// Assembles and writes metadata at the end of the file.
    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+        self.finished = true;
        let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();

        let mut row_groups = self
@@ -366,6 +392,10 @@ impl<W: Write + Send> SerializedFileWriter<W> {

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
+        if self.finished {
+            return Err(general_err!("SerializedFileWriter already finished"));
+        }
+
        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
@@ -387,6 +417,18 @@ impl<W: Write + Send> SerializedFileWriter<W> {
        &self.props
    }

+    /// Returns a reference to the underlying writer.
+    pub fn inner(&self) -> &W {
+        self.buf.inner()
+    }
+
+    /// Returns a mutable reference to the underlying writer.
+    ///
+    /// It is inadvisable to directly write to the underlying writer.
Contributor:

As above, I think it would be good to explain why it is inadvisable and document when one would use this API.

+    pub fn inner_mut(&mut self) -> &mut W {
+        self.buf.inner_mut()
+    }
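One hypothetical answer to the reviewer's question: with `W = Vec<u8>`, `inner_mut()` lets a caller periodically drain bytes that are already a final prefix of the file, which is exactly the pattern the new `AsyncArrowWriter::try_flush` uses. The file offsets live in `TrackedWrite`'s byte counter rather than in the buffer, so draining it in order does not corrupt the eventual footer. A blocking sketch of that pattern (all names are illustrative):

```rust
use std::io::Write;

use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;

/// Writes `batch`, then drains the in-memory file prefix into `sink`
/// once it exceeds `threshold` bytes.
fn write_and_drain<W: Write>(
    writer: &mut ArrowWriter<Vec<u8>>,
    sink: &mut W,
    batch: &RecordBatch,
    threshold: usize,
) -> parquet::errors::Result<()> {
    writer.write(batch)?;
    let buffer = writer.inner_mut();
    if buffer.len() >= threshold {
        // Only completed row groups ever reach this Vec, so its contents
        // are final; forwarding them in order and clearing is safe.
        sink.write_all(buffer.as_slice())?;
        buffer.clear();
    }
    Ok(())
}
```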

    /// Writes the file footer and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
@@ -1755,7 +1797,7 @@ mod tests {
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

-        let metadata = file_writer.close().unwrap();
+        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.row_groups.len(), 1);
        let row_group = &metadata.row_groups[0];
        assert_eq!(row_group.columns.len(), 2);
@@ -1766,6 +1808,11 @@ mod tests {
        assert!(row_group.columns[1].offset_index_offset.is_some());
        assert!(row_group.columns[1].column_index_offset.is_none());

+        let err = file_writer.next_row_group().err().unwrap().to_string();
+        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
+
+        drop(file_writer);
Contributor:

why is the drop necessary?

Contributor Author:

The `file_writer` has a borrow of the vec.
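In miniature, the borrow being described (an illustrative sketch, not the actual test):

```rust
use std::sync::Arc;

use bytes::Bytes;
use parquet::{
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    schema::parser::parse_message_type,
};

fn example() -> parquet::errors::Result<()> {
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
    let props = Arc::new(WriterProperties::builder().build());

    let mut file = Vec::new();
    let writer = SerializedFileWriter::new(&mut file, schema, props)?;

    // `writer` holds the `&mut file` borrow until it is dropped; only
    // then can the Vec be moved, e.g. into Bytes for a reader.
    drop(writer);
    let _bytes = Bytes::from(file);
    Ok(())
}
```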


        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
