arrow_row/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexsort
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
121//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
122//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
123//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
124//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
125//! [compared]: PartialOrd
126//! [compare]: PartialOrd
127
128#![doc(
129    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
130    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
131)]
132#![cfg_attr(docsrs, feature(doc_auto_cfg))]
133#![warn(missing_docs)]
134use std::cmp::Ordering;
135use std::hash::{Hash, Hasher};
136use std::sync::Arc;
137
138use arrow_array::cast::*;
139use arrow_array::types::ArrowDictionaryKeyType;
140use arrow_array::*;
141use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
142use arrow_data::ArrayDataBuilder;
143use arrow_schema::*;
144use variable::{decode_binary_view, decode_string_view};
145
146use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
147use crate::variable::{decode_binary, decode_string};
148
149mod fixed;
150mod list;
151mod variable;
152
153/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
154///
155/// *Note: The encoding of the row format may change from release to release.*
156///
157/// ## Overview
158///
159/// The row format is a variable length byte sequence created by
160/// concatenating the encoded form of each column. The encoding for
161/// each column depends on its datatype (and sort options).
162///
163/// The encoding is carefully designed in such a way that escaping is
164/// unnecessary: it is never ambiguous as to whether a byte is part of
165/// a sentinel (e.g. null) or a value.
166///
167/// ## Unsigned Integer Encoding
168///
169/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
170/// to the integer's length.
171///
172/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
173/// integer.
174///
175/// ```text
176///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
177///    3          │03│00│00│00│      │01│00│00│00│03│
178///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
179///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
180///   258         │02│01│00│00│      │01│00│00│01│02│
181///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
182///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
183///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
184///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
185///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
186///  NULL         │??│??│??│??│      │00│00│00│00│00│
187///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
188///
189///              32-bit (4 bytes)        Row Format
190///  Value        Little Endian
191/// ```
192///
193/// ## Signed Integer Encoding
194///
195/// Signed integers have their most significant sign bit flipped, and are then encoded in the
196/// same manner as an unsigned integer.
197///
198/// ```text
199///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
200///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
201///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
202///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
203///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
204///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
205///
206///  Value  32-bit (4 bytes)    High bit flipped      Row Format
207///          Little Endian
208/// ```
209///
210/// ## Float Encoding
211///
212/// Floats are converted from IEEE 754 representation to a signed integer representation
213/// by flipping all bar the sign bit if they are negative.
214///
215/// They are then encoded in the same manner as a signed integer.
216///
217/// ## Fixed Length Bytes Encoding
218///
219/// Fixed length bytes are encoded in the same fashion as primitive types above.
220///
221/// For a fixed length array of length `n`:
222///
223/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
224///
225/// A valid value is encoded as `1_u8` followed by the value bytes
226///
227/// ## Variable Length Bytes (including Strings) Encoding
228///
229/// A null is encoded as a `0_u8`.
230///
231/// An empty byte array is encoded as `1_u8`.
232///
233/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
234/// encoded using a block based scheme described below.
235///
236/// The byte array is broken up into fixed-width blocks, each block is written in turn
237/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
238/// with `0_u8` and written to the output, followed by the un-padded length in bytes
239/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
240/// blocks using a length of 32, this is to reduce space amplification for small strings.
241///
242/// Note the following example encodings use a block size of 4 bytes for brevity:
243///
244/// ```text
245///                       ┌───┬───┬───┬───┬───┬───┐
246///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
247///                       └───┴───┴───┴───┴───┴───┘
248///
///                       ┌───┐
///  ""                   │01 │
///                       └───┘
252///
253///  NULL                 ┌───┐
254///                       │00 │
255///                       └───┘
256///
/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
///                       │02 │'D'│'e'│'f'│'e'│FF │
///                       └───┼───┼───┼───┼───┼───┤
///                           │'n'│'e'│'s'│'t'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
268/// ```
269///
270/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
271/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
272///
273/// ## Dictionary Encoding
274///
275/// Dictionaries are hydrated to their underlying values
276///
277/// ## Struct Encoding
278///
279/// A null is encoded as a `0_u8`.
280///
281/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
282///
283/// This encoding effectively flattens the schema in a depth-first fashion.
284///
285/// For example
286///
287/// ```text
288/// ┌───────┬────────────────────────┬───────┐
289/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
290/// └───────┴────────────────────────┴───────┘
291/// ```
292///
293/// Is encoded as
294///
295/// ```text
296/// ┌───────┬───────────────┬───────┬─────────┬───────┐
297/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
298/// └───────┴───────────────┴───────┴─────────┴───────┘
299/// ```
300///
301/// ## List Encoding
302///
303/// Lists are encoded by first encoding all child elements to the row format.
304///
305/// A list value is then encoded as the concatenation of each of the child elements,
306/// separately encoded using the variable length encoding described above, followed
307/// by the variable length encoding of an empty byte array.
308///
309/// For example given:
310///
311/// ```text
312/// [1_u8, 2_u8, 3_u8]
313/// [1_u8, null]
314/// []
315/// null
316/// ```
317///
318/// The elements would be converted to:
319///
320/// ```text
321///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
322///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
323///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
324///```
325///
326/// Which would be encoded as
327///
328/// ```text
329///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
330///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
331///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
332///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
333///
334///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
335///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
336///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
337///                          └──── 1_u8 ────┘   └──── null ────┘
338///
339///```
340///
341/// With `[]` represented by an empty byte array, and `null` a null byte array.
342///
343/// # Ordering
344///
345/// ## Float Ordering
346///
347/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
348/// in the IEEE 754 (2008 revision) floating point standard.
349///
350/// The ordering established by this does not always agree with the
351/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
352/// they consider negative and positive zero equal, while this does not
353///
354/// ## Null Ordering
355///
356/// The encoding described above will order nulls first, this can be inverted by representing
357/// nulls as `0xFF_u8` instead of `0_u8`
358///
359/// ## Reverse Column Ordering
360///
361/// The order of a given column can be reversed by negating the encoded bytes of non-null values
362///
363/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
364/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields (data type and sort options) for each column,
    /// shared with every [`Rows`] this converter produces so provenance
    /// can be checked via pointer equality
    fields: Arc<[SortField]>,
    /// State for codecs, one per field in `fields`
    codecs: Vec<Codec>,
}
371
/// Per-column encoding state, selected based on the column's [`DataType`]
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary (non-nested, non-dictionary types)
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
}
385
impl Codec {
    /// Creates the [`Codec`] state needed to encode arrays described by `sort_field`
    ///
    /// Returns [`ArrowError::NotYetImplemented`] for data types not supported
    /// by the row format
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are encoded via their hydrated values, so build a
                // converter for the value type with the same sort options
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the row encoding of a single null value
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Each child field is encoded with the parent's sort options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the row encoding of all-null children, used when the
                // parent struct slot is itself null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepares the per-array [`Encoder`] for `array`, pre-encoding any child
    /// data (dictionary values, struct children, list elements) to rows
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values(),
                    DataType::LargeList(_) => as_large_list_array(array).values(),
                    // Codec::List is only constructed for List/LargeList fields
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::List(rows))
            }
        }
    }

    /// Returns the size in bytes of this codec's additional state, excluding
    /// `size_of::<Codec>` itself (accounted for by [`RowConverter::size`])
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
        }
    }
}
475
/// Per-array encoding state produced by [`Codec::encoder`], borrowing any
/// pre-computed null row from the owning [`Codec`]
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
}
491
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (ordering direction and null placement)
    options: SortOptions,
    /// Data type of the column to be encoded
    data_type: DataType,
}
500
501impl SortField {
502    /// Create a new column with the given data type
503    pub fn new(data_type: DataType) -> Self {
504        Self::new_with_options(data_type, Default::default())
505    }
506
507    /// Create a new column with the given data type and [`SortOptions`]
508    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
509        Self { options, data_type }
510    }
511
512    /// Return size of this instance in bytes.
513    ///
514    /// Includes the size of `Self`.
515    pub fn size(&self) -> usize {
516        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
517    }
518}
519
impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if any field's data type is
    /// not supported by the row format (see [`Self::supports_fields`])
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        // Build per-column codec state up-front; fails fast for unsupported types
        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Check if the given fields are supported by the row format.
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    /// Recursively check a single data type for row-format support
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            // All non-nested types are supported
            _ if !d.is_nested() => true,
            // For list-like types support depends on the element/entry type
            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            _ => false,
        }
    }

    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        // Row count is taken from the first column; columns are expected to
        // all have the same length
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if
    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
    /// * The provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Provenance check: `Rows` must share this converter's field Arc
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // Validate each column's type and build its per-array encoder state
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);

        // We initialize the offsets shifted down by one row index.
        //
        // As the rows are appended to the offsets will be incremented to match
        //
        // For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
        // The offsets would be initialized to `0, 0, 3, 7`
        //
        // Writing the first row entirely would yield `0, 3, 3, 7`
        // The second, `0, 3, 7, 7`
        // The third, `0, 3, 7, 13`
        //
        // This would be the final offsets for reading
        //
        // In this way offsets tracks the position during writing whilst eventually serving
        // as identifying the offsets of the written rows
        rows.offsets.reserve(lengths.len());
        let mut cur_offset = rows.offsets[write_offset];
        for l in lengths {
            rows.offsets.push(cur_offset);
            // Guard against usize overflow of the total buffer length
            cur_offset = cur_offset.checked_add(l).expect("overflow");
        }

        // Note this will not zero out any trailing data in `rows.buffer`,
        // e.g. resulting from a call to `Rows::clear`, relying instead on the
        // encoders not assuming a zero-initialized buffer
        rows.buffer.resize(cur_offset, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        // Debug-only sanity checks on the offset invariants described above
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Rows`] columns into [`ArrayRef`]
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        // UTF-8 validation is only required if any input row requests it
        // (e.g. rows reconstructed from external binary data)
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        unsafe { self.convert_raw(&mut rows, validate_utf8) }
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
    /// a total length of `data_capacity`
    ///
    /// This can be used to buffer a selection of [`Row`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    ///
    /// // Convert to row format and deduplicate
    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
    /// let mut distinct_rows = converter.empty_rows(3, 100);
    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
    ///
    /// // Note: we could skip buffering and feed the filtered iterator directly
    /// // into convert_rows, this is done for demonstration purposes only
    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a"]);
    /// ```
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        // `offsets` always has one more entry than there are rows
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Create a new [Rows] instance from the given binary data.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back in batch.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let converted = converter.from_binary(binary.clone());
    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
    /// ```
    ///
    /// # Panics
    ///
    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
    /// panic if the data is malformed.
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        Rows {
            buffer: array.values().to_vec(),
            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                // Data originates outside this converter, so request UTF-8
                // validation when these rows are converted back to arrays
                validate_utf8: true,
            },
        }
    }

    /// Convert raw bytes into [`ArrayRef`]
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        // Decode one column at a time, in schema order
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
            .collect()
    }

    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            // Account for the codec Vec allocation plus any nested codec state
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
821
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Schema shared with the originating [`RowConverter`]; UTF-8 validation
    /// is always enabled as parsed bytes come from outside this crate
    config: RowConfig,
}
827
828impl RowParser {
829    fn new(fields: Arc<[SortField]>) -> Self {
830        Self {
831            config: RowConfig {
832                fields,
833                validate_utf8: true,
834            },
835        }
836    }
837
838    /// Creates a [`Row`] from the provided `bytes`.
839    ///
840    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
841    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
842    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
843        Row {
844            data: bytes,
845            config: &self.config,
846        }
847    }
848}
849
/// The config of a given set of [`Row`]
///
/// Compared by `Arc` pointer identity on `fields` (see [`Rows::push`]) to
/// check that rows being mixed originate from the same converter.
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}
858
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: non-empty and monotonically increasing, with `offsets.len()`
    /// equal to the number of rows plus one
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
871
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a converter sharing the same
    /// `fields` allocation (checked via `Arc::ptr_eq`)
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // If the source required UTF-8 validation, so does this collection now
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is not a valid row index
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets` has one more entry than there are rows, so this check
        // guarantees both `offsets[row]` and `offsets[row + 1]` exist
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is a valid row index, i.e.
    /// `index + 1 < offsets.len()` (equivalently `index < num_rows()`)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        // SAFETY: caller guarantees index + 1 is within offsets, and every
        // offset is a valid position within buffer (struct invariant)
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Retain the leading 0 sentinel offset so the instance stays valid
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
979
980impl<'a> IntoIterator for &'a Rows {
981    type Item = Row<'a>;
982    type IntoIter = RowsIter<'a>;
983
984    fn into_iter(self) -> Self::IntoIter {
985        RowsIter {
986            rows: self,
987            start: 0,
988            end: self.num_rows(),
989        }
990    }
991}
992
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row to yield from the front
    start: usize,
    /// One past the index of the next row to yield from the back;
    /// `start == end` means exhausted
    end: usize,
}
1000
1001impl<'a> Iterator for RowsIter<'a> {
1002    type Item = Row<'a>;
1003
1004    fn next(&mut self) -> Option<Self::Item> {
1005        if self.end == self.start {
1006            return None;
1007        }
1008
1009        // SAFETY: We have checked that `start` is less than `end`
1010        let row = unsafe { self.rows.row_unchecked(self.start) };
1011        self.start += 1;
1012        Some(row)
1013    }
1014
1015    fn size_hint(&self) -> (usize, Option<usize>) {
1016        let len = self.len();
1017        (len, Some(len))
1018    }
1019}
1020
impl ExactSizeIterator for RowsIter<'_> {
    /// Number of rows not yet yielded from either end
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1026
1027impl DoubleEndedIterator for RowsIter<'_> {
1028    fn next_back(&mut self) -> Option<Self::Item> {
1029        if self.end == self.start {
1030            return None;
1031        }
1032        // Safety: We have checked that `start` is less than `end`
1033        let row = unsafe { self.rows.row_unchecked(self.end) };
1034        self.end -= 1;
1035        Some(row)
1036    }
1037}
1038
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this single row; comparisons operate on these directly
    data: &'a [u8],
    /// Config of the [`Rows`] this row was borrowed from
    config: &'a RowConfig,
}
1052
1053impl<'a> Row<'a> {
1054    /// Create owned version of the row to detach it from the shared [`Rows`].
1055    pub fn owned(&self) -> OwnedRow {
1056        OwnedRow {
1057            data: self.data.into(),
1058            config: self.config.clone(),
1059        }
1060    }
1061
1062    /// The row's bytes, with the lifetime of the underlying data.
1063    pub fn data(&self) -> &'a [u8] {
1064        self.data
1065    }
1066}
1067
1068// Manually derive these as don't wish to include `fields`
1069
1070impl PartialEq for Row<'_> {
1071    #[inline]
1072    fn eq(&self, other: &Self) -> bool {
1073        self.data.eq(other.data)
1074    }
1075}
1076
1077impl Eq for Row<'_> {}
1078
1079impl PartialOrd for Row<'_> {
1080    #[inline]
1081    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1082        Some(self.cmp(other))
1083    }
1084}
1085
1086impl Ord for Row<'_> {
1087    #[inline]
1088    fn cmp(&self, other: &Self) -> Ordering {
1089        self.data.cmp(other.data)
1090    }
1091}
1092
1093impl Hash for Row<'_> {
1094    #[inline]
1095    fn hash<H: Hasher>(&self, state: &mut H) {
1096        self.data.hash(state)
1097    }
1098}
1099
1100impl AsRef<[u8]> for Row<'_> {
1101    #[inline]
1102    fn as_ref(&self) -> &[u8] {
1103        self.data
1104    }
1105}
1106
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// A copy of this single row's encoded bytes
    data: Box<[u8]>,
    /// An owned copy of the originating [`Rows`] config
    config: RowConfig,
}
1115
1116impl OwnedRow {
1117    /// Get borrowed [`Row`] from owned version.
1118    ///
1119    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1120    pub fn row(&self) -> Row<'_> {
1121        Row {
1122            data: &self.data,
1123            config: &self.config,
1124        }
1125    }
1126}
1127
1128// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1129
1130impl PartialEq for OwnedRow {
1131    #[inline]
1132    fn eq(&self, other: &Self) -> bool {
1133        self.row().eq(&other.row())
1134    }
1135}
1136
1137impl Eq for OwnedRow {}
1138
1139impl PartialOrd for OwnedRow {
1140    #[inline]
1141    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1142        Some(self.cmp(other))
1143    }
1144}
1145
1146impl Ord for OwnedRow {
1147    #[inline]
1148    fn cmp(&self, other: &Self) -> Ordering {
1149        self.row().cmp(&other.row())
1150    }
1151}
1152
1153impl Hash for OwnedRow {
1154    #[inline]
1155    fn hash<H: Hasher>(&self, state: &mut H) {
1156        self.row().hash(state)
1157    }
1158}
1159
1160impl AsRef<[u8]> for OwnedRow {
1161    #[inline]
1162    fn as_ref(&self) -> &[u8] {
1163        &self.data
1164    }
1165}
1166
/// Returns the byte used to encode a null value: `0x00` when nulls sort
/// first (so they compare smallest), `0xFF` when nulls sort last
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}
1175
/// Computes the encoded length in bytes of each row of `cols`, one entry per row
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
    use fixed::FixedLengthEncoding;

    // All columns have the same length; an empty column set yields no rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut lengths = vec![0; num_rows];

    // Accumulate each column's per-row contribution
    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            // Fixed- and variable-width leaf types computed directly from the array
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
                    DataType::Binary => as_generic_binary_array::<i32>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice)
                        }),
                    DataType::Utf8 => array.as_string::<i32>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::LargeUtf8 => array.as_string::<i64>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                        *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                    }),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // One validity byte plus the fixed payload
                        lengths.iter_mut().for_each(|x| *x += 1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            // Dictionaries contribute the length of the pre-encoded value row
            // (or the encoded null row) selected by each key
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
                            *length += match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            }
                        }
                    }
                    _ => unreachable!(),
                }
            }
            // Structs contribute one validity byte plus the pre-encoded child row
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
                    match array.is_valid(idx) {
                        true => *length += 1 + rows.row(idx).as_ref().len(),
                        false => *length += 1 + null.data.len(),
                    }
                });
            }
            // Lists delegate to the list module's length computation
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
                }
                _ => unreachable!(),
            },
        }
    }

    lengths
}
1259
/// Encodes a column into the provided output buffer `data`, advancing each
/// row's entry in `offsets` as bytes are written
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        // Leaf types are encoded directly; the null-free fast paths avoid
        // consulting a validity bitmap
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        // Dictionaries are encoded by copying the pre-encoded value rows
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        // Structs write a sentinel byte (0x01 for valid, the null sentinel
        // otherwise) followed by the pre-encoded child row bytes
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            // Skip the first offset: offsets[i + 1] tracks the write position of row i
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        // Lists delegate to the list module's encoder
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            _ => unreachable!(),
        },
    }
}
1352
1353/// Encode dictionary values not preserving the dictionary encoding
1354pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1355    data: &mut [u8],
1356    offsets: &mut [usize],
1357    column: &DictionaryArray<K>,
1358    values: &Rows,
1359    null: &Row<'_>,
1360) {
1361    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1362        let row = match k {
1363            Some(k) => values.row(k.as_usize()).data,
1364            None => null.data,
1365        };
1366        let end_offset = *offset + row.len();
1367        data[*offset..end_offset].copy_from_slice(row);
1368        *offset = end_offset;
1369    }
1370}
1371
/// Helper invoked by `downcast_primitive!` in [`decode_column`] to decode a
/// primitive column of concrete type `$t` and wrap the result in an `Arc`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1377
/// Decodes the provided `field` from `rows`, advancing each row slice past
/// the consumed bytes
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Leaf types decode directly from the row bytes
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
            }
        }
        // Dictionaries were encoded as their values: decode with the nested
        // converter, which yields exactly one column
        Codec::Dictionary(converter, _) => {
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        // Structs: read the leading validity byte of each row, strip it, then
        // decode the child columns from the remaining bytes
        Codec::Struct(converter, _) => {
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: rows are valid for this field per this function's contract
            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        // Lists delegate to the list module's decoder
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1438
1439#[cfg(test)]
1440mod tests {
1441    use rand::distr::uniform::SampleUniform;
1442    use rand::distr::{Distribution, StandardUniform};
1443    use rand::{rng, Rng};
1444
1445    use arrow_array::builder::*;
1446    use arrow_array::types::*;
1447    use arrow_array::*;
1448    use arrow_buffer::{i256, NullBuffer};
1449    use arrow_buffer::{Buffer, OffsetBuffer};
1450    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1451    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1452
1453    use super::*;
1454
    // Verifies the exact encoded layout and comparison semantics for two
    // fixed-width columns (Int16 + Float32), then round-trips them back.
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // 7 rows of 8 bytes each: 3 bytes for the Int16 column, 5 for Float32
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // Each column value is a validity byte followed by its order-preserving encoding
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // Byte comparison of rows must match the lexicographic column ordering
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1517
    // Encoded Decimal128 rows must sort in value order (null first by default)
    // and round-trip back to an identical array.
    #[test]
    fn test_decimal128() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
            DECIMAL128_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal128Array::from_iter([
                None,
                Some(i128::MIN),
                Some(-13),
                Some(46_i128),
                Some(5456_i128),
                Some(i128::MAX),
            ])
            .with_precision_and_scale(38, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // Input values are strictly ascending, so encoded rows must be too
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }
1547
    // Same as test_decimal128 but for Decimal256, including values that
    // exercise both halves of the 256-bit representation.
    #[test]
    fn test_decimal256() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // Input values are strictly ascending, so encoded rows must be too
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }
1579
    // Booleans must encode as null < false < true ascending; a descending
    // converter with nulls last must invert that ordering. Both round-trip.
    #[test]
    fn test_bool() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last flips every comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
1607
    // Timestamp timezone metadata must survive a round-trip, both for a plain
    // timestamp array and for a dictionary whose values carry a timezone.
    #[test]
    fn test_timezone() {
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // Dictionary encoding is not preserved: decoding yields the value type
        assert_eq!(back[0].data_type(), &v);
    }
1642
1643    #[test]
1644    fn test_null_encoding() {
1645        let col = Arc::new(NullArray::new(10));
1646        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1647        let rows = converter.convert_columns(&[col]).unwrap();
1648        assert_eq!(rows.num_rows(), 10);
1649        assert_eq!(rows.row(1).data.len(), 0);
1650    }
1651
1652    #[test]
1653    fn test_variable_width() {
1654        let col = Arc::new(StringArray::from_iter([
1655            Some("hello"),
1656            Some("he"),
1657            None,
1658            Some("foo"),
1659            Some(""),
1660        ])) as ArrayRef;
1661
1662        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1663        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1664
1665        assert!(rows.row(1) < rows.row(0));
1666        assert!(rows.row(2) < rows.row(4));
1667        assert!(rows.row(3) < rows.row(0));
1668        assert!(rows.row(3) < rows.row(1));
1669
1670        let cols = converter.convert_rows(&rows).unwrap();
1671        assert_eq!(&cols[0], &col);
1672
1673        let col = Arc::new(BinaryArray::from_iter([
1674            None,
1675            Some(vec![0_u8; 0]),
1676            Some(vec![0_u8; 6]),
1677            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1678            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1679            Some(vec![0_u8; variable::BLOCK_SIZE]),
1680            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1681            Some(vec![1_u8; 6]),
1682            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1683            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1684            Some(vec![1_u8; variable::BLOCK_SIZE]),
1685            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1686            Some(vec![0xFF_u8; 6]),
1687            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1688            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1689            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1690            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1691        ])) as ArrayRef;
1692
1693        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1694        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1695
1696        for i in 0..rows.num_rows() {
1697            for j in i + 1..rows.num_rows() {
1698                assert!(
1699                    rows.row(i) < rows.row(j),
1700                    "{} < {} - {:?} < {:?}",
1701                    i,
1702                    j,
1703                    rows.row(i),
1704                    rows.row(j)
1705                );
1706            }
1707        }
1708
1709        let cols = converter.convert_rows(&rows).unwrap();
1710        assert_eq!(&cols[0], &col);
1711
1712        let converter = RowConverter::new(vec![SortField::new_with_options(
1713            DataType::Binary,
1714            SortOptions::default().desc().with_nulls_first(false),
1715        )])
1716        .unwrap();
1717        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1718
1719        for i in 0..rows.num_rows() {
1720            for j in i + 1..rows.num_rows() {
1721                assert!(
1722                    rows.row(i) > rows.row(j),
1723                    "{} > {} - {:?} > {:?}",
1724                    i,
1725                    j,
1726                    rows.row(i),
1727                    rows.row(j)
1728                );
1729            }
1730        }
1731
1732        let cols = converter.convert_rows(&rows).unwrap();
1733        assert_eq!(&cols[0], &col);
1734    }
1735
    /// Asserts `a` is logically equal to `b`, where `b` may be
    /// dictionary-encoded: if it is, `b` is first cast to its value type.
    ///
    /// NOTE(review): the previous comment referenced an `exact` parameter that
    /// does not exist; this helper always compares logical values.
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
1747
    #[test]
    fn test_string_dictionary() {
        // Dictionary-encoded strings must compare by their logical values
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Ascending, nulls first: null < "" < "foo" < "he" < "hello"
        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        // Equal logical values encode to identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows from a second array with a different dictionary must remain
        // comparable against rows_a
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending, nulls last: all comparisons invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending, nulls first: values invert but nulls stay smallest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
1821
    #[test]
    fn test_struct() {
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Input rows are already in ascending order, so encoded rows must be too
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability
        // Validity mask 0b00001010 marks rows 1 and 3 valid, rows 0 and 2 null
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The reconstructed array must also pass full validation
        back[0].to_data().validate_full().unwrap();
    }
1865
1866    #[test]
1867    fn test_primitive_dictionary() {
1868        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
1869        builder.append(2).unwrap();
1870        builder.append(3).unwrap();
1871        builder.append(0).unwrap();
1872        builder.append_null();
1873        builder.append(5).unwrap();
1874        builder.append(3).unwrap();
1875        builder.append(-1).unwrap();
1876
1877        let a = builder.finish();
1878        let data_type = a.data_type().clone();
1879        let columns = [Arc::new(a) as ArrayRef];
1880
1881        let field = SortField::new(data_type.clone());
1882        let converter = RowConverter::new(vec![field]).unwrap();
1883        let rows = converter.convert_columns(&columns).unwrap();
1884        assert!(rows.row(0) < rows.row(1));
1885        assert!(rows.row(2) < rows.row(0));
1886        assert!(rows.row(3) < rows.row(2));
1887        assert!(rows.row(6) < rows.row(2));
1888        assert!(rows.row(3) < rows.row(6));
1889    }
1890
    #[test]
    fn test_dictionary_nulls() {
        // Values array contains nulls AND the keys array contains nulls;
        // logical column is [1, 1, -1, null, null, null]
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1)); // 1 == 1
        assert_eq!(rows.row(3), rows.row(4)); // both masked in the values array
        assert_eq!(rows.row(4), rows.row(5)); // null via values == null via keys
        assert!(rows.row(3) < rows.row(0)); // null < 1
    }
1915
    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8() {
        // Encode a non-UTF-8 byte as Binary...
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_row = rows.row(0);

        // ...then decoding the same bytes as Utf8 must panic
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
1930
    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8_array() {
        // Same as test_invalid_utf8, but round-tripped through try_into_binary
        // / from_binary instead of the row parser
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_rows = rows.try_into_binary().expect("known-small rows");

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
1944
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty() {
        // A zero-length byte slice is not a valid encoded row and must panic
        let binary_row: &[u8] = &[];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
1956
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty_array() {
        // An empty row supplied via from_binary must also panic on decode
        let row: &[u8] = &[];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
1968
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated() {
        // A row truncated mid-value must panic rather than decode garbage
        let binary_row: &[u8] = &[0x02];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
1980
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated_array() {
        // Truncated row via the from_binary path must panic as well
        let row: &[u8] = &[0x02];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
1992
1993    #[test]
1994    #[should_panic(expected = "rows were not produced by this RowConverter")]
1995    fn test_different_converter() {
1996        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
1997        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1998        let rows = converter.convert_columns(&[values]).unwrap();
1999
2000        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2001        let _ = converter.convert_rows(&rows);
2002    }
2003
    /// Exercises a single-level list column under all four sort-option
    /// combinations. Logical rows built below:
    ///   0: [32, 52, 32]    1: [32, 52, 12]    2: [32, 52]
    ///   3: null            4: [32, null]      5: []
    ///
    /// NOTE(review): the previous inline comments described row 2 as
    /// `[32, 42]`, but the builder appends 32 then 52 — fixed to `[32, 52]`.
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2094
    /// Exercises a list-of-list column under three sort-option combinations.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // row 2: [[1, null], null] and row 3: null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2185
    #[test]
    fn test_list() {
        // Run the list tests with 32-bit offsets (List)
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }
2191
    #[test]
    fn test_large_list() {
        // Run the list tests with 64-bit offsets (LargeList)
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
2197
2198    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2199    where
2200        K: ArrowPrimitiveType,
2201        StandardUniform: Distribution<K::Native>,
2202    {
2203        let mut rng = rng();
2204        (0..len)
2205            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2206            .collect()
2207    }
2208
2209    fn generate_strings<O: OffsetSizeTrait>(
2210        len: usize,
2211        valid_percent: f64,
2212    ) -> GenericStringArray<O> {
2213        let mut rng = rng();
2214        (0..len)
2215            .map(|_| {
2216                rng.random_bool(valid_percent).then(|| {
2217                    let len = rng.random_range(0..100);
2218                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2219                    String::from_utf8(bytes).unwrap()
2220                })
2221            })
2222            .collect()
2223    }
2224
2225    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2226        let mut rng = rng();
2227        (0..len)
2228            .map(|_| {
2229                rng.random_bool(valid_percent).then(|| {
2230                    let len = rng.random_range(0..100);
2231                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2232                    String::from_utf8(bytes).unwrap()
2233                })
2234            })
2235            .collect()
2236    }
2237
2238    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2239        let mut rng = rng();
2240        (0..len)
2241            .map(|_| {
2242                rng.random_bool(valid_percent).then(|| {
2243                    let len = rng.random_range(0..100);
2244                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2245                    bytes
2246                })
2247            })
2248            .collect()
2249    }
2250
    /// Generates a `DictionaryArray` with `len` keys drawn uniformly from the
    /// indices of `values`; each key is null with probability
    /// `1 - valid_percent`.
    ///
    /// NOTE(review): assumes `values` is non-empty — `min_key..max_key` would
    /// be an empty range and `random_range` would panic otherwise.
    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: ArrowDictionaryKeyType,
        K::Native: SampleUniform,
    {
        let mut rng = rng();
        let min_key = K::Native::from_usize(0).unwrap();
        let max_key = K::Native::from_usize(values.len()).unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                rng.random_bool(valid_percent)
                    .then(|| rng.random_range(min_key..max_key))
            })
            .collect();

        let data_type =
            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));

        // Reuse the keys' buffers and attach `values` as the dictionary
        let data = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .add_child_data(values.to_data())
            .build()
            .unwrap();

        DictionaryArray::from(data)
    }
2283
2284    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2285        let mut rng = rng();
2286        let width = rng.random_range(0..20);
2287        let mut builder = FixedSizeBinaryBuilder::new(width);
2288
2289        let mut b = vec![0; width as usize];
2290        for _ in 0..len {
2291            match rng.random_bool(valid_percent) {
2292                true => {
2293                    b.iter_mut().for_each(|x| *x = rng.random());
2294                    builder.append_value(&b).unwrap();
2295                }
2296                false => builder.append_null(),
2297            }
2298        }
2299
2300        builder.finish()
2301    }
2302
2303    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2304        let mut rng = rng();
2305        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2306        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2307        let b = generate_strings::<i32>(len, valid_percent);
2308        let fields = Fields::from(vec![
2309            Field::new("a", DataType::Int32, true),
2310            Field::new("b", DataType::Utf8, true),
2311        ]);
2312        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2313        StructArray::new(fields, values, Some(nulls))
2314    }
2315
2316    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2317    where
2318        F: FnOnce(usize) -> ArrayRef,
2319    {
2320        let mut rng = rng();
2321        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2322        let values_len = offsets.last().unwrap().to_usize().unwrap();
2323        let values = values(values_len);
2324        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2325        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2326        ListArray::new(field, offsets, values, Some(nulls))
2327    }
2328
    /// Generates a random column of length `len`, picking uniformly among the
    /// supported data types (primitives, strings, dictionaries, fixed-size
    /// binary, structs, lists and view arrays), each with ~20% nulls.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..16) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            // random_range(0..16) cannot yield anything else
            _ => unreachable!(),
        }
    }
2370
2371    fn print_row(cols: &[SortColumn], row: usize) -> String {
2372        let t: Vec<_> = cols
2373            .iter()
2374            .map(|x| match x.values.is_valid(row) {
2375                true => {
2376                    let opts = FormatOptions::default().with_null("NULL");
2377                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2378                    formatter.value(row).to_string()
2379                }
2380                false => "NULL".to_string(),
2381            })
2382            .collect();
2383        t.join(",")
2384    }
2385
2386    fn print_col_types(cols: &[SortColumn]) -> String {
2387        let t: Vec<_> = cols
2388            .iter()
2389            .map(|x| x.values.data_type().to_string())
2390            .collect();
2391        t.join(",")
2392    }
2393
    /// Randomized round-trip test: for random columns with random sort options,
    /// comparing encoded [`Row`]s byte-wise must agree with
    /// `LexicographicalComparator` on the source arrays, and converting the
    /// rows back must reproduce the original arrays.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            // One random column per sort field; see generate_column for the mix of types
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null ordering for each column
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            // Reference implementation: element-wise lexicographical comparison
            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pair (i, j) of encoded rows must compare exactly as the
            // lexicographical comparator orders the corresponding array rows
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Round-trip: rows -> arrays must reproduce the inputs
            // (dictionary_eq ignores dictionary encoding differences)
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert rows to an opaque binary array, re-parse
            // the bytes, and still recover the original arrays
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // And the binary form can be turned back into Rows and decoded again
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
2474
2475    #[test]
2476    fn test_clear() {
2477        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2478        let mut rows = converter.empty_rows(3, 128);
2479
2480        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2481        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2482        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2483
2484        for array in arrays.iter() {
2485            rows.clear();
2486            converter.append(&mut rows, &[array.clone()]).unwrap();
2487            let back = converter.convert_rows(&rows).unwrap();
2488            assert_eq!(&back[0], array);
2489        }
2490
2491        let mut rows_expected = converter.empty_rows(3, 128);
2492        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2493
2494        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2495            assert_eq!(
2496                actual, expected,
2497                "For row {}: expected {:?}, actual: {:?}",
2498                i, expected, actual
2499            );
2500        }
2501    }
2502
2503    #[test]
2504    fn test_append_codec_dictionary_binary() {
2505        use DataType::*;
2506        // Dictionary RowConverter
2507        let converter = RowConverter::new(vec![SortField::new(Dictionary(
2508            Box::new(Int32),
2509            Box::new(Binary),
2510        ))])
2511        .unwrap();
2512        let mut rows = converter.empty_rows(4, 128);
2513
2514        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2515        let values = BinaryArray::from(vec![
2516            Some("a".as_bytes()),
2517            Some(b"b"),
2518            Some(b"c"),
2519            Some(b"d"),
2520        ]);
2521        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2522
2523        rows.clear();
2524        let array = Arc::new(dict_array) as ArrayRef;
2525        converter.append(&mut rows, &[array.clone()]).unwrap();
2526        let back = converter.convert_rows(&rows).unwrap();
2527
2528        dictionary_eq(&back[0], &array);
2529    }
2530
2531    #[test]
2532    fn test_list_prefix() {
2533        let mut a = ListBuilder::new(Int8Builder::new());
2534        a.append_value([None]);
2535        a.append_value([None, None]);
2536        let a = a.finish();
2537
2538        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2539        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2540        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2541    }
2542}