Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dictionary encoding in structures for FlightDataEncoder, add documentation for arrow_flight::encode::Dictionary #5488

Merged
merged 7 commits into from
Mar 15, 2024
44 changes: 27 additions & 17 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,29 +388,39 @@ impl Stream for FlightDataEncoder {
/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
///
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how things work in arrow-rs and I'm just assuming this is how the flight protocol is supposed to work :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this comment maybe to say that the FlightDataEncoder doesn't really handle sending the same dictionary multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't really handle as in will send the same dictionary over and over again for evert batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

/// The current implementation does not support "delta" dictionaries so a new dictionary batch will be sent each time the encoder sees a
/// dictionary which is not pointer-equal to the previously observed dictionary for a given `dict_id`.

to clarify that we do not send delta dictionaries but the current implementation will actually skip sending the dictionary if it is pointer-equal to existing tracked dictionary.

/// In the arrow flight protocol dictionary values and keys are sent as two separate messages.
/// When a sender is encoding a [`RecordBatch`] containing [`DictionaryArray`] columns, it will
/// first send a dictionary batch (a batch with header [`MessageHeader::DictionaryBatch`]) containing
/// the dictionary values. The receiver is responsible for reading this batch and maintaining state that associates
/// those dictionary values with the corresponding array using the `dict_id` as a key.
///
/// After sending the dictionary batch the sender will send the array data in a batch with header ['MessageHeader::RecordBatch '].
/// For any dictionary array batches in this message, the encoded flight message will only contain the dictionary keys. The receiver
/// is then responsible for rebuilding the `DictionaryArray` on the client side using the dictionary values from the DictionaryBatch message
/// and the keys from the RecordBatch message.
///
/// For example, if we have a batch with a `TypedDictionaryArray<'_, UInt32Type, Utf8Type>` (a dictionary array where they keys are `u32` and the
/// values are `String`), then the DictionaryBatch will contain a `StringArray` and the RecordBatch will contain a `UInt32Array`.
///
/// Not that since `dict_id` defined in the `Schema` is used as a key to assicated dictionary values to their arrays it is required that each
/// `DictionaryArray` in a `RecordBatch` have a unique `dict_id`.
#[derive(Debug, PartialEq)]
pub enum DictionaryHandling {
/// Expands to the underlying type (default). This likely sends more data
/// over the network but requires less memory (dictionaries are not tracked)
/// and is more compatible with other arrow flight client implementations
/// that may not support `DictionaryEncoding`
///
/// An IPC response, streaming or otherwise, defines its schema up front
/// which defines the mapping from dictionary IDs. It then sends these
/// dictionaries over the wire.
/// This method should be used if all batches over the entire lifetime fo the flight stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this description is accurate. The behavior is what is "supposed" to happen with Flight, but to the best of my knowledge, this behavior isn't implemented -- the gap is tracked in #3389 but I don't think it is yet implemented

I think what Hydrate actually does is to send arrays using the underlying type -- for example, DictionaryArray<Int32, Utf8> would be cast to StringArray and the StringArray would be sent over the network

I believe Resend actually resends a new dictionary for each DictionaryArray, without trying to figure out if it was the same as the previous dictionary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think you're right but the current implementation is actually broken then because it doesn't take into account nested dictionary fields:

fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) -> Schema {
    let fields: Fields = schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            DataType::Dictionary(_, value_type) if !send_dictionaries => Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            )
            .with_metadata(field.metadata().clone()),
            _ => field.as_ref().clone(),
        })
        .collect();

    Schema::new(fields).with_metadata(schema.metadata().clone())
}

Then if you have DictionaryHandling::Hydrate (the default) it will break on dictionary replacement of the nested field:

    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

Copy link
Contributor Author

@thinkharderdev thinkharderdev Mar 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok yeah, I think this is why I was getting so confused. The original description of Hydrate was correct but it just doesn't work correctly for nested dictionary arrays. So I guess we can

  1. Fix the issue with nested fields
  2. Revert back to original description

But I don't quite understand why Hydrate is even an option? It seems strictly worse than Resend? If you wanted to hydrate a dictionary array it seems like you would do that prior to sending over the flight stream instead of doing it on the fly in the protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 It seems strictly worse than Resend?

If you have a sparse dictionary, as you might if you've applied a selective predicate, sending a dictionary for each batch where most of the values are not used could conceivably be worse.

That being said this is also historical, as resend was added later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it was mostly historical

The last time I checked (maybe 2 years ago) at least one client (maybe the golang one) didn't handle dictionaries (so if the rust server sent DictionaryArray the client couldn't deal with it) which was important for us as well

All in all, sorting out how to handle Dictionaries better / more rationally in arrow-flight would be a very nice improvement I think (aka working on #3389)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb @tustvold Ok so I fixed handling of nested dictionary fields so Hydrate works in those cases now and restored the documentation back to more or less what it originally said (with some additional comments to clarify things a bit

/// share the same dictionary (as determined by pointer-equality of the `DictionaryArray`'s `values` array).
///
/// This requires identifying the different dictionaries in use, assigning
/// them IDs, and sending new dictionaries, delta or otherwise, when needed
/// The shared dictionary will be sent only once and the encoder will error in the case where it detects
/// a dictionary which is not pointer-equal to the initial dictionary.
///
/// See also:
/// * <https://github.com/apache/arrow-rs/issues/1206>
/// The arrow flight protocol supports delta dictionaries where the sender can send new dictionary values
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches my understanding too

/// incrementally but this is currently not supported in arrow-rs.
Hydrate,
/// Send dictionary FlightData with every RecordBatch that contains a
/// [`DictionaryArray`]. See [`Self::Hydrate`] for more tradeoffs. No
/// attempt is made to skip sending the same (logical) dictionary values
/// twice.
/// This method should be used if each batch may contain different dictionary values. This will require more data to be sent
/// across the wire as each batch sent will require a new DictionaryBatch message with all dictionary values for that batch.
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
/// The dictionary values will be sent for each batch separately and the receiver will use dictionary values
/// from the most recent dictionary batch it received for the given `dict_id`.
Resend,
}

Expand Down
Loading