-
Notifications
You must be signed in to change notification settings - Fork 884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide access to inner Write for parquet writers #5471
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -270,16 +270,38 @@ impl<W: Write + Send> ArrowWriter<W> { | |||||||||
self.writer.append_key_value_metadata(kv_metadata) | ||||||||||
} | ||||||||||
|
||||||||||
/// Returns a reference to the underlying writer. | ||||||||||
pub fn inner(&self) -> &W { | ||||||||||
self.writer.inner() | ||||||||||
} | ||||||||||
|
||||||||||
/// Returns a mutable reference to the underlying writer. | ||||||||||
/// | ||||||||||
/// It is inadvisable to directly write to the underlying writer, doing so | ||||||||||
/// will likely result in a corrupt parquet file | ||||||||||
pub fn inner_mut(&mut self) -> &mut W { | ||||||||||
self.writer.inner_mut() | ||||||||||
} | ||||||||||
|
||||||||||
/// Flushes any outstanding data and returns the underlying writer. | ||||||||||
pub fn into_inner(mut self) -> Result<W> { | ||||||||||
self.flush()?; | ||||||||||
self.writer.into_inner() | ||||||||||
} | ||||||||||
|
||||||||||
/// Close and finalize the underlying Parquet writer | ||||||||||
pub fn close(mut self) -> Result<crate::format::FileMetaData> { | ||||||||||
/// | ||||||||||
/// Unlike [`Self::close`] this does not consume self | ||||||||||
/// | ||||||||||
/// Attempting to write after calling finish will result in an error | ||||||||||
pub fn finish(&mut self) -> Result<crate::format::FileMetaData> { | ||||||||||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
self.flush()?; | ||||||||||
self.writer.close() | ||||||||||
self.writer.finish() | ||||||||||
} | ||||||||||
|
||||||||||
/// Close and finalize the underlying Parquet writer | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again I think this is sufficiently obvious |
||||||||||
pub fn close(mut self) -> Result<crate::format::FileMetaData> { | ||||||||||
self.finish() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,19 @@ impl<W: Write> TrackedWrite<W> { | |
self.bytes_written | ||
} | ||
|
||
/// Returns a reference to the underlying writer. | ||
pub fn inner(&self) -> &W { | ||
self.inner.get_ref() | ||
} | ||
|
||
/// Returns a mutable reference to the underlying writer. | ||
/// | ||
/// It is inadvisable to directly write to the underlying writer, doing so | ||
/// will likely result in data corruption | ||
pub fn inner_mut(&mut self) -> &mut W { | ||
self.inner.get_mut() | ||
} | ||
|
||
/// Returns the underlying writer. | ||
pub fn into_inner(self) -> Result<W> { | ||
self.inner.into_inner().map_err(|err| { | ||
|
@@ -137,6 +150,7 @@ pub struct SerializedFileWriter<W: Write> { | |
row_group_index: usize, | ||
// kv_metadatas will be appended to `props` when `write_metadata` | ||
kv_metadatas: Vec<KeyValue>, | ||
finished: bool, | ||
} | ||
|
||
impl<W: Write> Debug for SerializedFileWriter<W> { | ||
|
@@ -167,6 +181,7 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
offset_indexes: Vec::new(), | ||
row_group_index: 0, | ||
kv_metadatas: Vec::new(), | ||
finished: false, | ||
}) | ||
} | ||
|
||
|
@@ -210,13 +225,23 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
&self.row_groups | ||
} | ||
|
||
/// Closes and finalises file writer, returning the file metadata. | ||
pub fn close(mut self) -> Result<parquet::FileMetaData> { | ||
/// Close and finalize the underlying Parquet writer | ||
/// | ||
/// Unlike [`Self::close`] this does not consume self | ||
/// | ||
/// Attempting to write after calling finish will result in an error | ||
pub fn finish(&mut self) -> Result<parquet::FileMetaData> { | ||
self.assert_previous_writer_closed()?; | ||
let metadata = self.write_metadata()?; | ||
self.buf.flush()?; | ||
Ok(metadata) | ||
} | ||
|
||
/// Closes and finalises file writer, returning the file metadata. | ||
pub fn close(mut self) -> Result<parquet::FileMetaData> { | ||
self.finish() | ||
} | ||
|
||
/// Writes magic bytes at the beginning of the file. | ||
fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> { | ||
buf.write_all(&PARQUET_MAGIC)?; | ||
|
@@ -303,6 +328,7 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
|
||
/// Assembles and writes metadata at the end of the file. | ||
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> { | ||
self.finished = true; | ||
let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); | ||
|
||
let mut row_groups = self | ||
|
@@ -366,6 +392,10 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
|
||
#[inline] | ||
fn assert_previous_writer_closed(&self) -> Result<()> { | ||
if self.finished { | ||
return Err(general_err!("SerializedFileWriter already finished")); | ||
} | ||
|
||
if self.row_group_index != self.row_groups.len() { | ||
Err(general_err!("Previous row group writer was not closed")) | ||
} else { | ||
|
@@ -387,6 +417,18 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
&self.props | ||
} | ||
|
||
/// Returns a reference to the underlying writer. | ||
pub fn inner(&self) -> &W { | ||
self.buf.inner() | ||
} | ||
|
||
/// Returns a mutable reference to the underlying writer. | ||
/// | ||
/// It is inadvisable to directly write to the underlying writer. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above, I think it would be good to explain why it is inadvisable and document when one would use this API |
||
pub fn inner_mut(&mut self) -> &mut W { | ||
self.buf.inner_mut() | ||
} | ||
|
||
/// Writes the file footer and returns the underlying writer. | ||
pub fn into_inner(mut self) -> Result<W> { | ||
self.assert_previous_writer_closed()?; | ||
|
@@ -1755,7 +1797,7 @@ mod tests { | |
b_writer.close().unwrap(); | ||
row_group_writer.close().unwrap(); | ||
|
||
let metadata = file_writer.close().unwrap(); | ||
let metadata = file_writer.finish().unwrap(); | ||
assert_eq!(metadata.row_groups.len(), 1); | ||
let row_group = &metadata.row_groups[0]; | ||
assert_eq!(row_group.columns.len(), 2); | ||
|
@@ -1766,6 +1808,11 @@ mod tests { | |
assert!(row_group.columns[1].offset_index_offset.is_some()); | ||
assert!(row_group.columns[1].column_index_offset.is_none()); | ||
|
||
let err = file_writer.next_row_group().err().unwrap().to_string(); | ||
assert_eq!(err, "Parquet error: SerializedFileWriter already finished"); | ||
|
||
drop(file_writer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is the drop necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The file_writer has a borrow of the vec |
||
|
||
let options = ReadOptionsBuilder::new().with_page_index().build(); | ||
let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap(); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is sufficiently obvious
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My rationale for this suggestion was to make it easier for someone to quickly determine "what is the difference between the very similarly sounding
finish()
andclose()
methods